您的位置:首页 > 运维架构

Hadoop多路径输出(MultipleOutputs)

2015-11-16 17:51 363 查看
使用Hadoop进行数据处理时,有时候需要将计算结果根据不同的条件存入不同的分区。

比如:计算顾客是否回头购买的时候,会将回头了的顾客数据和未回头的顾客数据分别存入不同的表。

可以使用MultipleOutputs实现。

需要在reduce类中定义MultipleOutputs,并且重写Reducer的setup()方法和cleanup()方法。具体实例如下

public static class PeriodReduce extends Reducer<TextPair,Text,NullWritable,Text>{
private MultipleOutputs<NullWritable, Text> mos;

protected void setup(Context context)throws IOException,InterruptedException{
mos = new MultipleOutputs<NullWritable, Text>(context);
}

protected void cleanup(Context context)throws IOException,InterruptedException{
mos.close();
}

public void reduce(TextPair key,Iterable<Text> values,Context context)throws IOException,InterruptedException{

Iterator<Text> it = values.iterator();
ArrayList<String> custInfo = null;
String last ="notexist";
String ret = "notexist";
while (it.hasNext()){
String line = it.next().toString();
//排序后,先到达reduce的数据是上个周期的
if(key.getId()==0){
custInfo = new ArrayList<String>();
//将上个周期的购买情况存放
custInfo.add(line);
last = "exist";
}else if(key.getId()==1){
ret = "exist";
//如果回头,则加上上个周期信息放入return目录下
if("exist".equals(last)&&custInfo.size()>0){
for(String str:custInfo){
mos.write(NullWritable.get(), new Text(line+"\001"+str),"return/r");
}
}
}

//如果没有回头,直接输出上期
if("exist".equals(last)&&"notexist".equals(ret)){
for(String str:custInfo){
mos.write(NullWritable.get(), new Text(key.getText()+"\001"+str),"loss/l");
}

}
}
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息