Hbase编程入门之MapReduce
2013-03-15 15:49
218 查看
Tips:如果用Eclipse开发,需要加入hadoop所有的jar包以及HBase三个jar包(hbase,zooKooper,protobuf-java)。
下面介绍一下,用mapreduce怎样操作HBase,主要对HBase中的数据进行读取。
案例一:
首先先介绍下如何上传数据,还是以最熟悉到wordcount案例开始,我们的目的是将wordcount的结果存储到Hbase而不是HDFS下。
给出代码:
map函数没有改变
由上面可知IntSumReducer继承自TableReduce,在hadoop里面TableReducer继承Reducer类。它的原型为:TableReducer<KeyIn,Values,KeyOut>可以看出,HBase里面是读出的Key类型是ImmutableBytesWritable,意为不可变类型,因为HBase里所有数据都是用字符串存储的。
需要注意的是此处的TableMapReduceUtil是hadoop.hbase.mapreduce包中的,而不是hadoop.hbase.mapred包中的,否则会报错。
案例二:
下面再介绍下如何进行读取,读取数据时比较简单,编写Mapper函数,读取<key,value>值就行了,Reducer函数直接输出得到的结果就行了。
map函数继承到TableMapper接口,从result中读取查询结果。
要注意的是,在JOB的配置中需要实现initTableMapperJob方法。与第一个例子类似,
在job配置的时候不用设置 job.setMapperClass(); 而是用 TableMapReduceUtil.initTableMapperJob(tablename, scan, TokenizerMapper.class, Text.class, Text.class, job);来执行mapper类。Scan实例是查找的起始行。
下面介绍一下,用mapreduce怎样操作HBase,主要对HBase中的数据进行读取。
案例一:
首先先介绍下如何上传数据,还是以最熟悉到wordcount案例开始,我们的目的是将wordcount的结果存储到Hbase而不是HDFS下。
给出代码:
package test1; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; public class WordCountHBase { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); //map函数没有改变 public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }
map函数没有改变
//Reduce类,主要是将键值传到HBase表中 public static class IntSumReducer extends TableReducer <Text,IntWritable,ImmutableBytesWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); Put put = new Put(key.getBytes()); //put实例化,每一个词存一行 //列族为content,列修饰符为count,列值为数目 put.add(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum))); context.write(new ImmutableBytesWritable(key.getBytes()), put); } }
由上面可知IntSumReducer继承自TableReduce,在hadoop里面TableReducer继承Reducer类。它的原型为:TableReducer<KeyIn,Values,KeyOut>可以看出,HBase里面是读出的Key类型是ImmutableBytesWritable,意为不可变类型,因为HBase里所有数据都是用字符串存储的。
@SuppressWarnings("deprecation") public static void main(String[] args) throws Exception { String tablename = "wordcount"; //实例化Configuration,注意不能用 new HBaseConfiguration()了。 Configuration conf = HBaseConfiguration.create(); HBaseAdmin admin = new HBaseAdmin(conf); if(admin.tableExists(tablename)){ System.out.println("table exists! recreating ..."); admin.disableTable(tablename); admin.deleteTable(tablename); } HTableDescriptor htd = new HTableDescriptor(tablename); HColumnDescriptor hcd = new HColumnDescriptor("content"); htd.addFamily(hcd); //创建列族 admin.createTable(htd); //创建表 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 1) { System.err.println("Usage: wordcount <in> <out>"+otherArgs.length); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCountHBase.class); job.setMapperClass(TokenizerMapper.class); //job.setCombinerClass(IntSumReducer.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //此处的TableMapReduceUtil注意要用hadoop.hbase.mapreduce包中的,而不是hadoop.hbase.mapred包中的 TableMapReduceUtil.initTableReducerJob(tablename, IntSumReducer.class, job); //key和value到类型设定最好放在initTableReducerJob函数后面,否则会报错 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } }在job配置的时候没有设置 job.setReduceClass(); 而是用 TableMapReduceUtil.initTableReducerJob(tablename, IntSumReducer.class, job); 来执行reduce类。
需要注意的是此处的TableMapReduceUtil是hadoop.hbase.mapreduce包中的,而不是hadoop.hbase.mapred包中的,否则会报错。
案例二:
下面再介绍下如何进行读取,读取数据时比较简单,编写Mapper函数,读取<key,value>值就行了,Reducer函数直接输出得到的结果就行了。
package test1; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import test1.WordCount.IntSumReducer; import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry.Entry; public class ReadHBase { public static class TokenizerMapper extends TableMapper<Text, Text>{ public void map(ImmutableBytesWritable row, Result values, Context context ) throws IOException, InterruptedException { StringBuffer sb = new StringBuffer(""); for(java.util.Map.Entry<byte[],byte[]> value : values.getFamilyMap( "content".getBytes()).entrySet()){ String str = new String(value.getValue()); //将字节数组转换成String类型,需要new String(); if(str != null){ sb.append(new String(value.getKey())); sb.append(":"); sb.append(str); } context.write(new Text(row.get()), new Text(new String(sb))); } } }
map函数继承到TableMapper接口,从result中读取查询结果。
public static class IntSumReducer extends Reducer <Text,Text,Text,Text> { private Text result = new Text(); public void reduce(Text key, Iterable<Text> values, Context context ) throws IOException, InterruptedException { for (Text val : values) { result.set(val); context.write(key,result); } } }reduce函数没有改变,直接输出到文件中即可
@SuppressWarnings("deprecation") public static void main(String[] args) throws Exception { String tablename = "wordcount"; //实例化Configuration,注意不能用 new HBaseConfiguration()了。 Configuration conf = HBaseConfiguration.create(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"+otherArgs.length); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(ReadHBase.class); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); job.setReducerClass(IntSumReducer.class); //此处的TableMapReduceUtil注意要用hadoop.hbase.mapreduce包中的,而不是hadoop.hbase.mapred包中的 Scan scan = new Scan(args[0].getBytes()); TableMapReduceUtil.initTableMapperJob(tablename, scan, TokenizerMapper.class, Text.class, Text.class, job); System.exit(job.waitForCompletion(true) ? 0 : 1); } }其中我输入的两个参数分别是“aa ouput” 分别是开始查找的行(这里为从“aa”行开始找),和输出文件到存储路径(这里为存到HDFS目录到output文件夹下)
要注意的是,在JOB的配置中需要实现initTableMapperJob方法。与第一个例子类似,
在job配置的时候不用设置 job.setMapperClass(); 而是用 TableMapReduceUtil.initTableMapperJob(tablename, scan, TokenizerMapper.class, Text.class, Text.class, job);来执行mapper类。Scan实例是查找的起始行。
相关文章推荐
- Hbase编程入门之MapReduce
- HBase编程 API入门系列之get(客户端而言)(2)
- Hadoop MapReduce编程 API入门系列之wordcount版本3(七)
- Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)
- Hadoop MapReduce编程 API入门系列之小文件合并(二十九)
- hbase MapReduce程序样例入门
- HBase编程 API入门系列之工具Bytes类(7)
- HBase编程 API入门系列之modify(管理端而言)(10)
- [转]HDFS+MapReduce+Hive+HBase十分钟快速入门
- HDFS+MapReduce+Hive+HBase十分钟快速入门
- Hadoop MapReduce编程 API入门系列之wordcount版本4(八)
- Hadoop MapReduce编程 API入门系列之二次排序(十六)
- Hadoop MapReduce编程 API入门系列之MapReduce多种输入格式(十七)
- Hadoop MapReduce编程 API入门系列之压缩和计数器(三十)
- HDFS+MapReduce+Hive+HBase十分钟快速入门
- Hadoop MapReduce编程 API入门系列之join(二十六)(未完)
- 【Hadoop入门学习系列之六】HBase基本架构、编程模型和应用案例
- Hadoop MapReduce编程入门案例
- Hadoop MapReduce编程 API入门系列之统计学生成绩版本2(十八)
- Hadoop MapReduce编程 API入门系列之计数器(二十七)