MapReduce处理输出多文件格式(MultipleOutputs)
2017-08-15 20:49
429 查看
MultiPleOutputs原理
MapReduce job中,可以使用FileInputFormat和FileOutputFormat来对输入路径和输出路径来进行设置。在输出目录中,框架自己会自动对输出文件进行命名和组织,如part-(m|r)-00000之类,但有时为了后续流程的方便,我们常需要对输出结果进行一定的分类和组织。以前常用的方法是在MR
job运行之后,用脚本对目录下的数据进行一次重新组织,变成我们需要的格式。
研究了一下MR框架中的MultipleOutputs(是2.0之后的新API,是对老版本中MultipleOutputs与MultipleOutputFormat的一个整合)。
在一般情况下,Hadoop每一个Reducer产生一个输出文件,文件以part-r-00000,part-r-00001的方式进行命名,如果需要认为的控制输出文件的命名或者每一个Reducer需要写出多个输出文件时,可以采用MultipleOutputs类来完成,MultipleOutputs采用输出记录的键值对(output
Key和output Value)或者任意字符串来生成输出文件的名字,文件一般以name-r-nnnnn的格式进行命名,其中name是程序设计的任意名字;nnnnn表示分区号。
使用方法
1.驱动中不需要额外改变,只需要在MapClass或Reduce类中加入以下代码
[java] view
plain copy
print?
private MultipleOutputs<Text,IntWritable> mos;
public void setup(Context context) throws Exception{
mos=new MultipleOutputs(context);
}
public void cleanup(Context context) throws Exeception{
mos.close();
}
2.然后就可以用mos.write(Key key,Value value,String baseOutputPath)代替context.write(key,value);
在MapClass或Reduce中使用,输出时也会有默认的文件part-m-00*或part-r-00*,不过这些文件是无内容的,大小为0。而且只有part-m-00*会传给reduce。
注意:
multipleOutputs.write(key,value,baseOutputPath)方法的第三个参数表明了该输出所在的目录(相当于用户指定的输出目录)。如果baseOutputPath不包含文件分隔符"/",那么输出的文件格式为baseOutputPath-r-nnnnn(name-r-nnnnn).
如果包含文件分隔符"/".例如baseOutputPath="029070-99999/1901/part",那么输出文件则为027070-99999/1901/part-r-nnnnn.
3.最后在job的类中
// 设置输出文件类型
[java] view
plain copy
print?
MultipleOutputs.addNamedOutput(job, namedOutput, outputFormatClass, keyClass, valueClass);
案例
原始数据
1512,iphone5s,4inchs,A7,64,M7,lowerpower
1512,iphone5,4inchs,A6,IOS7
1512,iphone4s,3.5inchs,A5
50019780,Ipad,9.7,retina
50019780,yoga,lenovel,18hours
50019780,nexus,7,google
50019780,Ipad mini2,retina,7.9
1101,macbook air,OS X mavericks
1101,macbook pro,OS X lion
1101,thinkpad yoga,lenovel,windows 8
对原始数据根据第一个id列进行分类到不同的文件中
1.Map类
[java] view
plain copy
print?
package test.mr.multioutputs;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MultioutMap extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
String line = value.toString().trim();
if (line.length() > 0) {
String str[] = line.split(",");
// 提取分类
context.write(new Text(str[0]), value);
}
}
}
2.Reduce类
[java] view
plain copy
print?
package test.mr.multioutputs;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public class MultioutRedu extends Reducer<Text, Text, NullWritable, Text> {
private MultipleOutputs<NullWritable, Text> mos; // 输出类型和Reduce一致
@Override
protected void setup(Reducer<Text, Text, NullWritable, Text>.Context context)
throws IOException, InterruptedException {
mos = new MultipleOutputs<NullWritable, Text>(context);
}
@Override
protected void cleanup(
Reducer<Text, Text, NullWritable, Text>.Context context)
throws IOException, InterruptedException {
mos.close();
}
@Override
protected void reduce(Text key, Iterable<Text> values,
Reducer<Text, Text, NullWritable, Text>.Context context)
throws IOException, InterruptedException {
/*
* mos.write(Key key,Value
* value,StringbaseOutputPath)代替context.write(key,value);
*/
for (Text value : values) {
// 指定写出不同文件的数据
mos.write("KeySplit", NullWritable.get(), value, key.toString()
+ "/");
// 指定写出全部数据
mos.write("AllData", NullWritable.get(), value);
}
}
}
3.job类
[java] view
plain copy
print?
package test.mr.multioutputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class MultioutMain {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJarByClass(MultioutMain.class);
job.setMapperClass(MultioutMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(MultioutRedu.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
// 设置输出文件类型
MultipleOutputs.addNamedOutput(job, "KeySplit", TextOutputFormat.class,
NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "AllData", TextOutputFormat.class,
NullWritable.class, Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
MapReduce job中,可以使用FileInputFormat和FileOutputFormat来对输入路径和输出路径来进行设置。在输出目录中,框架自己会自动对输出文件进行命名和组织,如part-(m|r)-00000之类,但有时为了后续流程的方便,我们常需要对输出结果进行一定的分类和组织。以前常用的方法是在MR
job运行之后,用脚本对目录下的数据进行一次重新组织,变成我们需要的格式。
研究了一下MR框架中的MultipleOutputs(是2.0之后的新API,是对老版本中MultipleOutputs与MultipleOutputFormat的一个整合)。
在一般情况下,Hadoop每一个Reducer产生一个输出文件,文件以part-r-00000,part-r-00001的方式进行命名,如果需要认为的控制输出文件的命名或者每一个Reducer需要写出多个输出文件时,可以采用MultipleOutputs类来完成,MultipleOutputs采用输出记录的键值对(output
Key和output Value)或者任意字符串来生成输出文件的名字,文件一般以name-r-nnnnn的格式进行命名,其中name是程序设计的任意名字;nnnnn表示分区号。
使用方法
1.驱动中不需要额外改变,只需要在MapClass或Reduce类中加入以下代码
[java] view
plain copy
print?
private MultipleOutputs<Text,IntWritable> mos;
public void setup(Context context) throws Exception{
mos=new MultipleOutputs(context);
}
public void cleanup(Context context) throws Exeception{
mos.close();
}
2.然后就可以用mos.write(Key key,Value value,String baseOutputPath)代替context.write(key,value);
在MapClass或Reduce中使用,输出时也会有默认的文件part-m-00*或part-r-00*,不过这些文件是无内容的,大小为0。而且只有part-m-00*会传给reduce。
注意:
multipleOutputs.write(key,value,baseOutputPath)方法的第三个参数表明了该输出所在的目录(相当于用户指定的输出目录)。如果baseOutputPath不包含文件分隔符"/",那么输出的文件格式为baseOutputPath-r-nnnnn(name-r-nnnnn).
如果包含文件分隔符"/".例如baseOutputPath="029070-99999/1901/part",那么输出文件则为027070-99999/1901/part-r-nnnnn.
3.最后在job的类中
// 设置输出文件类型
[java] view
plain copy
print?
MultipleOutputs.addNamedOutput(job, namedOutput, outputFormatClass, keyClass, valueClass);
案例
原始数据
1512,iphone5s,4inchs,A7,64,M7,lowerpower
1512,iphone5,4inchs,A6,IOS7
1512,iphone4s,3.5inchs,A5
50019780,Ipad,9.7,retina
50019780,yoga,lenovel,18hours
50019780,nexus,7,google
50019780,Ipad mini2,retina,7.9
1101,macbook air,OS X mavericks
1101,macbook pro,OS X lion
1101,thinkpad yoga,lenovel,windows 8
对原始数据根据第一个id列进行分类到不同的文件中
1.Map类
[java] view
plain copy
print?
package test.mr.multioutputs;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MultioutMap extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
String line = value.toString().trim();
if (line.length() > 0) {
String str[] = line.split(",");
// 提取分类
context.write(new Text(str[0]), value);
}
}
}
2.Reduce类
[java] view
plain copy
print?
package test.mr.multioutputs;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public class MultioutRedu extends Reducer<Text, Text, NullWritable, Text> {
private MultipleOutputs<NullWritable, Text> mos; // 输出类型和Reduce一致
@Override
protected void setup(Reducer<Text, Text, NullWritable, Text>.Context context)
throws IOException, InterruptedException {
mos = new MultipleOutputs<NullWritable, Text>(context);
}
@Override
protected void cleanup(
Reducer<Text, Text, NullWritable, Text>.Context context)
throws IOException, InterruptedException {
mos.close();
}
@Override
protected void reduce(Text key, Iterable<Text> values,
Reducer<Text, Text, NullWritable, Text>.Context context)
throws IOException, InterruptedException {
/*
* mos.write(Key key,Value
* value,StringbaseOutputPath)代替context.write(key,value);
*/
for (Text value : values) {
// 指定写出不同文件的数据
mos.write("KeySplit", NullWritable.get(), value, key.toString()
+ "/");
// 指定写出全部数据
mos.write("AllData", NullWritable.get(), value);
}
}
}
3.job类
[java] view
plain copy
print?
package test.mr.multioutputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class MultioutMain {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJarByClass(MultioutMain.class);
job.setMapperClass(MultioutMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(MultioutRedu.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
// 设置输出文件类型
MultipleOutputs.addNamedOutput(job, "KeySplit", TextOutputFormat.class,
NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "AllData", TextOutputFormat.class,
NullWritable.class, Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
相关文章推荐
- MapReduce处理输出多文件格式(MultipleOutputs)
- 案例十三:多格式文件输出MultipleOutputs
- MapReduce处理CSV格式文件的的一个实例
- 浏览器处理由带BOM的utf-8格式的php文件输出的HTML问题
- MapReduce输出压缩格式文件
- Mapreduce 处理gbk文件的方式(输入gbk文件和输出gbk文件)
- cmd chcp命令切换字符格式 cmd重定向输出到文件出现中文乱码的处理方法
- 深度【文本分类】【关系抽取】模型中,如何读取并处理输出的训练文件(TXT格式)
- MapReduce 如何输出多个文件:MultipleOutputs 运用可行
- 【Hadoop】利用MultipleOutputs,MultiOutputFormat实现以不同格式输出到多个文件
- 控制MapReduce输出文件个数及格式
- 在MapReduce中利用MultipleOutputs输出多个文件
- mapreduce练习:多文件输出对象MultipleOutputs
- Hadoop的MultipleOutputs进行多文件输出
- 文章20:NGINX配置文件格式及处理流程
- 读取txt格式文件输出(点击下载txt模板)
- hadoop自定义文件输出格式
- Java读取txt或其他文件以UTF-8格式输出的时候,第一行代码出现“?”乱码的原因及解决方案!
- fastq文件格式处理工具系列学习
- sqlplus输出HTML格式文件