您的位置:首页 > 运维架构

hadoop join数据倾斜解决方法

2016-12-01 11:08 393 查看
注意点:

1: map输出的一定是两表的外键

2:构造的信息bean要有一个标志位,用来判别现在的bean中的信息是属于哪个表的

下面是实现代码已运行通过

package join;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class InfoBean implements Writable {

public InfoBean() {
}

public void set(String pid, String data, String price, String amount, String productName, String orderId, String categoryId,String flag) {
this.pid = pid;
this.data = data;
this.price = price;
this.amount = amount;
this.productName = productName;
this.orderId = orderId;
this.categoryId = categoryId;
this.flag = flag;
}

private String pid;
private String data;
private String price;
private String amount;
private String productName;
private String orderId;
private String categoryId;

//0 order 1 product
private String flag;

@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(pid);
out.writeUTF(data);
out.writeUTF(price);
out.writeUTF(amount);
out.writeUTF(productName);
out.writeUTF(orderId);
out.writeUTF(categoryId);
out.writeUTF(flag);

}

@Override
public void readFields(DataInput input) throws IOException {
this.pid = input.readUTF();
this.data = input.readUTF();
this.price = input.readUTF();
this.amount = input.readUTF();
this.productName = input.readUTF();
this.orderId = input.readUTF();
this.categoryId = input.readUTF();
this.flag = input.readUTF();
}

public String getPid() {
return pid;
}

public void setPid(String pid) {
this.pid = pid;
}

public String getData() {
return data;
}

public void setData(String data) {
this.data = data;
}

public String getPrice() {
return price;
}

public void setPrice(String price) {
this.price = price;
}

public String getAmount() {
return amount;
}

public void setAmount(String amount) {
this.amount = amount;
}

public String getProductName() {
return productName;
}

public void setProductName(String productName) {
this.productName = productName;
}

public String getOrderId() {
return orderId;
}

public void setOrderId(String orderId) {
this.orderId = orderId;
}

public String getCategoryId() {
return categoryId;
}

public void setCategoryId(String categoryId) {
this.categoryId = categoryId;
}

public String getFlag() {
return flag;
}

public void setFlag(String flag) {
this.flag = flag;
}

@Override
public String toString() {
return "InfoBean{" +
"pid='" + pid + '\'' +
", data='" + data + '\'' +
", price='" + price + '\'' +
", amount='" + amount + '\'' +
", productName='" + productName + '\'' +
", orderId='" + orderId + '\'' +
", categoryId='" + categoryId + '\'' +
", flag='" + flag + '\'' +
'}';
}
}


这个bean包含了我们要输出的全部信息外加了一个标志位用来说明

mr程序

package join;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class DataJoin {

//map输出时key为联系表的外键
static class JoinMapper extends Mapper<LongWritable,Text,Text,InfoBean> {

@Override
protected void map(LongWritable number, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fileds = line.split(",");
InfoBean info = new InfoBean();
Text key = new Text();
//InfoBean为合体bean,所以先判断map进来的是哪种bean
FileSplit split  = (FileSplit) context.getInputSplit();
String filename = split.getPath().getName();
if (filename.startsWith("order")){
info.set(fileds[2],fileds[1],"",fileds[3],"",fileds[0],"","0");
}else {
info.set(fileds[0],"",fileds[3],"",fileds[1],"",fileds[2],"1");
}
String pid = info.getPid();
key.set(pid);
context.write(key,info);
}
}
static class JoinReducer extends Reducer<Text,InfoBean,InfoBean,NullWritable>{
@Override
protected void reduce(Text key, Iterable<InfoBean> values, Context context) throws IOException, InterruptedException {
List<InfoBean> infoBeans = new ArrayList<>();
InfoBean productBean = new InfoBean();
for (InfoBean value:values){
String flag = value.getFlag();
if ("0".equals(flag)){
InfoBean bean = new InfoBean();
try {
BeanUtils.copyProperties(bean,value);
infoBeans.add(bean);
} catch (Exception e) {
e.printStackTrace();
}
}else {
try {
BeanUtils.copyProperties(productBean,value);
} catch (Exception e) {
e.printStackTrace();
}
}
}
infoBeans.forEach(
infoBean -> {
infoBean.setProductName(productBean.getProductName());
infoBean.setCategoryId(productBean.getCategoryId());
infoBean.setPrice(productBean.getPrice());
try {
context.write(infoBean,NullWritable.get());
} catch (Exception e) {
e.printStackTrace();
}
}
);
}
}

public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();

//本地调试模式
conf.set("mapreduce.framework.name","local");
conf.set("fs.defaultFS","file:///");

Job job = Job.getInstance();

//指定本程序的jar包所在的本地路径
job.setJarByClass(DataJoin.class);

//指定运行的map程序
job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReducer.class);

