
HBase with MapReduce (Only Read)

I have recently been learning HBase and came across how to use MapReduce to operate on it. Below I walk through one of the scenarios; for details, refer to the official documentation: http://hbase.apache.org/book.html. From my study, the way MapReduce works with HBase can be viewed as follows: the Map phase is responsible for reading, and the Reduce phase is responsible for writing. Together, one read and one write cover the full read/write cycle against HBase.

Using MapReduce to read (Read) data from an HBase table only involves the read path, so we only need to implement the Map function.

  (1) The ReadHbaseMapper class must extend TableMapper. The implementation is as follows:

package com.datacenter.HbaseMapReduce.Read;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.NavigableMap;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class ReadHbaseMapper extends TableMapper<Text, Text> {

    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context)
            throws InterruptedException, IOException {
        // process the data for this row from the Result instance
        printResult(value);
    }

    // print the contents of a Result in order: rowkey, family, column, version, value
    public void printResult(Result rs) {
        if (rs.isEmpty()) {
            System.out.println("result is empty!");
            return;
        }

        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> temps = rs.getMap();
        String rowkey = Bytes.toString(rs.getRow()); // obtain the rowkey
        System.out.println("rowkey->" + rowkey);
        for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> temp : temps.entrySet()) {
            System.out.print("\tfamily->" + Bytes.toString(temp.getKey()));
            for (Entry<byte[], NavigableMap<Long, byte[]>> value : temp.getValue().entrySet()) {
                System.out.print("\tcol->" + Bytes.toString(value.getKey()));
                for (Entry<Long, byte[]> va : value.getValue().entrySet()) {
                    System.out.print("\tversion->" + va.getKey());
                    System.out.print("\tvalue->" + Bytes.toString(va.getValue()));
                    System.out.println();
                }
            }
        }
    }
}
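
For reference: the mapper above declares its output types as <Text, Text> but never calls context.write, which is why the driver below sets NullOutputFormat. If you did want the map phase to emit key/value pairs, a minimal sketch might replace map like the one below. The family "cf" and qualifier "col" are hypothetical; you would also pass Text.class for the output key/value classes in initTableMapperJob and choose a real OutputFormat.

    // Hedged sketch: emit rowkey -> cell value instead of printing.
    // "cf" and "col" are hypothetical; substitute a real family/qualifier from your table.
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context)
            throws InterruptedException, IOException {
        byte[] cell = value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col"));
        if (cell != null) {
            Text outKey = new Text(Bytes.toString(row.get(), row.getOffset(), row.getLength()));
            context.write(outKey, new Text(Bytes.toString(cell)));
        }
    }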


  (2) Add a main class to load the configuration information. The implementation is as follows:

package com.datacenter.HbaseMapReduce.Read;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

// read data from HBase through the map phase only

public class ReadHbase {

    static public String rootdir = "hdfs://hadoop3:8020/hbase";
    static public String zkServer = "hadoop3";
    static public String port = "2181";

    private static Configuration conf;
    private static HConnection hConn = null;

    public static void HbaseUtil(String rootDir, String zkServer, String port) {
        conf = HBaseConfiguration.create(); // load the default configuration
        conf.set("hbase.rootdir", rootDir);
        conf.set("hbase.zookeeper.quorum", zkServer);
        conf.set("hbase.zookeeper.property.clientPort", port);

        try {
            hConn = HConnectionManager.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {

        HbaseUtil(rootdir, zkServer, port);

        Job job = new Job(conf, "ExampleRead");
        job.setJarByClass(ReadHbase.class); // class that contains the mapper

        Scan scan = new Scan();      // filters can be attached here; see the sketch after this code
        scan.setCaching(500);        // 1 is the default in Scan, which is bad for MapReduce jobs
        scan.setCacheBlocks(false);  // don't set to true for MR jobs
        // set other scan attrs

        TableMapReduceUtil.initTableMapperJob(
                "score",               // input HBase table name
                scan,                  // Scan instance to control CF and attribute selection
                ReadHbaseMapper.class, // mapper
                null,                  // mapper output key
                null,                  // mapper output value
                job);
        job.setOutputFormatClass(NullOutputFormat.class); // because we aren't emitting anything from the mapper

        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new IOException("error with job!");
        }
    }
}
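
As the comment on the Scan notes, this is where filters go. A minimal sketch that restricts the scan before passing it to initTableMapperJob, assuming a hypothetical column family "course" and a hypothetical rowkey prefix "s":

// requires: import org.apache.hadoop.hbase.filter.PrefixFilter;
// Hedged sketch: restrict the scan to one column family and a rowkey prefix.
// "course" and "s" are hypothetical values for illustration only.
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("course"));              // only read this family
scan.setFilter(new PrefixFilter(Bytes.toBytes("s"))); // only rows whose key starts with "s"
scan.setCaching(500);
scan.setCacheBlocks(false);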


At this point we have completed a full traversal of the table, that is, printing the entire table's contents.
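
To actually run the job, package the classes into a jar and submit it through hadoop. The jar name below is hypothetical, and the HBase jars must be visible on the Hadoop classpath:

# expose the HBase jars to Hadoop, then submit the job
export HADOOP_CLASSPATH=$(hbase classpath)
hadoop jar hbase-read-example.jar com.datacenter.HbaseMapReduce.Read.ReadHbase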