MapReduce结合hbase探查二度人脉
2015-11-11 10:43
225 查看
二度人脉,通俗一点讲,好友的好友,即,查好友表,把好友找出来,再查好友表,把好友的好友找出来。
小狼是先在mysql中创表,再讲mysql中的表导入到hbase集群,这是friend表的大概表结构
这是betweens表的表结构,hbase中我把所有的数据都写在列族下,并没有取列名
我的job服务是这样的
下面是map类
下面是reducer类
mapreduce中是将hbase中以行健为分割符一行一行的运行map方法,然后map都运行结束后,完成reducer方法的运行,reduer是以map方法中context.write(key.vvalue);中key为分隔符一行一行的运行
小狼是先在mysql中创表,再讲mysql中的表导入到hbase集群,这是friend表的大概表结构
这是betweens表的表结构,hbase中我把所有的数据都写在列族下,并没有取列名
我的job服务是这样的
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; public class Test { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { //配置信息 Configuration conf=HBaseConfiguration.create(); conf.set("hbase.zookeeper.property.clientPort", "2181"); conf.set("hbase.zookeeper.quorum", "slave1,slave2"); conf.set("hbase.master", "master:600000"); //打开一个job Job job = new Job(conf, "HBase_FindFriend"); job.setJarByClass(Test.class); //扫描 friend表fid列族 uid列族 Scan scan = new Scan(); scan.addFamily(Bytes.toBytes("fid")); scan.addFamily(Bytes.toBytes("uid")); //注册信息 map TableMapReduceUtil.initTableMapperJob("friend", scan, TestMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); //注册信息reducer TableMapReduceUtil.initTableReducerJob("betweens", TestReducer.class, job); //跳出 System.exit(job.waitForCompletion(true) ? 0 : 1); } }
下面是map类
import java.io.IOException; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; /** * @author 小狼 * 继承 * TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> * 完成 对hbase的读操作 */ public class TestMapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable>{ //常量 作为key的值传入map public static ImmutableBytesWritable writable=new ImmutableBytesWritable(Bytes.toBytes("1")); /** * ImmutableBytesWritable key 行健的值 * Result values 对应行健的列的值 * org.apache.hadoop.mapreduce.Mapper.Context context 上下文对象 神奇话的map */ @Override protected void map(ImmutableBytesWritable key, Result values, org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException { // String fid=""; String uid=""; //遍历一个行健上对应的所有列的值 for (KeyValue kv : values.list()) { //kv.getFamily()获取列族 if ("fid".equals(Bytes.toString(kv.getFamily()))) { //kv.getValue()获取值 fid=Bytes.toString(kv.getValue()); } if("uid".equals(Bytes.toString(kv.getFamily()))){ uid=Bytes.toString(kv.getValue()); } } if(!"".equalsIgnoreCase(fid)&&!"".equalsIgnoreCase(uid)){ //fid uid不为空则把这两个变成一个字符串,中间加一个符号隔离, //并将其做为值,传入map集合 String value=uid+">"+fid; ImmutableBytesWritable immutableBytesWritableValue=new ImmutableBytesWritable(Bytes.toBytes(value.toLowerCase())); //一个神奇话的map集合 //reducer获取的就是一个为1的行健和整个friend表的uid fid值 context.write(writable, immutableBytesWritableValue); } } }
下面是reducer类
import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; /** * * @author 小狼 * TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> * 继承这个类,完成reducer操作 */ public class TestReducer extends TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable>{ @Override protected void reduce(ImmutableBytesWritable key,Iterable<ImmutableBytesWritable> values,Context context) throws IOException, InterruptedException { //uid fid是一对一的 List<String> uids=new ArrayList<String>(); List<String> fids=new ArrayList<String>(); //去重 Map<String, String> maps=new HashMap<String, String>(); //获取hbase friend表里面所有的uid fid 并塞入map集合key中,去重 for(ImmutableBytesWritable value:values){ String val=Bytes.toString(value.get()); maps.put(val, ""); } //获得的val格式是'1>2' for(String val:maps.keySet()){ String[] valStr=val.split(">"); uids.add(valStr[0]); fids.add(valStr[1]); } //计算 //行健 int g=1; //遍历获取uidCo fidCo for(int i =0;i<uids.size();i++){ String uidCo=uids.get(i); String fidCo=fids.get(i); //根据fidCo比较uidCoCo若一样,则对应的fidCoCo就是uidCo的二度人脉 for(int y=0;y<uids.size();y++){ String uidCoCo=uids.get(y); if(uidCoCo.equalsIgnoreCase(fidCo)&&(y!=i)){ String fidCoCo=fids.get(y); //将计算的值放入put集合 Put put=new Put(Bytes.toBytes((g+"").toLowerCase())); put.add(Bytes.toBytes("bid"),Bytes.toBytes(""),Bytes.toBytes((g+"").toLowerCase())); put.add(Bytes.toBytes("uid"),Bytes.toBytes(""),Bytes.toBytes(uidCo)); put.add(Bytes.toBytes("fid"),Bytes.toBytes(""),Bytes.toBytes(fidCo)); put.add(Bytes.toBytes("efid"),Bytes.toBytes(""),Bytes.toBytes(fidCoCo)); //塞入hbase betweens表中 context.write(new ImmutableBytesWritable(Bytes.toBytes((g+"").toLowerCase())), put); g++; } } } } }
mapreduce中是将hbase中以行健为分割符一行一行的运行map方法,然后map都运行结束后,完成reducer方法的运行,reduer是以map方法中context.write(key.vvalue);中key为分隔符一行一行的运行
相关文章推荐
- 个人工作中ssd、audio python脚本总结
- javascript 常见问题
- iis安装与FTP安装与配置与php安装apache
- git的基本用法
- 从Hello World 程序开始
- redis配置文件详解
- IBInspectable / IBDesignable
- Android快速SDK(12)文本pdf阅读器PdfViewer【傻瓜模式】
- js/uploadifive html图片批量上传插件
- 欢迎使用CSDN-markdown编辑器
- frames的对象兼容性获取以及跨域实现数据交换(js文件的加载判断)
- 动态请求页面生成静态页
- LoadRunner 技巧之THML 与 URL两种录制模式分析
- jvm实例,tomcat容器,spring容器,在内存中的关系
- 神、上帝以及老天爷
- linux内核编译步骤
- 第一章:Dubbo整合maven+spring+springmvc+mybatis之环境准备
- 阿牛的EOF牛肉串
- 数据挖掘算法基础-关联规则
- muduo库阅读(23)——Net部分:应用层缓冲区类