Hadoop多路径输出(MultipleOutputs)
2015-11-16 17:51
363 查看
使用Hadoop进行数据处理时,有时候需要将计算结果根据不同的条件存入不同的分区。
比如:计算顾客是否回头购买的时候,会将回头了的顾客数据和未回头的顾客数据分别存入不同的表。
可以使用MultipleOutputs实现。
需要在reduce类中定义MultipleOutputs,并且重写Reducer的setup()方法和cleanup()方法。具体实例如下
public static class PeriodReduce extends Reducer<TextPair,Text,NullWritable,Text>{
private MultipleOutputs<NullWritable, Text> mos;
protected void setup(Context context)throws IOException,InterruptedException{
mos = new MultipleOutputs<NullWritable, Text>(context);
}
protected void cleanup(Context context)throws IOException,InterruptedException{
mos.close();
}
public void reduce(TextPair key,Iterable<Text> values,Context context)throws IOException,InterruptedException{
Iterator<Text> it = values.iterator();
ArrayList<String> custInfo = null;
String last ="notexist";
String ret = "notexist";
while (it.hasNext()){
String line = it.next().toString();
//排序后,先到达reduce的数据是上个周期的
if(key.getId()==0){
custInfo = new ArrayList<String>();
//将上个周期的购买情况存放
custInfo.add(line);
last = "exist";
}else if(key.getId()==1){
ret = "exist";
//如果回头,则加上上个周期信息放入return目录下
if("exist".equals(last)&&custInfo.size()>0){
for(String str:custInfo){
mos.write(NullWritable.get(), new Text(line+"\001"+str),"return/r");
}
}
}
//如果没有回头,直接输出上期
if("exist".equals(last)&&"notexist".equals(ret)){
for(String str:custInfo){
mos.write(NullWritable.get(), new Text(key.getText()+"\001"+str),"loss/l");
}
}
}
}
}
比如:计算顾客是否回头购买的时候,会将回头了的顾客数据和未回头的顾客数据分别存入不同的表。
可以使用MultipleOutputs实现。
需要在reduce类中定义MultipleOutputs,并且重写Reducer的setup()方法和cleanup()方法。具体实例如下
public static class PeriodReduce extends Reducer<TextPair,Text,NullWritable,Text>{
private MultipleOutputs<NullWritable, Text> mos;
protected void setup(Context context)throws IOException,InterruptedException{
mos = new MultipleOutputs<NullWritable, Text>(context);
}
protected void cleanup(Context context)throws IOException,InterruptedException{
mos.close();
}
public void reduce(TextPair key,Iterable<Text> values,Context context)throws IOException,InterruptedException{
Iterator<Text> it = values.iterator();
ArrayList<String> custInfo = null;
String last ="notexist";
String ret = "notexist";
while (it.hasNext()){
String line = it.next().toString();
//排序后,先到达reduce的数据是上个周期的
if(key.getId()==0){
custInfo = new ArrayList<String>();
//将上个周期的购买情况存放
custInfo.add(line);
last = "exist";
}else if(key.getId()==1){
ret = "exist";
//如果回头,则加上上个周期信息放入return目录下
if("exist".equals(last)&&custInfo.size()>0){
for(String str:custInfo){
mos.write(NullWritable.get(), new Text(line+"\001"+str),"return/r");
}
}
}
//如果没有回头,直接输出上期
if("exist".equals(last)&&"notexist".equals(ret)){
for(String str:custInfo){
mos.write(NullWritable.get(), new Text(key.getText()+"\001"+str),"loss/l");
}
}
}
}
}
相关文章推荐
- 详解HDFS Short Circuit Local Reads
- Hadoop_2.1.0 MapReduce序列图
- 使用Hadoop搭建现代电信企业架构
- 单机版搭建Hadoop环境图文教程详解
- hadoop常见错误以及处理方法详解
- hadoop 单机安装配置教程
- hadoop的hdfs文件操作实现上传文件到hdfs
- hadoop实现grep示例分享
- Apache Hadoop版本详解
- linux下搭建hadoop环境步骤分享
- hadoop client与datanode的通信协议分析
- hadoop中一些常用的命令介绍
- Hadoop单机版和全分布式(集群)安装
- 用PHP和Shell写Hadoop的MapReduce程序
- hadoop map-reduce中的文件并发操作
- Hadoop1.2中配置伪分布式的实例
- java结合HADOOP集群文件上传下载
- 用python + hadoop streaming 分布式编程(一) -- 原理介绍,样例程序与本地调试
- Hadoop安装感悟
- hadoop安装lzo