//指定map输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(InfoBean.class);

//最终输出数据类型kv
job.setOutputKeyClass(InfoBean.class);
job.setOutputValueClass(NullWritable.class);

//指定job的输入原始文件所在目录
//        FileInputFormat.setInputPaths(job,new Path("/wordcount/input"));
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

Boolean res = job.waitForCompletion(true);

System.exit(res ? 0 : 1);
}

}


在上诉的例子中:

当一个商品卖的特别火的时候,他的order表将很多项,这时候通过pid的hashcode决定reducer的时候,就会出现数据倾斜。就是一个reducer处理很多,而其他的很少

解决方法:

使用 map side join(不需要reducer,只使用map)

将产品信息表(比order要小很多很多)放入一个distributedcache中,

然而hadoop已经为我们实现了distributedcache

所以代码改动如下:

1:新建一个product类

public class Product {
private String pid;
private String data;
private String price;


2:map端:

//map输出时key为联系表的外键
static class JoinMapper extends Mapper<LongWritable,Text,Text,NullWritable> {

Product p = new Product();

@Override
protected void setup(Context context) throws IOException, InterruptedException {
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(new File("product.txt"))));
String line = br.readLine();
if (StringUtils.isNotEmpty(line)){
String[] fileds = line.split(",");
p.set(fileds[0],fileds[3],fileds[2],fileds[1]);
}
br.close();
}

@Override
protected void map(LongWritable number, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fileds = line.split(",");
InfoBean info = new InfoBean();
Text key = new Text();
info.set(fileds[2],fileds[1],p.getPrice(),fileds[3],p.getProductName(),fileds[0],p.getCategoryId(),"0");
String pid = info.getPid();
key.set(pid + p.getProductName() + p.getPrice());
context.write(key,NullWritable.get());
}
}


3:job端

Configuration conf = new Configuration();

//本地调试模式
conf.set("mapreduce.framework.name","local");
conf.set("fs.defaultFS","file:///");

//        本地提交模式 hdfs在线
//        conf.set("mapreduce.framework.name","local");
//        conf.set("fs.defaultFS","hdfs://master:9000");

Job job = Job.getInstance();

//线上
job.setJarByClass(DataJoin.class);
//调试模式
//        job.setJar("/home/willian/Desktop/project/java/hadoop/out/jar/word.jar");

//指定运行的map程序
job.setMapperClass(JoinMapper.class);
//        job.setReducerClass(JoinReducer.class);

//        //指定map输出数据的kv类型
//        job.setMapOutputKeyClass(Text.class);
//        job.setMapOutputValueClass(InfoBean.class);

//最终输出数据类型kv
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);

//
//        job.addArchiveToClassPath(); //缓存jar包到task节点的classpath
//        job.addFileToClassPath(); //缓存普通文件到task节点的classpath
//        job.addCacheFile(); //缓存普通文件到task节点的工作目录中

job.addCacheFile(new URI("file:///home/willian/Desktop/project/java/hadoop/product.txt"));//或hdfs文件路径

//指定job的输入原始文件所在目录
//        FileInputFormat.setInputPaths(job,new Path("/wordcount/input"));
FileInputFormat.setInputPaths(job, new Path("/home/willian/Desktop/project/java/hadoop/mrlocal/order.txt"));
FileOutputFormat.setOutputPath(job, new Path("/home/willian/Desktop/project/java/hadoop/mrlocal/out"));

Boolean res = job.waitForCompletion(true);

System.exit(res ? 0 : 1);
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hadoop