您的位置:首页 > 大数据 > Hadoop

Hadoop案例(三)找博客共同好友

2018-06-01 23:42 323 查看

找博客共同好友案例

1)数据准备

以下是博客的好友列表数据,冒号前是一个用户,冒号后是该用户的所有好友(数据中的好友关系是单向的),输入行的格式形如 A:B,C,D,F,E,O。注意:下面列出的形如“A-B E C”的数据是两次MapReduce处理之后的最终输出结果(“人-人”后面是这两个人的共同好友),而不是原始输入数据。

A-B    E C
A-C    D F
A-D    E F
A-E    D B C
A-F    O B C D E
A-G    F E C D
A-H    E C D O
A-I    O
A-J    O B
A-K    D C
A-L    F E D
A-M    E F
B-C    A
B-D    A E
B-E    C
B-F    E A C
B-G    C E A
B-H    A E C
B-I    A
B-K    C A
B-L    E
B-M    E
B-O    A
C-D    A F
C-E    D
C-F    D A
C-G    D F A
C-H    D A
C-I    A
C-K    A D
C-L    D F
C-M    F
C-O    I A
D-E    L
D-F    A E
D-G    E A F
D-H    A E
D-I    A
D-K    A
D-L    E F
D-M    F E
D-O    A
E-F    D M C B
E-G    C D
E-H    C D
E-J    B
E-K    C D
E-L    D
F-G    D C A E
F-H    A D O E C
F-I    O A
F-J    B O
F-K    D C A
F-L    E D
F-M    E
F-O    A
G-H    D C E A
G-I    A
G-K    D A C
G-L    D F E
G-M    E F
G-O    A
H-I    O A
H-J    O
H-K    A C D
H-L    D E
H-M    E
H-O    A
I-J    O
I-K    A
I-O    A
K-L    D
K-O    A
L-M    E F
View Code

3)代码实现

(1)第一次Mapper 

package com.xyg.mapreduce.friends;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * First-pass mapper of the common-friends job.
 *
 * <p>Each input line has the form {@code person:friend,friend,...} (for example
 * {@code A:B,C,D,F,E,O}). The mapper inverts the relation and emits one
 * {@code <friend, person>} pair per listed friend.
 *
 * <p>Lines that carry no person or no friend list (blank lines, {@code "A:"}, lines
 * without a colon) are skipped instead of failing the task with an
 * {@link ArrayIndexOutOfBoundsException}; empty friend tokens are skipped as well.
 */
public class OneShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{

    /**
     * Emits {@code <friend, person>} for every friend listed on the line.
     *
     * @param key     input key supplied by the framework (not used)
     * @param value   one line of text in the form {@code person:friend,friend,...}
     * @param context sink for the emitted pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // 1. Read one line, e.g. "A:B,C,D,F,E,O"
        String line = value.toString();

        // 2. Split the person from the friend list
        String[] fields = line.split(":");

        // String.split drops trailing empty parts, so "A:" and "" yield fewer than two
        // fields; there is nothing to emit for such lines.
        if (fields.length < 2 || fields[0].isEmpty()) {
            return;
        }

        // 3. Extract the person and the friends
        String[] friends = fields[1].split(",");

        // The person is the same for every pair emitted from this line.
        Text person = new Text(fields[0]);

        // 4. Emit <friend, person>
        for (String friend : friends) {
            if (friend.isEmpty()) {
                // e.g. "A:B,,C" -- an empty token is not a real friend
                continue;
            }
            context.write(new Text(friend), person);
        }
    }
}

(2)第一次Reducer 

package com.xyg.mapreduce.friends;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * First-pass reducer of the common-friends job.
 *
 * <p>Receives a friend as the key and the persons emitted for that friend as the values,
 * and writes a single record whose value is those persons joined together, each one
 * followed by a comma (so the value ends with a trailing comma).
 */
public class OneShareFriendsReducer extends Reducer<Text, Text, Text, Text>{

    /**
     * Concatenates all persons received for one friend and writes {@code <friend, persons>}.
     *
     * @param key     the friend
     * @param values  the persons emitted for this friend
     * @param context sink for the output record
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        // 1. Build "person,person,...," -- every entry is followed by a comma
        StringBuilder joinedPersons = new StringBuilder();
        for (Text current : values) {
            joinedPersons.append(current.toString());
            joinedPersons.append(',');
        }

        // 2. Emit the friend together with the joined persons
        Text outputValue = new Text(joinedPersons.toString());
        context.write(key, outputValue);
    }
}

(3)第一次Driver 

package com.xyg.mapreduce.friends;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the first pass of the common-friends job: wires
 * {@code OneShareFriendsMapper} and {@code OneShareFriendsReducer} into a job,
 * runs it and reports the outcome through the process exit status.
 */
public class OneShareFriendsDriver {

    /**
     * Configures and runs the job.
     *
     * <p>Exit status: {@code 0} when the job succeeds, {@code 1} when it fails,
     * {@code 2} when the command-line arguments are missing.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: OneShareFriendsDriver <input path> <output path>");
            System.exit(2);
        }

        // 1. Obtain the job object
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2. Identify the jar by the driver class
        job.setJarByClass(OneShareFriendsDriver.class);

        // 3. Set the mapper and reducer classes
        job.setMapperClass(OneShareFriendsMapper.class);
        job.setReducerClass(OneShareFriendsReducer.class);

        // 4. Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 6. Set the input and output locations of the job
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit and wait for completion
        boolean result = job.waitForCompletion(true);

        // By convention a zero exit status means success; the original had this inverted.
        System.exit(result ? 0 : 1);
    }
}

(4)第二次Mapper 

package com.xyg.mapreduce.friends;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Second-pass mapper of the common-friends job.
 *
 * <p>Each input line has the form {@code friend<TAB>person,person,...,} (for example
 * {@code A<TAB>I,K,C,B,G,F,H,O,D,}). The persons are sorted, and for every pair of
 * persons the mapper emits {@code <personA-personB, friend>}, so that all friends of
 * the same pair arrive at the same reduce call.
 */
public class TwoShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{

    /**
     * Emits {@code <person-person, friend>} for every pair of persons on the line.
     *
     * @param key     input key supplied by the framework (not used)
     * @param value   one line of text in the form {@code friend<TAB>person,person,...}
     * @param context sink for the emitted pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Layout: friend <TAB> person,person,person
        String[] columns = value.toString().split("\t");
        String sharedFriend = columns[0];
        String[] people = columns[1].split(",");

        // Sorting makes the pair key independent of the order the persons were listed in.
        Arrays.sort(people);

        int count = people.length;
        for (int left = 0; left + 1 < count; left++) {
            String pairPrefix = people[left] + "-";
            for (int right = left + 1; right < count; right++) {
                // Emit <person-person, friend>
                Text pairKey = new Text(pairPrefix + people[right]);
                context.write(pairKey, new Text(sharedFriend));
            }
        }
    }
}

(5)第二次Reducer 

package com.xyg.mapreduce.friends;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Second-pass reducer of the common-friends job.
 *
 * <p>Receives a {@code person-person} pair as the key and the friends emitted for that
 * pair as the values, and writes a single record whose value is those friends joined
 * together, each one followed by a space (so the value ends with a trailing space).
 */
public class TwoShareFriendsReducer extends Reducer<Text, Text, Text, Text>{

    /**
     * Concatenates all friends received for one pair and writes {@code <pair, friends>}.
     *
     * @param key     the {@code person-person} pair
     * @param values  the friends emitted for this pair
     * @param context sink for the output record
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        StringBuilder commonFriends = new StringBuilder();

        // Every friend is followed by a single space.
        for (Text each : values) {
            commonFriends.append(each.toString()).append(' ');
        }

        String joined = commonFriends.toString();
        context.write(key, new Text(joined));
    }
}

(6)第二次Driver 

package com.xyg.mapreduce.friends;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the second pass of the common-friends job: wires
 * {@code TwoShareFriendsMapper} and {@code TwoShareFriendsReducer} into a job,
 * runs it and reports the outcome through the process exit status.
 */
public class TwoShareFriendsDriver {

    /**
     * Configures and runs the job.
     *
     * <p>Exit status: {@code 0} when the job succeeds, {@code 1} when it fails,
     * {@code 2} when the command-line arguments are missing.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: TwoShareFriendsDriver <input path> <output path>");
            System.exit(2);
        }

        // 1. Obtain the job object
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2. Identify the jar by the driver class
        job.setJarByClass(TwoShareFriendsDriver.class);

        // 3. Set the mapper and reducer classes
        job.setMapperClass(TwoShareFriendsMapper.class);
        job.setReducerClass(TwoShareFriendsReducer.class);

        // 4. Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 6. Set the input and output locations of the job
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit and wait for completion
        boolean result = job.waitForCompletion(true);

        // By convention a zero exit status means success; the original had this inverted.
        System.exit(result ? 0 : 1);
    }
}

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: