
Writing Hadoop MapReduce results directly to HBase

2014-01-02 22:11
This post shows how to write MapReduce results directly into HBase. I am using Hadoop 1.0.4 and HBase 0.94, both installed in pseudo-distributed mode.

1. Hadoop installation is not covered here.

2. HBase installation, covered briefly below.

First, extract the HBase tarball into the /home/hadoop directory.

Configure the hosts file as follows:

192.168.0.101 hadoop.master

Then configure hbase-site.xml with the following content:

<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://hadoop.master:9000/hbase</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>hadoop.master</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/home/hadoop/zookeeper</value>
  </property>
  <property>
    <name>hbase.regionserver.handler.count</name>
    <value>100</value>
  </property>
  <property>
    <name>hbase.hregion.max.filesize</name>
    <value>8589934592</value>
  </property>
  <property>
    <name>hfile.block.cache.size</name>
    <value>0.3</value>
  </property>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

With hbase-site.xml done, configure hbase-env.sh next; only the settings I changed are shown:

export JAVA_HOME=/usr/jdk1.6.0_22
export HBASE_OPTS="-XX:+UseConcMarkSweepGC"
export HBASE_MANAGES_ZK=true

The last setting must be enabled: it tells HBase to manage its own ZooKeeper instance.

Finally, configure the regionservers file as follows:

hadoop.master

Note: if you need to connect to HBase from your local machine, turn the firewall off first by running /sbin/service iptables stop.
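If you want the firewall to stay off across reboots on this test box, you can optionally disable the service entirely:

/sbin/chkconfig iptables off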

Next, start HBase and create the table TestCars with a column family Car; the shell commands are shown below.
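A minimal way to do this (assuming HBase's bin directory is on the PATH):

start-hbase.sh           # starts the HMaster, RegionServer and managed ZooKeeper
hbase shell              # open the HBase shell, then inside it:
create 'TestCars', 'Car'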

Prepare the data (Car.txt):

Acura,Integra,Small
Acura,Legend,Midsize
Audi,90,Compact
Audi,100,Midsize
BMW,535i,Midsize
Buick,Century,Midsize
Buick,LeSabre,Large
Buick,Roadmaster,Large
Buick,Riviera,Midsize
Cadillac,DeVille,Large
Cadillac,Seville,Midsize

Upload the data to the Hadoop file system:

hadoop fs -copyFromLocal /home/hadoop/Car.txt /home/hadoop/input

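A quick sanity check that the upload landed where the job expects it (the paths match the command above):

hadoop fs -ls /home/hadoop/input
hadoop fs -cat /home/hadoop/input/*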
To run the MapReduce job, hbase-0.94.6.jar, zookeeper-3.4.5.jar, and protobuf-java-2.4.0a.jar must be added to Hadoop's lib directory; alternatively, put those three jars on the job's classpath when you submit it.
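For the first approach, copying the jars out of the HBase distribution might look like this; the exact directory layout is an assumption, so adjust the paths to your installation:

cp /home/hadoop/hbase-0.94.6/hbase-0.94.6.jar /home/hadoop/hadoop-1.0.4/lib/
cp /home/hadoop/hbase-0.94.6/lib/zookeeper-3.4.5.jar /home/hadoop/hadoop-1.0.4/lib/
cp /home/hadoop/hbase-0.94.6/lib/protobuf-java-2.4.0a.jar /home/hadoop/hadoop-1.0.4/lib/

Restart the Hadoop daemons after changing the lib directory so they pick the jars up.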

The full implementation is below:

package com.duplicate;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OutputHbase {

    private static Logger logger = LoggerFactory.getLogger(OutputHbase.class);

    // Mapper: splits each CSV line and emits brand -> "model,size".
    public static class Map extends Mapper<Object, Text, Text, Text> {

        private Text outKey = new Text();
        private Text outVal = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] valueSplitted = value.toString().split(",");
            if (valueSplitted.length == 3) {
                String brand = valueSplitted[0];
                String model = valueSplitted[1];
                String size = valueSplitted[2];
                outKey.set(brand);
                outVal.set(model + "," + size);
                context.write(outKey, outVal);
            }
        }
    }

    // Reducer: turns each (brand, model, size) triple into an HBase Put
    // with row key "brand,model" and writes the batch into TestCars.
    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        private HTablePool pool = null;
        private HTableInterface testHTable = null;
        private List<Put> testListPut = new ArrayList<Put>();

        @Override
        public void setup(Context context) {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "192.168.0.101");
            pool = new HTablePool(conf, 10);
            testHTable = pool.getTable("TestCars");
        }

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String brand = key.toString();
            for (Text tx : values) {
                String[] valueSplitted = tx.toString().split(",");
                if (valueSplitted.length == 2) {
                    String model = valueSplitted[0];
                    String size = valueSplitted[1];

                    Put put = new Put(Bytes.toBytes(brand + "," + model));
                    byte[] putFamily = Bytes.toBytes("Car");
                    put.add(putFamily, Bytes.toBytes("brand"), Bytes.toBytes(brand));
                    put.add(putFamily, Bytes.toBytes("model"), Bytes.toBytes(model));
                    put.add(putFamily, Bytes.toBytes("size"), Bytes.toBytes(size));
                    testListPut.add(put);
                }
            }
            testHTable.put(testListPut);
            testHTable.flushCommits();
            testListPut.clear(); // avoid re-writing puts from earlier keys
        }

        @Override
        public void cleanup(Context context) throws IOException {
            if (null != testHTable) {
                testHTable.close();
            }
            if (null != pool) {
                pool.close();
            }
        }
    }

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "OutputHbase");
        // TableMapReduceUtil.addDependencyJars(job);
        job.setJarByClass(OutputHbase.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

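As an aside, the commented-out TableMapReduceUtil line in main() hints at the more idiomatic route: let HBase's TableOutputFormat write the Puts instead of opening a table inside the reducer. A minimal sketch of that variant, reusing the names from the job above (this is an alternative wiring, not what the code above does):

// Additional imports:
// import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
// import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
// import org.apache.hadoop.hbase.mapreduce.TableReducer;

public static class ToTableReduce extends TableReducer<Text, Text, ImmutableBytesWritable> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text tx : values) {
            String[] v = tx.toString().split(",");
            if (v.length == 2) {
                Put put = new Put(Bytes.toBytes(key.toString() + "," + v[0]));
                put.add(Bytes.toBytes("Car"), Bytes.toBytes("brand"), Bytes.toBytes(key.toString()));
                put.add(Bytes.toBytes("Car"), Bytes.toBytes("model"), Bytes.toBytes(v[0]));
                put.add(Bytes.toBytes("Car"), Bytes.toBytes("size"), Bytes.toBytes(v[1]));
                context.write(null, put); // TableOutputFormat delivers the Put to HBase
            }
        }
    }
}

// In main(), replace setReducerClass/setOutput* with:
// TableMapReduceUtil.initTableReducerJob("TestCars", ToTableReduce.class, job);

A side benefit of initTableReducerJob is that it ships the HBase dependency jars with the job by default, which can remove the manual jar copying described earlier.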
Run the job as follows (note that the output directory given as the second argument must not already exist, or the job will fail):

hadoop jar /home/hadoop/dedup.jar com.duplicate.OutputHbase /home/hadoop/input/* /home/hadoop/output

There are two ways to check the results: directly in the HBase shell, or by reading them back with a program.

HBase shell query:

scan 'TestCars', {COLUMNS => 'Car'}

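The output should look roughly like this (timestamps will vary; only the first row is shown):

ROW              COLUMN+CELL
 Acura,Integra   column=Car:brand, timestamp=..., value=Acura
 Acura,Integra   column=Car:model, timestamp=..., value=Integra
 Acura,Integra   column=Car:size, timestamp=..., value=Small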
Querying from Java; the code below only reads out the row keys:

package com.duplicate.local;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ConnectionHbase {

    private static HTablePool pool = null;

    public static void main(String[] args) {
        ConnectionHbase hbase = new ConnectionHbase();
        hbase.run();
    }

    // Scans TestCars and prints each row key.
    public void run() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.0.101");
        pool = new HTablePool(conf, 10);
        HTableInterface testHTable = pool.getTable("TestCars");
        Scan scan = new Scan();
        try {
            ResultScanner res = testHTable.getScanner(scan);
            for (Result rs : res) {
                System.out.println(Bytes.toString(rs.getRow()));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                testHTable.close(); // return the table to the pool
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

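Given the sample data, the row keys come back in HBase's lexicographic key order, so the program should print:

Acura,Integra
Acura,Legend
Audi,100
Audi,90
BMW,535i
Buick,Century
Buick,LeSabre
Buick,Riviera
Buick,Roadmaster
Cadillac,DeVille
Cadillac,Seville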