您的位置:首页 > 其它

HBase新版本与MapReduce集成

2016-05-17 21:54 344 查看
1.MapReduce从hbase读取数据

/**
 * Reads rows from the HBase table {@code blog} with a MapReduce job.
 *
 * <p>The job scans only the {@code article:tags} and {@code author:nickname} columns, prints
 * every cell it sees in the mapper, and forwards each (row key, row) pair unchanged; the job
 * output goes to a time-stamped HDFS directory.
 *
 * <p>HBase classes relevant to this kind of job: {@code TableMapper} (mapper base class fed by
 * an HBase table, used here), {@code TableReducer} (reducer base class that emits mutations),
 * and {@code TableInputFormat} / {@code TableOutputFormat} (the matching input/output formats).
 */
public class HbaseAndMapReduce {
    private static final String ZOOKEEPER_QUORUM = "192.168.52.140";
    private static final String TABLE_NAME = "blog";

    private static final byte[] ARTICLE = Bytes.toBytes("article");
    private static final byte[] AUTHOR = Bytes.toBytes("author");
    private static final byte[] CONTENT = Bytes.toBytes("content");
    private static final byte[] TAGS = Bytes.toBytes("tags");
    private static final byte[] TITLE = Bytes.toBytes("title");
    private static final byte[] NAME = Bytes.toBytes("name");
    private static final byte[] NICKNAME = Bytes.toBytes("nickname");

    /**
     * Runs the job and exits the JVM with its status code.
     *
     * @param args unused
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        // Uncomment to (re)create the "blog" table with sample rows before running the job.
        // testData();

        // Overall exercise goal: find people who share the same interests.
        System.exit(run());
    }

    /**
     * Configures and runs the table-scanning job.
     *
     * @return 0 when the job completes successfully, 1 otherwise
     * @throws Exception if the job cannot be configured or submitted
     */
    public static int run() throws Exception {
        Configuration conf = HBaseConfiguration.create(new Configuration());
        conf.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM);

        Job job = Job.getInstance(conf, "findFriend");
        job.setJarByClass(HbaseAndMapReduce.class);

        // Restrict the scan to the two columns the job actually needs.
        Scan scan = new Scan();
        scan.addColumn(ARTICLE, TAGS);
        scan.addColumn(AUTHOR, NICKNAME);

        // The mapper's output key/value types are the HBase row key and row types.
        TableMapReduceUtil.initTableMapperJob(TABLE_NAME, scan, FindFriendMapper.class,
                ImmutableBytesWritable.class, Result.class, job);

        // A millisecond timestamp suffix keeps each run's output directory distinct.
        FileOutputFormat.setOutputPath(job,
                new Path("hdfs://192.168.52.140:9000/outhbase" + System.currentTimeMillis()));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Prints every cell of each scanned row, then forwards the row unchanged. */
    public static class FindFriendMapper extends TableMapper<ImmutableBytesWritable, Result> {
        /**
         * @param key     the row key of the current HBase row
         * @param value   all scanned cells of that row
         * @param context the task context the row is forwarded to
         */
        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Result>.Context context)
                throws IOException, InterruptedException {
            // Honour offset/length: the writable may wrap a slice of a larger array.
            System.out.println("key :: "
                    + Bytes.toString(key.get(), key.getOffset(), key.getLength()));

            System.out.print("value :: ");
            // listCells() returns null (not an empty list) for an empty Result.
            List<Cell> cells = value.listCells();
            if (cells != null) {
                for (Cell cell : cells) {
                    String rowKey = Bytes.toString(CellUtil.cloneRow(cell));
                    long timestamp = cell.getTimestamp();
                    String family = Bytes.toString(CellUtil.cloneFamily(cell));
                    String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String val = Bytes.toString(CellUtil.cloneValue(cell));
                    System.out.println("RowKey=" + rowKey + ", Timestamp=" + timestamp
                            + ", Family=" + family + ", Qualifier=" + qualifier + ", Val=" + val);
                }
            }

            // The inherited map() writes (key, value) to the context as-is.
            super.map(key, value, context);
        }
    }

    /**
     * Drops and recreates the {@code blog} table (families {@code article} and {@code author})
     * and inserts three sample rows.
     *
     * <p>An {@link IOException} is reported on stderr and not rethrown, so the method keeps its
     * original no-throw signature.
     */
    public static void testData() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM);
        TableName tn = TableName.valueOf(TABLE_NAME);

        // try-with-resources closes Admin and Connection even when a call below throws.
        try (Connection con = ConnectionFactory.createConnection(conf);
                Admin admin = con.getAdmin()) {
            if (admin.tableExists(tn)) {
                // A table must be disabled before it can be deleted.
                admin.disableTable(tn);
                admin.deleteTable(tn);
            }

            HTableDescriptor htd = new HTableDescriptor(tn);
            htd.addFamily(new HColumnDescriptor("article"));
            htd.addFamily(new HColumnDescriptor("author"));
            admin.createTable(htd);

            Put put = new Put(Bytes.toBytes("1"));
            put.addColumn(ARTICLE, CONTENT,
                    Bytes.toBytes("HBase is the Hadoop database. Use it when you need random, "
                            + "realtime read/write access to your Big Data"));
            put.addColumn(ARTICLE, TAGS, Bytes.toBytes("HBase,NoSql,Hadoop"));
            put.addColumn(ARTICLE, TITLE, Bytes.toBytes("Head First Hbase"));
            put.addColumn(AUTHOR, NAME, Bytes.toBytes("zhangsan"));
            put.addColumn(AUTHOR, NICKNAME, Bytes.toBytes("sansan"));

            Put put02 = new Put(Bytes.toBytes("10"));
            put02.addColumn(ARTICLE, TAGS, Bytes.toBytes("Hadoop"));
            put02.addColumn(AUTHOR, NICKNAME, Bytes.toBytes("xiaoshi"));

            Put put03 = new Put(Bytes.toBytes("100"));
            put03.addColumn(ARTICLE, TAGS, Bytes.toBytes("hbase,nosql"));
            put03.addColumn(AUTHOR, NICKNAME, Bytes.toBytes("superman"));

            // The Table handle is a resource too; the original never closed it.
            try (Table t = con.getTable(tn)) {
                t.put(Arrays.asList(put, put02, put03));
            }
            System.out.println("==========> 测试数据准备完成...");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}


