Solving Data Skew with a Custom Random Partitioner
2018-01-19 16:32
1. Stage 1: word count over three input text files (number of partitions set to 3)
```java
package com.cr.skew;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SkewMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("entering mapper");
        String line = value.toString();
        String[] arr = line.split(" ");
        Text keyOut = new Text();
        IntWritable valueOut = new IntWritable();
        for (String s : arr) {
            keyOut.set(s);
            valueOut.set(1);
            context.write(keyOut, valueOut);
        }
    }
}
```
```java
package com.cr.skew;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SkewReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable iw : values) {
            count += iw.get();
        }
        context.write(key, new IntWritable(count));
    }
}
```
```java
package com.cr.skew;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SkewApp {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // job configuration, running against the local file system
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        Job job = Job.getInstance(conf);
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.5");

        // job properties
        job.setJobName("SkewApp");                      // job name
        job.setJarByClass(SkewApp.class);               // class used to locate the jar
        job.setInputFormatClass(TextInputFormat.class);

        // input path
        FileInputFormat.addInputPath(job, new Path("D:\\skew"));

        // output path; delete it first if it already exists
        Path path = new Path("D:\\skew\\out");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        job.setMapperClass(SkewMapper.class);           // mapper class
        job.setReducerClass(SkewReducer.class);         // reducer class
        job.setMapOutputKeyClass(Text.class);           // map output key type
        job.setMapOutputValueClass(IntWritable.class);  // map output value type
        job.setOutputKeyClass(Text.class);              // job output key type
        job.setOutputValueClass(IntWritable.class);     // job output value type
        job.setNumReduceTasks(3);
        job.waitForCompletion(true);
    }
}
```

The output:

```
part-r-00000:
world3  3

part-r-00001:
world1  3
world4  3

part-r-00002:
hello   15
world2  3
world5  3
```

All 15 hello records land on a single reducer: the workload is skewed.
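The skew comes from the default partitioner: a record's partition depends only on its key's hash, so every occurrence of the same key is routed to the same reduce task. The sketch below paraphrases what Hadoop's built-in org.apache.hadoop.mapreduce.lib.partition.HashPartitioner does (the class name here is mine, the logic is the library's):

```java
import org.apache.hadoop.mapreduce.Partitioner;

// essentially what Hadoop's default HashPartitioner does
public class DefaultHashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // identical keys always map to the same partition,
        // so a hot key like "hello" saturates a single reducer
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```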
2. Stage 2: re-run the job with a random partitioner
```java
package com.cr.skew1;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.Random;

// custom partitioner: ignore the key and pick a partition at random
public class RandomPartition extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text text, IntWritable intWritable, int numPartitioners) {
        // random integer in [0, numPartitioners)
        return new Random().nextInt(numPartitioners);
    }
}
```
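The post does not show the stage-2 driver, but the partitioner only takes effect once it is registered on the job. Presumably the SkewApp driver from stage 1 was reused with one extra line (an assumption on my part, since the line is not in the original):

```java
job.setPartitionerClass(RandomPartition.class);  // replaces the default HashPartitioner (assumed addition)
```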
The three output partitions (note that the hot key hello is now spread across all three reducers, 7 + 4 + 4 = 15):

```
hello   7
world1  2
world2  1
world3  1
world5  1

hello   4
world2  2
world3  2

hello   4
world1  1
world4  3
world5  2
```

The counts are now only partial sums, so a second aggregation pass is needed.
3. Stage 3: a second mapper/reducer job that aggregates the partial sums from the reduce output above
```java
package com.cr.skew1_stage2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SkewMapper2 extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("entering mapper");
        // each line of the stage-2 output is "word<TAB>partial count"
        String line = value.toString();
        String[] arr = line.split("\t");
        context.write(new Text(arr[0]), new IntWritable(Integer.parseInt(arr[1])));
    }
}
```
```java
package com.cr.skew1_stage2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SkewReducer1 extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable iw : values) {
            count += iw.get();
        }
        context.write(key, new IntWritable(count));
    }
}
```
```java
package com.cr.skew1_stage2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SkewApp2 {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // job configuration, running against the local file system
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        Job job = Job.getInstance(conf);
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.5");

        // job properties
        job.setJobName("SkewApp2");                     // job name
        job.setJarByClass(SkewApp2.class);              // class used to locate the jar
        job.setInputFormatClass(TextInputFormat.class);

        // input paths: the three part files produced in stage 2
        FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00000"));
        FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00001"));
        FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00002"));

        // output path; delete it first if it already exists
        Path path = new Path("D:\\skew\\out2");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        job.setMapperClass(SkewMapper2.class);          // mapper class
        job.setReducerClass(SkewReducer1.class);        // reducer class
        job.setMapOutputKeyClass(Text.class);           // map output key type
        job.setMapOutputValueClass(IntWritable.class);  // map output value type
        job.setOutputKeyClass(Text.class);              // job output key type
        job.setOutputValueClass(IntWritable.class);     // job output value type
        job.setNumReduceTasks(3);
        job.waitForCompletion(true);
    }
}
```
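A side note on those three addInputPath calls: FileInputFormat by default skips hidden files and files whose names start with an underscore (such as the _SUCCESS marker), so the whole output directory could presumably be added in a single call instead:

```java
// equivalent to listing the three part files individually; _SUCCESS is
// excluded by FileInputFormat's default hidden-file filter
FileInputFormat.addInputPath(job, new Path("D:\\skew\\out"));
```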
The final output:

```
world3  3

world1  3
world4  3

hello   15
world2  3
world5  3
```

As you can see, this matches the result of the first job that used no custom partitioner (e.g. hello: 7 + 4 + 4 = 15), so correctness is preserved while the hot key is spread evenly in the skewed stage.
4. Stage 3 with the job input format switched to KeyValueTextInputFormat
This way the first job's output is consumed directly as key-value pairs, with no manual splitting:

```java
package com.cr.skew1_stage_version2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SkewApp2 {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // job configuration, running against the local file system
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        Job job = Job.getInstance(conf);
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.5");

        // job properties
        job.setJobName("SkewApp2");
        job.setJarByClass(SkewApp2.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);  // key-value input format

        // input paths: the three part files produced in stage 2
        FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00000"));
        FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00001"));
        FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00002"));

        // output path; delete it first if it already exists
        Path path = new Path("D:\\skew\\out2");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        job.setMapperClass(SkewMapper2.class);
        job.setReducerClass(SkewReducer1.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(3);
        job.waitForCompletion(true);
    }
}
```

A look at the source shows why:
```java
public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {
    public KeyValueTextInputFormat() {
    }
    // ...
```

The mapper's input key and value are both of type Text.
```java
package com.cr.skew1_stage_version2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SkewMapper2 extends Mapper<Text, Text, Text, IntWritable> {
    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("entering mapper");
        // the key is already the word and the value its partial count
        context.write(key, new IntWritable(Integer.parseInt(value.toString())));
    }
}
```

The reducer stays the same, and the result turns out to be exactly the same as above. Switching the job's input format to KeyValueTextInputFormat therefore saves a good deal of work.
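One detail worth spelling out (my addition, not in the original post): KeyValueTextInputFormat splits each line at the first occurrence of a separator, a tab by default, which is exactly what TextOutputFormat writes between key and value in stage 2, so no extra configuration is needed here. If the input used a different separator, it could be set on the Configuration before the Job is created:

```java
// the default separator is "\t"; a comma is shown purely as an illustration
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
```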