
Hadoop demo: finding common friends

2017-06-21 16:59

Requirement

The following is QQ friend-list data. Before the colon is a user; after the colon are all of that user's friends (the friend relationships in the data are one-directional).

A:B,C,D,F,E,O

B:A,C,E,K

C:F,A,D,I

D:A,E,F,L

E:B,C,D,M,L

F:A,B,C,D,E,O,M

G:A,C,D,E,F

H:A,C,D,E,O

I:A,O

J:B,O

K:A,C,D

L:D,E,F

M:E,F,G

O:A,H,I,J


Find every pair of users who have common friends, and list who those common friends are.

Step 1

Map

Read a line: A:B,C,D,F,E,O

Output: <B,A><C,A><D,A><F,A><E,A><O,A>

Read another line: B:A,C,E,K

Output: <A,B><C,B><E,B><K,B>

Reduce

The reducer receives data such as <C,A><C,B><C,E><C,F><C,G>......

Output:

<A-B,C>

<A-E,C>

<A-F,C>

<A-G,C>

<B-E,C>

<B-F,C>.....

Step 2

Map

Read a line: <A-B,C>

Output it unchanged: <A-B,C>

Reduce

Read data: <A-B,C><A-B,F><A-B,G>.......

Output: A-B  C,F,G,.....

Note that the code below splits the work slightly differently from this description: step 1's reducer only concatenates the users who share a friend (it does not emit pairs), and step 2's mapper sorts that list and emits the <user-user,friend> pairs; step 2's reducer then concatenates the common friends. The end result is the same.
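
Sorting the users before pairing is the subtle part: it guarantees each pair is keyed in one canonical order (A-B, never B-A), so both directions land in the same reduce group. Here is a minimal standalone sketch of the pair enumeration (plain Java, class name hypothetical):

import java.util.Arrays;

public class PairSketch {
    public static void main(String[] args) {
        String friend = "C";
        String[] users = "B,A,E,G,F".split(",");
        Arrays.sort(users);                       // A,B,E,F,G
        // Emit every two-element combination of the sorted users.
        for (int i = 0; i < users.length - 1; i++) {
            for (int j = i + 1; j < users.length; j++) {
                // Prints <A-B,C>, <A-E,C>, ..., <F-G,C>
                System.out.println("<" + users[i] + "-" + users[j] + "," + friend + ">");
            }
        }
    }
}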

Step 1 code

package com.asin.hdp.commfriend;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CommFriendDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(CommFriendDemo.class);

        job.setMapperClass(CommFriendMapper.class);
        job.setReducerClass(CommFriendReduce.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Local paths for testing; on a cluster these would be HDFS paths.
        FileInputFormat.addInputPath(job, new Path("F:/friend.txt"));
        FileOutputFormat.setOutputPath(job, new Path("F:/outputFriend1"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

// Invert the relation: for the line "A:B,C,D,..." emit <B,A> <C,A> <D,A> ...,
// i.e. key = a friend, value = one user who has that friend.
class CommFriendMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] split = line.split(":");
        String user = split[0];
        String[] friends = split[1].split(",");
        for (String friend : friends) {
            context.write(new Text(friend), new Text(user));
        }
    }
}

// For each friend, concatenate every user who has that friend:
// <C, [A, B, E, ...]> becomes the line "C \t A,B,E,...,".
class CommFriendReduce extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder users = new StringBuilder();
        for (Text text : values) {
            users.append(text).append(","); // trailing comma is harmless: split(",") drops it later
        }
        context.write(key, new Text(users.toString()));
    }
}
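
For reference: TextOutputFormat, the default output format used above, writes each record as the key, a tab character, then the value, which is exactly what step 2's mapper splits on. A minimal plain-Java sketch of how one such line parses (class name hypothetical, sample line taken from the results below):

import java.util.Arrays;

public class ParseStepOneLine {
    public static void main(String[] args) {
        String line = "C\tA,E,B,H,F,G,K,";            // one line of part-r-00000
        String[] split = line.split("\t");
        System.out.println("friend = " + split[0]);   // C
        // split(",") discards the trailing empty string left by the trailing comma
        System.out.println("users  = " + Arrays.toString(split[1].split(",")));
    }
}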

Partial results (key = a friend, value = all users who have that friend):

A I,K,C,B,G,F,H,O,D,
B A,F,J,E,
C A,E,B,H,F,G,K,
D G,C,K,A,L,F,E,H,
E G,M,L,H,A,F,B,D,
F L,M,D,C,G,A,

Step 2 code

package com.asin.hdp.commfriend;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CommFriendDemo2 {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(CommFriendDemo2.class);

        job.setMapperClass(CommFriendMapperS.class);
        job.setReducerClass(CommFriendReduceS.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Read the file produced by step 1.
        FileInputFormat.addInputPath(job, new Path("F:/outputFriend1/part-r-00000"));
        FileOutputFormat.setOutputPath(job, new Path("F:/outputFriend2"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

// Read one line of step 1's output ("friend \t user1,user2,..."), sort the
// users so every pair is keyed in canonical order, then emit each two-user
// combination as <user1-user2, friend>.
class CommFriendMapperS extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] split = line.split("\t"); // TextOutputFormat separates key and value with a tab
        String friend = split[0];
        String[] userArr = split[1].split(",");
        Arrays.sort(userArr); // guarantees A-B is emitted, never B-A
        // Enumerate all pairs. The bounds must be length-1 and length;
        // anything tighter silently drops every pair containing the last
        // user in the sorted list.
        for (int i = 0; i < userArr.length - 1; i++) {
            for (int j = i + 1; j < userArr.length; j++) {
                String user_user = userArr[i] + "-" + userArr[j];
                context.write(new Text(user_user), new Text(friend));
            }
        }
    }
}

// Concatenate all common friends of one pair:
// <A-B, [C, E, ...]> becomes the line "A-B \t C,E,...,".
class CommFriendReduceS extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder friends = new StringBuilder();
        for (Text text : values) {
            friends.append(text).append(",");
        }
        context.write(key, new Text(friends.toString()));
    }
}

Partial results (key = a user pair, value = their common friends):

A-B C,E,
A-C F,D,
A-D E,F,
A-E B,C,D,
A-F C,D,B,E,O,
A-G D,E,F,C,
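
The two jobs above are run one after the other by hand. They can also be chained in a single driver that submits step 2 only after step 1 succeeds; a minimal sketch reusing the classes above (driver class name hypothetical):

package com.asin.hdp.commfriend;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CommFriendChainedDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Step 1: invert user->friends into friend->users.
        Job job1 = Job.getInstance(conf, "invert friend lists");
        job1.setJarByClass(CommFriendChainedDriver.class);
        job1.setMapperClass(CommFriendMapper.class);
        job1.setReducerClass(CommFriendReduce.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job1, new Path("F:/friend.txt"));
        FileOutputFormat.setOutputPath(job1, new Path("F:/outputFriend1"));
        if (!job1.waitForCompletion(true)) {
            System.exit(1); // abort if step 1 failed
        }

        // Step 2: emit common friends per user pair, reading step 1's output.
        Job job2 = Job.getInstance(conf, "common friends per pair");
        job2.setJarByClass(CommFriendChainedDriver.class);
        job2.setMapperClass(CommFriendMapperS.class);
        job2.setReducerClass(CommFriendReduceS.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        // Pointing at the directory also works: FileInputFormat skips files
        // whose names start with "_" or ".", such as _SUCCESS.
        FileInputFormat.addInputPath(job2, new Path("F:/outputFriend1"));
        FileOutputFormat.setOutputPath(job2, new Path("F:/outputFriend2"));

        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}
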
Tags: Hadoop