
Exploring Second-Degree Connections with MapReduce and HBase

2015-11-11 10:43
A second-degree connection is, put simply, a friend of a friend: query the friend table to find a user's friends, then query the friend table again to find those friends' friends.
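
As a purely illustrative sketch (the adjacency map and sample IDs below are made up, not the actual friend table), the two lookups can be pictured like this in plain Java:

import java.util.*;

public class TwoHopDemo {
    public static void main(String[] args) {
        // Made-up friend list: uid -> that user's direct (first-degree) friends
        Map<String, List<String>> friends = new HashMap<String, List<String>>();
        friends.put("1", Arrays.asList("2", "3"));
        friends.put("2", Arrays.asList("4"));
        friends.put("3", Arrays.asList("5"));

        String uid = "1";
        Set<String> secondDegree = new HashSet<String>();
        // First lookup: the user's friends
        for (String fid : friends.get(uid)) {
            // Second lookup: each friend's friends
            List<String> theirFriends = friends.get(fid);
            if (theirFriends == null) {
                continue;
            }
            for (String efid : theirFriends) {
                if (!efid.equals(uid)) {
                    secondDegree.add(efid);
                }
            }
        }
        // Prints: second-degree friends of 1: [4, 5] (order may vary)
        System.out.println("second-degree friends of " + uid + ": " + secondDegree);
    }
}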

小狼 first created the tables in MySQL and then imported them into the HBase cluster. This is the rough structure of the friend table:



This is the structure of the betweens table. In HBase I store all the data directly under the column families, without using column qualifiers:


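The screenshots of the two table layouts are not reproduced here, but from the job code below the friend table is read through the column families fid and uid, and the betweens table is written through the column families bid, uid, fid and efid. As a rough sketch only (the original MySQL schema and the import step are not shown in this post), tables with those column families could be created with the old-style HBaseAdmin API used elsewhere in the code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateTables {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);

        // friend table: one row per friend relation, values stored directly under the families
        HTableDescriptor friend = new HTableDescriptor(TableName.valueOf("friend"));
        friend.addFamily(new HColumnDescriptor("uid"));
        friend.addFamily(new HColumnDescriptor("fid"));
        admin.createTable(friend);

        // betweens table: output of the job, one row per discovered second-degree relation
        HTableDescriptor betweens = new HTableDescriptor(TableName.valueOf("betweens"));
        betweens.addFamily(new HColumnDescriptor("bid"));
        betweens.addFamily(new HColumnDescriptor("uid"));
        betweens.addFamily(new HColumnDescriptor("fid"));
        betweens.addFamily(new HColumnDescriptor("efid"));
        admin.createTable(betweens);

        admin.close();
    }
}
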

My job driver looks like this:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

public class Test {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Cluster configuration
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "slave1,slave2");
        conf.set("hbase.master", "master:60000");
        // Create the job
        Job job = new Job(conf, "HBase_FindFriend");
        job.setJarByClass(Test.class);
        // Scan the fid and uid column families of the friend table
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("fid"));
        scan.addFamily(Bytes.toBytes("uid"));

        // Register the mapper: read from the friend table
        TableMapReduceUtil.initTableMapperJob("friend", scan,
                TestMapper.class, ImmutableBytesWritable.class,
                ImmutableBytesWritable.class, job);
        // Register the reducer: write to the betweens table
        TableMapReduceUtil.initTableReducerJob("betweens",
                TestReducer.class, job);
        // Submit the job and exit with its status
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}


Here is the mapper class:

import java.io.IOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * @author 小狼
 * Extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable>
 * to read from HBase.
 */
public class TestMapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
    // Constant used as the map output key, so every record lands in one reducer group
    public static ImmutableBytesWritable writable = new ImmutableBytesWritable(Bytes.toBytes("1"));

    /**
     * ImmutableBytesWritable key: the row key
     * Result values: all column values for that row key
     * Context context: the context object used to emit the map output
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result values, Context context)
            throws IOException, InterruptedException {

        String fid = "";
        String uid = "";
        // Iterate over all cells belonging to this row key
        for (KeyValue kv : values.list()) {
            // kv.getFamily() returns the column family
            if ("fid".equals(Bytes.toString(kv.getFamily()))) {
                // kv.getValue() returns the cell value
                fid = Bytes.toString(kv.getValue());
            }
            if ("uid".equals(Bytes.toString(kv.getFamily()))) {
                uid = Bytes.toString(kv.getValue());
            }
        }

        if (!"".equalsIgnoreCase(fid) && !"".equalsIgnoreCase(uid)) {
            // If neither fid nor uid is empty, join them into one string with a
            // separator character and emit it as the map output value
            String value = uid + ">" + fid;
            ImmutableBytesWritable immutableBytesWritableValue =
                    new ImmutableBytesWritable(Bytes.toBytes(value.toLowerCase()));

            // Because the key is always "1", the reducer receives a single group
            // containing every uid>fid pair from the whole friend table
            context.write(writable, immutableBytesWritableValue);
        }
    }

}


Here is the reducer class:

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * @author 小狼
 * Extends TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable>
 * to write the results back into HBase.
 */
public class TestReducer extends TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values, Context context)
            throws IOException, InterruptedException {
        // uid and fid are kept in parallel lists, one entry per friend relation
        List<String> uids = new ArrayList<String>();
        List<String> fids = new ArrayList<String>();
        // Used to deduplicate the incoming uid>fid pairs
        Map<String, String> maps = new HashMap<String, String>();
        // Collect every uid>fid value from the friend table into the map keys, removing duplicates
        for (ImmutableBytesWritable value : values) {
            String val = Bytes.toString(value.get());
            maps.put(val, "");
        }
        // Each val has the form '1>2'
        for (String val : maps.keySet()) {
            String[] valStr = val.split(">");
            uids.add(valStr[0]);
            fids.add(valStr[1]);
        }
        // Compute the second-degree connections
        // g serves as the row key of each output row
        int g = 1;
        // For each direct relation uidCo -> fidCo ...
        for (int i = 0; i < uids.size(); i++) {
            String uidCo = uids.get(i);
            String fidCo = fids.get(i);
            // ... find every relation whose uid equals fidCo; its fid (fidCoCo)
            // is then a second-degree connection of uidCo
            for (int y = 0; y < uids.size(); y++) {
                String uidCoCo = uids.get(y);

                if (uidCoCo.equalsIgnoreCase(fidCo) && (y != i)) {
                    String fidCoCo = fids.get(y);

                    // Assemble the result into a Put
                    Put put = new Put(Bytes.toBytes(String.valueOf(g)));
                    put.add(Bytes.toBytes("bid"), Bytes.toBytes(""), Bytes.toBytes(String.valueOf(g)));
                    put.add(Bytes.toBytes("uid"), Bytes.toBytes(""), Bytes.toBytes(uidCo));
                    put.add(Bytes.toBytes("fid"), Bytes.toBytes(""), Bytes.toBytes(fidCo));
                    put.add(Bytes.toBytes("efid"), Bytes.toBytes(""), Bytes.toBytes(fidCoCo));
                    // Write the row into the betweens table
                    context.write(new ImmutableBytesWritable(Bytes.toBytes(String.valueOf(g))), put);
                    g++;
                }
            }
        }

    }
}


In this MapReduce job, the map method is invoked once per row of the HBase table, row key by row key. Once all the map calls have finished, the reduce phase runs, and the reducer is invoked once per distinct key emitted by context.write(key, value) in the map method.
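
To make that flow concrete, here is a small dry-run sketch (the sample uid>fid pairs are made up) of what the single reducer group ends up computing: because the mapper always emits the constant key "1", one reduce call receives every uid>fid pair from the friend table and joins the list against itself.

import java.util.*;

public class SecondDegreeDryRun {
    public static void main(String[] args) {
        // Assumed friend-table rows, already in the mapper's uid>fid output form
        List<String> pairs = Arrays.asList("1>2", "2>3", "2>4", "3>1");

        // The reducer joins each pair (uid, fid) with every pair whose uid equals that fid
        for (String a : pairs) {
            String[] first = a.split(">");
            for (String b : pairs) {
                String[] second = b.split(">");
                if (first[1].equals(second[0]) && !a.equals(b)) {
                    // uid, fid (direct friend), efid (second-degree friend)
                    System.out.println("uid=" + first[0] + " fid=" + first[1] + " efid=" + second[1]);
                }
            }
        }
        // Expected output:
        // uid=1 fid=2 efid=3
        // uid=1 fid=2 efid=4
        // uid=2 fid=3 efid=1
        // uid=3 fid=1 efid=2
    }
}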