HBase新版本与MapReduce集成
2016-05-17 21:54
344 查看
1.MapReduce从hbase读取数据
2.从hbase读取数据,经过处理后输出到hdfs中
3.从hdfs中读取数据,插入到hbase表中
4.从hbase表中读取数据,插入到另一个表中
//读取hbase表数据 public class HbaseAndMapReduce { public static void main(String[] args) throws Exception { // 测试数据 // testData(); // 完成的作业是: 有共同爱好的人 System.exit(run()); /* * TableMapper //对输出hbase数据来格式分割的处理类 TableReducer //对输入hbase数据来统计处理的处理类 * TableOutputFormat //来自hbase的格式 TableInputFormat //输入hbase的格式 */ } public static int run() throws Exception { Configuration conf = new Configuration(); conf = HBaseConfiguration.create(conf); conf.set("hbase.zookeeper.quorum", "192.168.52.140"); Job job = Job.getInstance(conf, "findFriend"); job.setJarByClass(HbaseAndMapReduce.class); Scan scan = new Scan(); // 取对业务有用的数据 tags, nickname scan.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags")); scan.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname")); // 数据来源 hbase // TableInputFormat.addColumns(scan, columns); // ImmutableBytesWritable来自hbase数据的类型 TableMapReduceUtil.initTableMapperJob("blog", scan, FindFriendMapper.class, ImmutableBytesWritable.class, Result.class, job); FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.52.140:9000/outhbase" + new Date().getTime())); return job.waitForCompletion(true) ? 
0 : 1; } public static class FindFriendMapper extends TableMapper<ImmutableBytesWritable, Result> { @Override // key是hbase中的行键 // value是hbase中的所行键的所有数据 protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Result>.Context context) throws IOException, InterruptedException { System.out.println("key :: " + Bytes.toString(key.get())); List<Cell> cs = value.listCells(); System.out.print("value :: "); for (Cell cell : cs) { String rowKey = Bytes.toString(CellUtil.cloneRow(cell)); long timestamp = cell.getTimestamp(); String family = Bytes.toString(CellUtil.cloneFamily(cell)); String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell)); String val = Bytes.toString(CellUtil.cloneValue(cell)); System.out.println("RowKey=" + rowKey + ", Timestamp=" + timestamp + ", Family=" + family + ", Qualifier=" + qualifier + ", Val=" + val); } super.map(key, value, context); } } public static void testData() { try { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "192.168.52.140"); Connection con = ConnectionFactory.createConnection(conf); Admin admin = con.getAdmin(); TableName tn = TableName.valueOf("blog"); if (admin.tableExists(tn)) { admin.disableTable(tn); admin.deleteTable(tn); } HTableDescriptor htd = new HTableDescriptor(tn); HColumnDescriptor hcd01 = new HColumnDescriptor("article"); htd.addFamily(hcd01); HColumnDescriptor hcd02 = new HColumnDescriptor("author"); htd.addFamily(hcd02); admin.createTable(htd); Table t = con.getTable(tn); Put put = new Put(Bytes.toBytes("1")); put.addColumn(Bytes.toBytes("article"), Bytes.toBytes("content"), Bytes.toBytes("HBase is the Hadoop database. 
Use it when you need random, " + "realtime read/write access to your Big Data")); put.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("HBase,NoSql,Hadoop")); put.addColumn(Bytes.toBytes("article"), Bytes.toBytes("title"), Bytes.toBytes("Head First Hbase")); put.addColumn(Bytes.toBytes("author"), Bytes.toBytes("name"), Bytes.toBytes("zhangsan")); put.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("sansan")); Put put02 = new Put(Bytes.toBytes("10")); put02.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("Hadoop")); put02.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("xiaoshi")); Put put03 = new Put(Bytes.toBytes("100")); put03.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("hbase,nosql")); put03.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("superman")); List<Put> puts = Arrays.asList(put, put02, put03); t.put(puts); System.out.println("==========> 测试数据准备完成..."); if (admin != null) { admin.close(); } if (con != null) { con.close(); } } catch (IOException e) { e.printStackTrace(); } } }
2.从hbase读取数据,经过处理后输出到hdfs中
//从hbase表里面读取数据 //经过处理输到hdfs上 public class HbaseAndMapReduce02 { public static void main(String[] args) throws Exception { //完成的作业是: 有共同爱好的人 System.exit(run()); /* * TableMapper //对输出hbase数据来格式分割的处理类 * TableReducer //对输入hbase数据来统计处理的处理类 * TableOutputFormat //来自hbase的格式 * TableInputFormat //输入hbase的格式 */ } public static int run() throws Exception { Configuration conf = new Configuration(); conf = HBaseConfiguration.create(conf); conf.set("hbase.zookeeper.quorum", "192.168.52.140"); Job job = Job.getInstance(conf, "findFriend"); job.setJarByClass(HbaseAndMapReduce02.class); Scan scan = new Scan(); //取对业务有用的数据 tags, nickname scan.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags")); scan.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname")); //数据来源 hbase //TableInputFormat.addColumns(scan, columns); //ImmutableBytesWritable来自hbase数据的类型 TableMapReduceUtil.initTableMapperJob("blog", scan, FindFriendMapper.class, Text.class, Text.class, job); FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.52.140:9000/outbasemapreduce" + new Date().getTime())); job.setReducerClass(FindFriendReducer.class); return job.waitForCompletion(true) ? 
0 : 1; } public static class FindFriendMapper extends TableMapper<Text, Text>{ @Override //key是hbase中的行键 //value是hbase中的所行键的所有数据 protected void map( ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result,Text, Text>.Context context) throws IOException, InterruptedException { Text v = null; String[] kStrs = null; List<Cell> cs = value.listCells(); for (Cell cell : cs) { if ("tags".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){ System.out.println(Bytes.toString(CellUtil.cloneValue(cell)).split(",")); kStrs = Bytes.toString(CellUtil.cloneValue(cell)).split(","); } else if ("nickname".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){ v = new Text(CellUtil.cloneValue(cell)); System.out.println(v.toString()); } } for (String kStr : kStrs) { context.write(new Text(kStr.toLowerCase()), v); } } } public static class FindFriendReducer extends Reducer<Text, Text, Text, Text>{ @Override protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { StringBuilder sb = new StringBuilder(); for (Text text : values) { System.out.println(text.toString()); sb.append((sb.length() > 0 ? ",":"") + text.toString()); } context.write(key, new Text(sb.toString())); } } }
3.从hdfs中读取数据,插入到hbase表中
//从hsfs文件中读取数据插入hbase中 public class HbaseAndMapReduce03 { public static void main(String[] args) throws Exception { System.exit(run()); } public static int run() throws Exception { Configuration conf = new Configuration(); conf = HBaseConfiguration.create(conf); conf.set("hbase.zookeeper.quorum", "192.168.52.140"); Job job = Job.getInstance(conf, "findFriend"); job.setJarByClass(HbaseAndMapReduce03.class); job.setInputFormatClass(KeyValueTextInputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path( "hdfs://192.168.52.140:9000/outbasemapreduce1463486843454")); // 把数据写入Hbase数据库 TableMapReduceUtil.initTableReducerJob("friend", FindFriendReducer.class, job); checkTable(conf); return job.waitForCompletion(true) ? 0 : 1; } private static void checkTable(Configuration conf) throws Exception { Connection con = ConnectionFactory.createConnection(conf); Admin admin = con.getAdmin(); TableName tn = TableName.valueOf("friend"); if (!admin.tableExists(tn)){ HTableDescriptor htd = new HTableDescriptor(tn); HColumnDescriptor hcd = new HColumnDescriptor("person"); htd.addFamily(hcd); admin.createTable(htd); System.out.println("表不存在,新创建表成功...."); } } public static class FindFriendReducer extends TableReducer<Text, Text, ImmutableBytesWritable> { @Override protected void reduce( Text key, Iterable<Text> values, Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context) throws IOException, InterruptedException { Put put = new Put(key.getBytes()); put.addColumn(Bytes.toBytes("person"), Bytes.toBytes("nicknames"), values.iterator().next().getBytes()); context.write(new ImmutableBytesWritable(key.getBytes()), put); } } }
4.从hbase表中读取数据,插入到另一个表中
//从一个表读取数据插入到另一个表中 public class HbaseAndMapReduce04 { public static void main(String[] args) throws Exception { System.exit(run()); } public static int run() throws Exception { Configuration conf = new Configuration(); conf = HBaseConfiguration.create(conf); conf.set("hbase.zookeeper.quorum", "192.168.52.140"); Job job = Job.getInstance(conf, "findFriend"); job.setJarByClass(HbaseAndMapReduce04.class); Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags")); scan.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname")); TableMapReduceUtil.initTableMapperJob("blog", scan, FindFriendMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class,job); TableMapReduceUtil.initTableReducerJob("friend02", FindFriendReducer.class, job); checkTable(conf); return job.waitForCompletion(true) ? 0 : 1; } private static void checkTable(Configuration conf) throws Exception { Connection con = ConnectionFactory.createConnection(conf); Admin admin = con.getAdmin(); TableName tn = TableName.valueOf("friend02"); if (!admin.tableExists(tn)){ HTableDescriptor htd = new HTableDescriptor(tn); HColumnDescriptor hcd = new HColumnDescriptor("person"); htd.addFamily(hcd); admin.createTable(htd); System.out.println("表不存在,新创建表成功...."); } } public static class FindFriendMapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable>{ @Override //key是hbase中的行键 //value是hbase中的所行键的所有数据 protected void map( ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result,ImmutableBytesWritable, ImmutableBytesWritable>.Context context) throws IOException, InterruptedException { ImmutableBytesWritable v = null; String[] kStrs = null; List<Cell> cs = value.listCells(); for (Cell cell : cs) { if ("tags".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){ kStrs = Bytes.toString(CellUtil.cloneValue(cell)).split(","); } else if ("nickname".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){ v = new 
ImmutableBytesWritable(CellUtil.cloneValue(cell)); } } for (String kStr : kStrs) { context.write(new ImmutableBytesWritable(Bytes.toBytes(kStr.toLowerCase())), v); } } } public static class FindFriendReducer extends TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> { @Override protected void reduce( ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values, Reducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, Mutation>.Context context) throws IOException, InterruptedException { Put put = new Put(key.get()); StringBuilder vStr = new StringBuilder(); for (ImmutableBytesWritable value : values) { vStr.append((vStr.length() > 0 ? ",":"") + Bytes.toString(value.get())); } put.addColumn(Bytes.toBytes("person"), Bytes.toBytes("nickname"),Bytes.toBytes(vStr.toString())); context.write(key, put); } } }