
Getting Started with HBase Programming: MapReduce

2014-04-06 20:54
refer to: http://blog.csdn.net/darke1014/article/details/8665484
Tips: if you develop with Eclipse, you need to put all of the Hadoop jars on the classpath, plus three jars required by HBase (hbase, zookeeper, protobuf-java).

The following shows how to work with HBase from MapReduce, focusing mainly on reading data stored in HBase.

Example 1:

First, let's look at how to load data, starting from the familiar WordCount example. The goal is to store the WordCount results in HBase instead of on HDFS.

Here is the code:


package test1;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;

public class WordCountHBase {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        // The map function is unchanged from the standard WordCount example.
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

The map function is unchanged.


    // Reducer: writes the aggregated counts into the HBase table.
    public static class IntSumReducer
            extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            // One Put per word, i.e. one row per word. Bytes.toBytes(key.toString()) is used
            // instead of key.getBytes(), because Text.getBytes() may return a padded buffer.
            Put put = new Put(Bytes.toBytes(key.toString()));
            // Column family "content", qualifier "count", value = the count as a string.
            put.add(Bytes.toBytes("content"), Bytes.toBytes("count"),
                    Bytes.toBytes(String.valueOf(sum)));
            context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())), put);
        }
    }

As the code above shows, IntSumReducer extends TableReducer, and in the HBase library TableReducer in turn extends Hadoop's Reducer class. Its signature is TableReducer<KEYIN, VALUEIN, KEYOUT>, so the key type written to HBase is ImmutableBytesWritable, an immutable byte-array wrapper, because HBase stores all of its data as raw byte arrays rather than typed values.
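To see that byte-array convention in isolation, here is a minimal standalone sketch (not part of the original program) that round-trips a word and a count through the org.apache.hadoop.hbase.util.Bytes helper, the same class the reducer above uses:

import org.apache.hadoop.hbase.util.Bytes;

public class BytesDemo {
    public static void main(String[] args) {
        // Row keys, qualifiers and values are all handed to HBase as raw byte[].
        byte[] rowKey = Bytes.toBytes("hello");
        byte[] count = Bytes.toBytes(String.valueOf(42)); // the post stores counts as string bytes
        System.out.println(Bytes.toString(rowKey)); // prints "hello"
        System.out.println(Bytes.toString(count));  // prints "42"
    }
}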


    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        String tablename = "wordcount";
        // Instantiate the Configuration; note that new HBaseConfiguration() is deprecated,
        // use HBaseConfiguration.create() instead.
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (admin.tableExists(tablename)) {
            System.out.println("table exists! recreating ...");
            admin.disableTable(tablename);
            admin.deleteTable(tablename);
        }
        HTableDescriptor htd = new HTableDescriptor(tablename);
        HColumnDescriptor hcd = new HColumnDescriptor("content");
        htd.addFamily(hcd);      // add the column family
        admin.createTable(htd);  // create the table

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 1) {
            System.err.println("Usage: wordcount <in> (got " + otherArgs.length + " arguments)");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCountHBase.class);
        job.setMapperClass(TokenizerMapper.class);
        //job.setCombinerClass(IntSumReducer.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        // Use the TableMapReduceUtil from the hadoop.hbase.mapreduce package,
        // not the one from hadoop.hbase.mapred.
        TableMapReduceUtil.initTableReducerJob(tablename, IntSumReducer.class, job);
        // Set the key and value types after initTableReducerJob, otherwise the job fails.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

In the job configuration we do not call job.setReducerClass(); instead, TableMapReduceUtil.initTableReducerJob(tablename, IntSumReducer.class, job) registers the reducer and wires its output to the HBase table.

Note that the TableMapReduceUtil used here must be the one from the hadoop.hbase.mapreduce package, not the one from the hadoop.hbase.mapred package; using the wrong one leads to errors.
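For reference, the wiring that initTableReducerJob performs is roughly equivalent to the hand-written configuration sketched below. This is only an approximation, not the actual implementation: the real method in org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil also handles partitioners, quorum overrides and dependency jars. It assumes the job, tablename and IntSumReducer names from the listing above:

        // Rough sketch of what initTableReducerJob(tablename, IntSumReducer.class, job) sets up.
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tablename);
        job.setOutputFormatClass(TableOutputFormat.class);   // reducer output goes to an HBase table
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class); // the row key
        job.setOutputValueClass(Put.class);                  // the mutation applied to that row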

Example 2:

Next, let's look at how to read the data back. Reading is fairly simple: write a Mapper that receives the <key, value> pairs from the table, and have the Reducer simply write out the results it gets.


package test1;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadHBase {

    public static class TokenizerMapper
            extends TableMapper<Text, Text> {

        public void map(ImmutableBytesWritable row, Result values, Context context)
                throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer("");
            // Walk every column of the "content" family and build "qualifier:value" pairs.
            for (java.util.Map.Entry<byte[], byte[]> value
                    : values.getFamilyMap("content".getBytes()).entrySet()) {
                // The raw byte[] value has to be converted into a String explicitly.
                String str = new String(value.getValue());
                if (str != null) {
                    sb.append(new String(value.getKey()));
                    sb.append(":");
                    sb.append(str);
                }
            }
            // Emit the row key once, together with everything collected for this row.
            context.write(new Text(row.get()), new Text(sb.toString()));
        }
    }

The map function extends the TableMapper class and reads the query results out of the Result object.
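If you only need one known column rather than the whole family, a shorter variant (my own sketch, not from the original post) is to call Result.getValue(family, qualifier) directly; it reuses the imports from the listing above:

    public static class SingleColumnMapper extends TableMapper<Text, Text> {
        public void map(ImmutableBytesWritable row, Result values, Context context)
                throws IOException, InterruptedException {
            // Fetch just content:count instead of iterating the whole family map.
            byte[] cell = values.getValue(Bytes.toBytes("content"), Bytes.toBytes("count"));
            if (cell != null) {
                context.write(new Text(Bytes.toString(row.get())), new Text(Bytes.toString(cell)));
            }
        }
    }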


    public static class IntSumReducer
            extends Reducer<Text, Text, Text, Text> {

        private Text result = new Text();

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                result.set(val);
                context.write(key, result);
            }
        }
    }

The reduce function is unchanged; it simply writes the results it receives straight to the output file.


    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        String tablename = "wordcount";
        // Instantiate the Configuration; again, use HBaseConfiguration.create()
        // rather than new HBaseConfiguration().
        Configuration conf = HBaseConfiguration.create();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: readhbase <startRow> <out> (got " + otherArgs.length + " arguments)");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(ReadHBase.class);
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        job.setReducerClass(IntSumReducer.class);
        // Again, use the TableMapReduceUtil from hadoop.hbase.mapreduce, not hadoop.hbase.mapred.
        Scan scan = new Scan(Bytes.toBytes(otherArgs[0]));  // start scanning from the given row
        TableMapReduceUtil.initTableMapperJob(tablename, scan, TokenizerMapper.class,
                Text.class, Text.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Here I passed in two arguments, "aa output": the row at which the scan starts (here the lookup begins at row "aa") and the path where the output file is stored (here the output folder under the HDFS directory).

Note that the job configuration must call the initTableMapperJob method. As in the first example, you do not call job.setMapperClass(); instead, TableMapReduceUtil.initTableMapperJob(tablename, scan, TokenizerMapper.class, Text.class, Text.class, job) registers the mapper, and the Scan instance defines the row at which the lookup starts.
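As a side note, for full-table or long-range scans driven by MapReduce it is common practice to restrict and tune the Scan before handing it to initTableMapperJob. The snippet below is an optional sketch along those lines; the caching value is only an illustration, not something the original post prescribes:

        Scan scan = new Scan(Bytes.toBytes("aa")); // "aa" is the example start row used above
        scan.addFamily(Bytes.toBytes("content"));  // only ship the "content" family to the mappers
        scan.setCaching(500);                      // rows fetched per RPC; tune to your row size
        scan.setCacheBlocks(false);                // avoid filling the block cache during a full scan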