Solving data skew in Hadoop joins
Points to note:
1: The map output key must be the foreign key shared by the two tables (here, pid).
2: The combined info bean needs a flag field so we can tell which table the data in the current bean came from.
The implementation code below has been run and verified.
```java
package join;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class InfoBean implements Writable {

    private String pid;
    private String data;
    private String price;
    private String amount;
    private String productName;
    private String orderId;
    private String categoryId;
    // flag: "0" = order record, "1" = product record
    private String flag;

    public InfoBean() {
    }

    public void set(String pid, String data, String price, String amount,
                    String productName, String orderId, String categoryId, String flag) {
        this.pid = pid;
        this.data = data;
        this.price = price;
        this.amount = amount;
        this.productName = productName;
        this.orderId = orderId;
        this.categoryId = categoryId;
        this.flag = flag;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(pid);
        out.writeUTF(data);
        out.writeUTF(price);
        out.writeUTF(amount);
        out.writeUTF(productName);
        out.writeUTF(orderId);
        out.writeUTF(categoryId);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        this.pid = input.readUTF();
        this.data = input.readUTF();
        this.price = input.readUTF();
        this.amount = input.readUTF();
        this.productName = input.readUTF();
        this.orderId = input.readUTF();
        this.categoryId = input.readUTF();
        this.flag = input.readUTF();
    }

    public String getPid() { return pid; }
    public void setPid(String pid) { this.pid = pid; }
    public String getData() { return data; }
    public void setData(String data) { this.data = data; }
    public String getPrice() { return price; }
    public void setPrice(String price) { this.price = price; }
    public String getAmount() { return amount; }
    public void setAmount(String amount) { this.amount = amount; }
    public String getProductName() { return productName; }
    public void setProductName(String productName) { this.productName = productName; }
    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }
    public String getCategoryId() { return categoryId; }
    public void setCategoryId(String categoryId) { this.categoryId = categoryId; }
    public String getFlag() { return flag; }
    public void setFlag(String flag) { this.flag = flag; }

    @Override
    public String toString() {
        return "InfoBean{" +
                "pid='" + pid + '\'' +
                ", data='" + data + '\'' +
                ", price='" + price + '\'' +
                ", amount='" + amount + '\'' +
                ", productName='" + productName + '\'' +
                ", orderId='" + orderId + '\'' +
                ", categoryId='" + categoryId + '\'' +
                ", flag='" + flag + '\'' +
                '}';
    }
}
```
This bean holds every field we want in the output, plus a flag field indicating which table the record came from.
The MapReduce program:
```java
package join;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class DataJoin {

    // The map output key is the foreign key shared by the two tables (pid)
    static class JoinMapper extends Mapper<LongWritable, Text, Text, InfoBean> {
        @Override
        protected void map(LongWritable number, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split(",");
            InfoBean info = new InfoBean();
            Text key = new Text();

            // InfoBean carries fields of both tables, so first determine
            // which input file the current record came from
            FileSplit split = (FileSplit) context.getInputSplit();
            String filename = split.getPath().getName();
            if (filename.startsWith("order")) {
                info.set(fields[2], fields[1], "", fields[3], "", fields[0], "", "0");
            } else {
                info.set(fields[0], "", fields[3], "", fields[1], "", fields[2], "1");
            }
            String pid = info.getPid();
            key.set(pid);
            context.write(key, info);
        }
    }

    static class JoinReducer extends Reducer<Text, InfoBean, InfoBean, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<InfoBean> values, Context context)
                throws IOException, InterruptedException {
            List<InfoBean> infoBeans = new ArrayList<>();
            InfoBean productBean = new InfoBean();
            for (InfoBean value : values) {
                String flag = value.getFlag();
                if ("0".equals(flag)) {
                    // order record: keep a copy
                    InfoBean bean = new InfoBean();
                    try {
                        BeanUtils.copyProperties(bean, value);
                        infoBeans.add(bean);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } else {
                    // product record: remember it for this pid
                    try {
                        BeanUtils.copyProperties(productBean, value);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
            // fill the product fields into every order record and emit
            infoBeans.forEach(infoBean -> {
                infoBean.setProductName(productBean.getProductName());
                infoBean.setCategoryId(productBean.getCategoryId());
                infoBean.setPrice(productBean.getPrice());
                try {
                    context.write(infoBean, NullWritable.get());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // local debug mode
        conf.set("mapreduce.framework.name", "local");
        conf.set("fs.defaultFS", "file:///");
        Job job = Job.getInstance(conf);

        // jar containing this program
        job.setJarByClass(DataJoin.class);
        // mapper and reducer classes
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        // map output kv types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(InfoBean.class);
        // final output kv types
        job.setOutputKeyClass(InfoBean.class);
        job.setOutputValueClass(NullWritable.class);

        // input and output directories
        // FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
```
In the example above:
When one product sells extremely well, it contributes a huge number of rows to the order table. Because the target reducer is chosen by the hashcode of pid, all of those rows go to the same reducer, so one reducer does most of the work while the others handle very little; this is data skew.
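To make the skew concrete, here is a minimal sketch (not part of the join code above) of the rule Hadoop's default HashPartitioner applies when no custom partitioner is set. Every record sharing a pid lands in the same partition, so one hot pid pins a single reducer:

```java
// Minimal sketch of the default partitioning rule (HashPartitioner).
// The real map output key is a Text, whose hashCode() differs from String's,
// but the effect is the same: identical pids always go to the same reducer.
public class HashPartitionSketch {

    // same formula HashPartitioner#getPartition uses
    static int partitionFor(String pid, int numReduceTasks) {
        return (pid.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 4;
        // a "hot" pid such as p001 sends every one of its order rows to one reducer
        for (String pid : new String[]{"p001", "p001", "p001", "p002", "p003"}) {
            System.out.println(pid + " -> reducer " + partitionFor(pid, reducers));
        }
    }
}
```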
Solution:
Use a map-side join (no reducer is needed; only the map phase runs).
Put the product table (far smaller than the order table) into a distributed cache.
Hadoop already provides the DistributedCache mechanism for us (a sketch of reading the cached file through the job context follows the driver code below),
so the code changes are as follows:
1: Create a Product class
```java
package join;

// Holds one row of the product table so the mapper can join against it in memory.
// The field set and the set() argument order are reconstructed from how the
// mapper below calls p.set(fields[0], fields[3], fields[2], fields[1]).
public class Product {
    private String pid;
    private String price;
    private String categoryId;
    private String productName;

    public void set(String pid, String price, String categoryId, String productName) {
        this.pid = pid;
        this.price = price;
        this.categoryId = categoryId;
        this.productName = productName;
    }

    public String getPid() { return pid; }
    public String getPrice() { return price; }
    public String getCategoryId() { return categoryId; }
    public String getProductName() { return productName; }
}
```
2: The map side:
```java
// Map-side join: the product table is loaded into memory in setup(), and each
// order record is joined in map(); no reducer is needed.
// Additional imports required: java.io.*, java.util.HashMap, java.util.Map,
// org.apache.commons.lang.StringUtils.
static class JoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // product table keyed by pid, read from the distributed cache file
    private Map<String, Product> products = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // "product.txt" is available in the task's working directory because it
        // was registered with job.addCacheFile(...)
        BufferedReader br = new BufferedReader(
                new InputStreamReader(new FileInputStream(new File("product.txt"))));
        String line;
        while ((line = br.readLine()) != null) {
            if (StringUtils.isNotEmpty(line)) {
                // product line: pid, productName, categoryId, price
                String[] fields = line.split(",");
                Product p = new Product();
                p.set(fields[0], fields[3], fields[2], fields[1]);
                products.put(p.getPid(), p);
            }
        }
        br.close();
    }

    @Override
    protected void map(LongWritable number, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // order line: orderId, date, pid, amount
        String[] fields = line.split(",");
        Product p = products.get(fields[2]);

        InfoBean info = new InfoBean();
        info.set(fields[2], fields[1], p.getPrice(), fields[3],
                 p.getProductName(), fields[0], p.getCategoryId(), "0");

        Text key = new Text();
        key.set(info.getPid() + "," + p.getProductName() + "," + p.getPrice());
        context.write(key, NullWritable.get());
    }
}
```
3: The job driver:
```java
Configuration conf = new Configuration();
// local debug mode
conf.set("mapreduce.framework.name", "local");
conf.set("fs.defaultFS", "file:///");
// local submission against a live HDFS
// conf.set("mapreduce.framework.name","local");
// conf.set("fs.defaultFS","hdfs://master:9000");
Job job = Job.getInstance(conf);

// cluster mode: jar containing this program
job.setJarByClass(DataJoin.class);
// debug mode: explicit jar path
// job.setJar("/home/willian/Desktop/project/java/hadoop/out/jar/word.jar");

// only a mapper is needed
job.setMapperClass(JoinMapper.class);
// job.setReducerClass(JoinReducer.class);
// // map output kv types
// job.setMapOutputKeyClass(Text.class);
// job.setMapOutputValueClass(InfoBean.class);

// final output kv types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// map-only job: no reduce tasks
job.setNumReduceTasks(0);

// job.addArchiveToClassPath();  // cache a jar onto the task node's classpath
// job.addFileToClassPath();     // cache an ordinary file onto the task node's classpath
// job.addCacheFile();           // cache an ordinary file into the task's working directory
job.addCacheFile(new URI("file:///home/willian/Desktop/project/java/hadoop/product.txt")); // or an HDFS path

// input and output directories
// FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
FileInputFormat.setInputPaths(job, new Path("/home/willian/Desktop/project/java/hadoop/mrlocal/order.txt"));
FileOutputFormat.setOutputPath(job, new Path("/home/willian/Desktop/project/java/hadoop/mrlocal/out"));

boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
```
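The mapper above opens "product.txt" by its bare name, relying on the distributed cache making the file available in the task's working directory. If you prefer not to hard-code the name, the cached file's URI can also be obtained from the context in setup(). The following is only a sketch under that assumption, not part of the original program; it assumes a single file was registered with job.addCacheFile(...) and fills the same in-memory products map:

```java
// Sketch: resolving the cached product file via the MapReduce API.
// Assumes exactly one file was added with job.addCacheFile(...).
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    java.net.URI[] cacheFiles = context.getCacheFiles();
    if (cacheFiles != null && cacheFiles.length > 0) {
        // for the local file:/// URI used in this example this is a plain filesystem path
        String productFile = cacheFiles[0].getPath();
        try (java.io.BufferedReader br =
                     new java.io.BufferedReader(new java.io.FileReader(productFile))) {
            String line;
            while ((line = br.readLine()) != null) {
                // product line: pid, productName, categoryId, price
                String[] fields = line.split(",");
                Product p = new Product();
                p.set(fields[0], fields[3], fields[2], fields[1]);
                products.put(p.getPid(), p);
            }
        }
    }
}
```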