2.从hbase读取数据,经过处理后输出到hdfs中

/**
 * Reads the HBase table {@code blog}, groups author nicknames by lower-cased article tag, and
 * writes one "tag, comma-separated nicknames" record per tag to a time-stamped HDFS directory.
 */
public class HbaseAndMapReduce02 {
    private static final byte[] ARTICLE = Bytes.toBytes("article");
    private static final byte[] AUTHOR = Bytes.toBytes("author");
    private static final byte[] TAGS = Bytes.toBytes("tags");
    private static final byte[] NICKNAME = Bytes.toBytes("nickname");

    /**
     * Runs the job and exits the JVM with its status code.
     *
     * @param args unused
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        // Exercise goal: find people who share the same interests.
        System.exit(run());
    }

    /**
     * Configures and runs the HBase-to-HDFS job.
     *
     * @return 0 when the job completes successfully, 1 otherwise
     * @throws Exception if the job cannot be configured or submitted
     */
    public static int run() throws Exception {
        Configuration conf = HBaseConfiguration.create(new Configuration());
        conf.set("hbase.zookeeper.quorum", "192.168.52.140");

        Job job = Job.getInstance(conf, "findFriend");
        job.setJarByClass(HbaseAndMapReduce02.class);

        // Restrict the scan to the two columns the job actually needs.
        Scan scan = new Scan();
        scan.addColumn(ARTICLE, TAGS);
        scan.addColumn(AUTHOR, NICKNAME);

        // Input comes from HBase; the mapper emits (Text tag, Text nickname).
        TableMapReduceUtil.initTableMapperJob("blog", scan, FindFriendMapper.class,
                Text.class, Text.class, job);

        // A millisecond timestamp suffix keeps each run's output directory distinct.
        FileOutputFormat.setOutputPath(job, new Path(
                "hdfs://192.168.52.140:9000/outbasemapreduce" + System.currentTimeMillis()));

        job.setReducerClass(FindFriendReducer.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Emits one (lower-cased tag, nickname) pair for every tag of a row. */
    public static class FindFriendMapper extends TableMapper<Text, Text> {
        /**
         * @param key     the row key of the current HBase row
         * @param value   the scanned cells of that row
         * @param context receives one (tag, nickname) pair per tag
         */
        @Override
        protected void map(
                ImmutableBytesWritable key,
                Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
                throws IOException, InterruptedException {
            byte[] tagBytes = value.getValue(ARTICLE, TAGS);
            byte[] nicknameBytes = value.getValue(AUTHOR, NICKNAME);

            // The scan returns rows that have either column, so one may be missing.
            // Such a row cannot contribute a (tag, nickname) pair; skip it instead of
            // dereferencing null as the original loop did.
            if (tagBytes == null || nicknameBytes == null) {
                return;
            }

            String tags = Bytes.toString(tagBytes);
            Text nickname = new Text(nicknameBytes);
            // Debug output: print the tag list itself (printing the split array only showed
            // its identity hash).
            System.out.println(tags);
            System.out.println(nickname.toString());

            for (String tag : tags.split(",")) {
                // Locale.ROOT keeps the grouping key independent of the task JVM's locale.
                context.write(new Text(tag.toLowerCase(java.util.Locale.ROOT)), nickname);
            }
        }
    }

    /** Joins all nicknames that share a tag into one comma-separated value. */
    public static class FindFriendReducer extends Reducer<Text, Text, Text, Text> {
        /**
         * @param key     a lower-cased tag
         * @param values  the nicknames of all authors that used the tag
         * @param context receives (tag, "nick1,nick2,...")
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringBuilder joined = new StringBuilder();
            for (Text nickname : values) {
                String name = nickname.toString();
                System.out.println(name);
                if (joined.length() > 0) {
                    joined.append(',');
                }
                joined.append(name);
            }
            context.write(key, new Text(joined.toString()));
        }
    }
}


3.从hdfs中读取数据,插入到hbase表中

/**
 * Reads tab-separated "tag, nicknames" lines from an HDFS directory and stores each one as a
 * row of the HBase table {@code friend}: row key = tag, column {@code person:nicknames} = value.
 */
public class HbaseAndMapReduce03 {
    private static final String TABLE_NAME = "friend";
    private static final byte[] PERSON = Bytes.toBytes("person");
    private static final byte[] NICKNAMES = Bytes.toBytes("nicknames");

    /**
     * Runs the job and exits the JVM with its status code.
     *
     * @param args unused
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        System.exit(run());
    }

    /**
     * Configures and runs the HDFS-to-HBase job.
     *
     * @return 0 when the job completes successfully, 1 otherwise
     * @throws Exception if the job cannot be configured or submitted
     */
    public static int run() throws Exception {
        Configuration conf = HBaseConfiguration.create(new Configuration());
        conf.set("hbase.zookeeper.quorum", "192.168.52.140");

        Job job = Job.getInstance(conf, "findFriend");
        job.setJarByClass(HbaseAndMapReduce03.class);

        // No mapper class is set, so the (Text, Text) pairs produced by the input format are
        // passed straight to the reducer.
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(
                "hdfs://192.168.52.140:9000/outbasemapreduce1463486843454"));

        // The reducer's mutations are written to the HBase table.
        TableMapReduceUtil.initTableReducerJob(TABLE_NAME, FindFriendReducer.class, job);
        checkTable(conf);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Creates the target table with the {@code person} family if it does not exist yet.
     *
     * @param conf configuration used to connect to HBase
     * @throws Exception if the connection or the table creation fails
     */
    private static void checkTable(Configuration conf) throws Exception {
        // try-with-resources: the original never closed the Connection or the Admin.
        try (Connection con = ConnectionFactory.createConnection(conf);
                Admin admin = con.getAdmin()) {
            TableName tn = TableName.valueOf(TABLE_NAME);
            if (!admin.tableExists(tn)) {
                HTableDescriptor htd = new HTableDescriptor(tn);
                htd.addFamily(new HColumnDescriptor("person"));
                admin.createTable(htd);
                System.out.println("表不存在,新创建表成功....");
            }
        }
    }

    /** Turns each (tag, nicknames) group into one {@link Put} on the target table. */
    public static class FindFriendReducer extends
            TableReducer<Text, Text, ImmutableBytesWritable> {
        /**
         * @param key     the tag; becomes the row key
         * @param values  the nickname string(s) read for that tag
         * @param context receives the resulting {@code Put}
         */
        @Override
        protected void reduce(
                Text key,
                Iterable<Text> values,
                Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
            // Text.getBytes() exposes the backing array, which is only valid up to
            // getLength(); going through toString() yields exactly the key's bytes.
            byte[] row = Bytes.toBytes(key.toString());
            if (row.length == 0) {
                // A Put cannot be built for an empty row key (e.g. an input line that
                // starts with the separator); skip it rather than fail the whole task.
                return;
            }

            // Keep every value for the key, not just the first one.
            StringBuilder nicknames = new StringBuilder();
            for (Text value : values) {
                if (nicknames.length() > 0) {
                    nicknames.append(',');
                }
                nicknames.append(value.toString());
            }

            Put put = new Put(row);
            put.addColumn(PERSON, NICKNAMES, Bytes.toBytes(nicknames.toString()));
            context.write(new ImmutableBytesWritable(row), put);
        }
    }
}


4.从hbase表中读取数据,插入到另一个表中

/**
 * Reads the HBase table {@code blog}, groups author nicknames by lower-cased article tag, and
 * writes one row per tag into the HBase table {@code friend02} (column {@code person:nickname}
 * holds the comma-separated nicknames).
 */
public class HbaseAndMapReduce04 {
    private static final String TARGET_TABLE = "friend02";
    private static final byte[] ARTICLE = Bytes.toBytes("article");
    private static final byte[] AUTHOR = Bytes.toBytes("author");
    private static final byte[] PERSON = Bytes.toBytes("person");
    private static final byte[] TAGS = Bytes.toBytes("tags");
    private static final byte[] NICKNAME = Bytes.toBytes("nickname");

    /**
     * Runs the job and exits the JVM with its status code.
     *
     * @param args unused
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        System.exit(run());
    }

    /**
     * Configures and runs the table-to-table job.
     *
     * @return 0 when the job completes successfully, 1 otherwise
     * @throws Exception if the job cannot be configured or submitted
     */
    public static int run() throws Exception {
        Configuration conf = HBaseConfiguration.create(new Configuration());
        conf.set("hbase.zookeeper.quorum", "192.168.52.140");

        Job job = Job.getInstance(conf, "findFriend");
        job.setJarByClass(HbaseAndMapReduce04.class);

        // Restrict the scan to the two columns the job actually needs.
        Scan scan = new Scan();
        scan.addColumn(ARTICLE, TAGS);
        scan.addColumn(AUTHOR, NICKNAME);

        TableMapReduceUtil.initTableMapperJob("blog", scan, FindFriendMapper.class,
                ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
        TableMapReduceUtil.initTableReducerJob(TARGET_TABLE, FindFriendReducer.class, job);

        checkTable(conf);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Creates the target table with the {@code person} family if it does not exist yet.
     *
     * @param conf configuration used to connect to HBase
     * @throws Exception if the connection or the table creation fails
     */
    private static void checkTable(Configuration conf) throws Exception {
        // try-with-resources: the original never closed the Connection or the Admin.
        try (Connection con = ConnectionFactory.createConnection(conf);
                Admin admin = con.getAdmin()) {
            TableName tn = TableName.valueOf(TARGET_TABLE);
            if (!admin.tableExists(tn)) {
                HTableDescriptor htd = new HTableDescriptor(tn);
                htd.addFamily(new HColumnDescriptor("person"));
                admin.createTable(htd);
                System.out.println("表不存在,新创建表成功....");
            }
        }
    }

    /** Emits one (lower-cased tag, nickname) pair for every tag of a row. */
    public static class FindFriendMapper
            extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
        /**
         * @param key     the row key of the current HBase row
         * @param value   the scanned cells of that row
         * @param context receives one (tag, nickname) pair per tag
         */
        @Override
        protected void map(
                ImmutableBytesWritable key,
                Result value,
                Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, ImmutableBytesWritable>.Context context)
                throws IOException, InterruptedException {
            byte[] tagBytes = value.getValue(ARTICLE, TAGS);
            byte[] nicknameBytes = value.getValue(AUTHOR, NICKNAME);

            // The scan returns rows that have either column, so one may be missing.
            // Skip such rows instead of dereferencing null as the original loop did.
            if (tagBytes == null || nicknameBytes == null) {
                return;
            }

            ImmutableBytesWritable nickname = new ImmutableBytesWritable(nicknameBytes);
            for (String tag : Bytes.toString(tagBytes).split(",")) {
                if (tag.isEmpty()) {
                    // An empty tag would become an empty row key, which Put rejects.
                    continue;
                }
                // Locale.ROOT keeps the grouping key independent of the task JVM's locale.
                byte[] tagKey = Bytes.toBytes(tag.toLowerCase(java.util.Locale.ROOT));
                context.write(new ImmutableBytesWritable(tagKey), nickname);
            }
        }
    }

    /** Stores all nicknames that share a tag as one row of the target table. */
    public static class FindFriendReducer extends
            TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {
        /**
         * @param key     a lower-cased tag; becomes the row key
         * @param values  the nicknames of all authors that used the tag
         * @param context receives the resulting {@code Put}
         */
        @Override
        protected void reduce(
                ImmutableBytesWritable key,
                Iterable<ImmutableBytesWritable> values,
                Reducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
            StringBuilder nicknames = new StringBuilder();
            for (ImmutableBytesWritable value : values) {
                if (nicknames.length() > 0) {
                    nicknames.append(',');
                }
                // Honour offset/length: the writable may wrap a slice of a larger array.
                nicknames.append(
                        Bytes.toString(value.get(), value.getOffset(), value.getLength()));
            }

            // copyBytes() returns exactly the key's bytes regardless of offset/length.
            Put put = new Put(key.copyBytes());
            put.addColumn(PERSON, NICKNAME, Bytes.toBytes(nicknames.toString()));
            context.write(key, put);
        }
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: