
Solving data skew with a custom random partitioner

2018-01-19 16:32

1. Stage 1: word count over three text files (number of reduce partitions set to 3)

package com.cr.skew;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SkewMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("entering mapper");
        String line = value.toString();
        String[] arr = line.split(" ");
        Text keyOut = new Text();
        IntWritable valueOut = new IntWritable();

        // emit <word, 1> for every word on the line
        for (String s : arr) {
            keyOut.set(s);
            valueOut.set(1);
            context.write(keyOut, valueOut);
        }
    }
}

package com.cr.skew;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SkewReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // sum the counts for each word
        int count = 0;
        for (IntWritable iw : values) {
            count += iw.get();
        }
        context.write(key, new IntWritable(count));
    }
}

package com.cr.skew;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SkewApp {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // standalone (local) job against the local file system
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        Job job = Job.getInstance(conf);
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.5");

        // job properties
        job.setJobName("SkewApp");                        // job name
        job.setJarByClass(SkewApp.class);                 // class used to locate the jar
        job.setInputFormatClass(TextInputFormat.class);

        // input path
        FileInputFormat.addInputPath(job, new Path("D:\\skew"));
        // output path (deleted first if it already exists)
        Path path = new Path("D:\\skew\\out");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        job.setMapperClass(SkewMapper.class);             // mapper class
        job.setReducerClass(SkewReducer.class);           // reducer class

        job.setMapOutputKeyClass(Text.class);             // map output key type
        job.setMapOutputValueClass(IntWritable.class);    // map output value type

        job.setOutputKeyClass(Text.class);                // job output key type
        job.setOutputValueClass(IntWritable.class);       // job output value type

        job.setNumReduceTasks(3);
        job.waitForCompletion(true);
    }
}

Output of part-r-00000:

world3	3

part-r-00001:

world1	3
world4	3

part-r-00002:

hello	15
world2	3
world5	3
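
All 15 occurrences of hello land in a single output file (part-r-00002): that is the data skew, with one reducer doing most of the work. The reason is that, without a custom partitioner, Hadoop uses the default HashPartitioner, which always routes the same key to the same partition. Its logic is essentially the following minimal sketch:

import org.apache.hadoop.mapreduce.Partitioner;

// default partitioning: the key's hash decides the partition, so every copy of a
// heavily repeated key such as "hello" goes to the same reduce task
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}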

2. Stage 2: define a random partitioner

package com.cr.skew1;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.Random;

// custom partitioner: assign each record to a random partition
public class RandomPartition extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text text, IntWritable intWritable, int numPartitioners) {
        // return a random partition index in [0, numPartitioners)
        return new Random().nextInt(numPartitioners);
    }
}
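
For the random partitioner to take effect, the stage-1 driver also has to register it on the job. The listing of SkewApp above does not show that call, so the following is a sketch of the assumed driver change (e.g. in a copy of SkewApp living in package com.cr.skew1):

// in the stage-1 driver, alongside the other job settings:
job.setPartitionerClass(RandomPartition.class);   // scatter each key randomly across the reducers
job.setNumReduceTasks(3);                         // still three partitions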

Output of the three partitions (the skewed key hello is now spread across all three reducers: 7 + 4 + 4 = 15):

hello	7
world1	2
world2	1
world3	1
world5	1

hello	4
world2	2
world3	2

hello	4
world1	1
world4	3
world5	2

3. Aggregate the stage-1 reduce output with a second mapper/reducer pass

Because the random partitioner scatters each key across reducers, every stage-1 output file holds only partial counts; a second MapReduce pass merges them into the final totals.

package com.cr.skew1_stage2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SkewMapper2 extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("entering mapper");
        // stage-1 output lines look like "word<TAB>count"; split them back apart
        String line = value.toString();
        String[] arr = line.split("\t");

        context.write(new Text(arr[0]), new IntWritable(Integer.parseInt(arr[1])));
    }
}

package com.cr.skew1_stage2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SkewReducer1 extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // merge the partial counts produced by stage 1
        int count = 0;
        for (IntWritable iw : values) {
            count += iw.get();
        }
        context.write(key, new IntWritable(count));
    }
}

package com.cr.skew1_stage2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SkewApp2 {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // standalone (local) job
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        Job job = Job.getInstance(conf);
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.5");

        // job properties
        job.setJobName("SkewApp2");                       // job name
        job.setJarByClass(SkewApp2.class);                // class used to locate the jar
        job.setInputFormatClass(TextInputFormat.class);

        // input paths: the three stage-1 output files
        FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00000"));
        FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00001"));
        FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00002"));
        // output path (deleted first if it already exists)
        Path path = new Path("D:\\skew\\out2");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        job.setMapperClass(SkewMapper2.class);            // mapper class
        job.setReducerClass(SkewReducer1.class);          // reducer class

        job.setMapOutputKeyClass(Text.class);             // map output key type
        job.setMapOutputValueClass(IntWritable.class);    // map output value type

        job.setOutputKeyClass(Text.class);                // job output key type
        job.setOutputValueClass(IntWritable.class);       // job output value type

        job.setNumReduceTasks(3);
        job.waitForCompletion(true);
    }
}

Stage-2 output (partitions combined):

world3	3
world1	3
world4	3
hello	15
world2	3
world5	3

The final counts match the stage-1 results obtained without the random partitioner, but the skewed key hello is no longer handled by a single reducer: stage 1 spread its 15 occurrences across three reducers (7 + 4 + 4), and stage 2 simply merges those partial counts.

4. Switching the stage-2 job's input format to KeyValueTextInputFormat

With KeyValueTextInputFormat, the stage-1 output is delivered to the mapper directly as key-value pairs, so the mapper no longer has to split each line itself.

package com.cr.skew1_stage_version2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SkewApp2 {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // standalone (local) job
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        Job job = Job.getInstance(conf);
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.5");

        // job properties
        job.setJobName("SkewApp2");                       // job name
        job.setJarByClass(SkewApp2.class);                // class used to locate the jar
        job.setInputFormatClass(KeyValueTextInputFormat.class);   // split each line into key/value at the first tab

        // input paths: the three stage-1 output files
        FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00000"));
        FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00001"));
        FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00002"));
        // output path (deleted first if it already exists)
        Path path = new Path("D:\\skew\\out2");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        job.setMapperClass(SkewMapper2.class);            // mapper class
        job.setReducerClass(SkewReducer1.class);          // reducer class

        job.setMapOutputKeyClass(Text.class);             // map output key type
        job.setMapOutputValueClass(IntWritable.class);    // map output value type

        job.setOutputKeyClass(Text.class);                // job output key type
        job.setOutputValueClass(IntWritable.class);       // job output value type

        job.setNumReduceTasks(3);
        job.waitForCompletion(true);
    }
}

Looking at the KeyValueTextInputFormat source:

public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {
    public KeyValueTextInputFormat() {
    }
    ...
}

Since it extends FileInputFormat<Text, Text>, the mapper's input key and value are both of type Text.

package com.cr.skew1_stage_version2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SkewMapper2 extends Mapper<Text, Text, Text, IntWritable> {

    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("entering mapper");

        // the key is already the word; the value is its partial count as text
        context.write(key, new IntWritable(Integer.parseInt(value.toString())));
    }
}

The reducer stays unchanged, and the result is exactly the same as before. Switching the job's input format to KeyValueTextInputFormat therefore removes the manual line splitting from the mapper and saves a fair amount of work.
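
One thing to keep in mind: KeyValueTextInputFormat splits each input line at the first tab character by default, which matches the tab-separated word/count lines the first stage wrote. If the separator were different, it could be changed through the job configuration; a small sketch, assuming the Hadoop 2.x property name:

// tab is the default separator; override only if the stage-1 output used something else
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");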