
Hadoop 1.2.1: using MultipleOutputs to write results to multiple files or directories





This post shows how to use MultipleOutputs in Hadoop 1.2.1 to write results to multiple files or directories.

There are three main steps:

1. In the reducer (or mapper) class, create the MultipleOutputs object that will write the results.

Java code:

    class reduceStatistics extends Reducer<Text, IntWritable, Text, IntWritable> {

        // writes results to multiple files or directories
        private MultipleOutputs<Text, IntWritable> mos;

        // create the MultipleOutputs object
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs<Text, IntWritable>(context);
        }

        // close the MultipleOutputs object
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }
    }
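Note that calling mos.close() in cleanup() is essential: MultipleOutputs manages its own RecordWriters, and records may not be flushed to the output files until the object is closed, so omitting this step can leave the named outputs empty or truncated.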

2. In the map or reduce method, write the data through the MultipleOutputs object instead of context.write().

Java code:

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        IntWritable V = new IntWritable();
        int sum = 0;
        for (IntWritable value : values) {
            sum = sum + value.get();
        }
        System.out.println("word:" + key.toString() + " sum = " + sum);
        V.set(sum);

        // write the data through the MultipleOutputs object
        if (key.toString().equals("hello")) {
            mos.write("hello", key, V);
        } else if (key.toString().equals("world")) {
            mos.write("world", key, V);
        } else if (key.toString().equals("hadoop")) {
            // write to the hadoop/hadoopfile-r-00000 file; the framework
            // appends the "-r-00000" suffix to the given baseOutputPath
            mos.write("hadoopfile", key, V, "hadoop/hadoopfile");
        }
    }
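The fourth argument of mos.write() is a baseOutputPath, interpreted relative to the job's output directory; the framework appends the usual -r-00000 (or -m-00000) suffix to it. This also allows fully dynamic routing, one sub-directory per key. A minimal sketch of that variant (not part of the original post; it assumes a single named output called "data" has been registered with addNamedOutput):

Java code:

    // Hypothetical variant: route every word into its own sub-directory,
    // producing e.g. hello/part-r-00000, world/part-r-00000, ...
    // Assumes MultipleOutputs.addNamedOutput(job, "data",
    //         TextOutputFormat.class, Text.class, IntWritable.class)
    // was called when setting up the job.
    mos.write("data", key, V, key.toString() + "/part");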

3. When creating the job, register the additional named outputs; the names used here must match the ones passed to mos.write() in step 2.

Java code:

    // register the additional named outputs
    MultipleOutputs.addNamedOutput(job, "hello", TextOutputFormat.class, Text.class, IntWritable.class);
    MultipleOutputs.addNamedOutput(job, "world", TextOutputFormat.class, Text.class, IntWritable.class);
    MultipleOutputs.addNamedOutput(job, "hadoopfile", TextOutputFormat.class, Text.class, IntWritable.class);
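One caveat (my addition, not in the original post): since the reducer writes only through MultipleOutputs and never calls context.write(), the job still creates an empty default part-r-00000 file. Hadoop's LazyOutputFormat can suppress it by instantiating the default output only on first use:

Java code:

    // Optional: create the default output lazily so that no empty
    // part-r-00000 is written when all records go through MultipleOutputs.
    // Requires: import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);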

Complete code (Java):


    package com.ru.hadoop.wordcount;

    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.util.regex.Pattern;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class WordCount2 extends Configured {

        public static void main(String[] args) {
            String in = "/home/nange/work/test/word/";
            String out = "hdfs://localhost:9000/hdfs/test/wordcount/out/";
            Job job;
            try {
                // delete the HDFS output directory if it already exists
                WordCount2 wc2 = new WordCount2();
                wc2.removeDir(out);

                job = new Job(new Configuration(), "wordcount Job");
                job.setJarByClass(WordCount2.class); // needed when submitting the job to a cluster
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);
                job.setMapperClass(mapperString.class);
                // job.setCombinerClass(reduceStatistics.class);
                job.setReducerClass(reduceStatistics.class);

                // register the additional named outputs
                MultipleOutputs.addNamedOutput(job, "hello", TextOutputFormat.class, Text.class, IntWritable.class);
                MultipleOutputs.addNamedOutput(job, "world", TextOutputFormat.class, Text.class, IntWritable.class);
                MultipleOutputs.addNamedOutput(job, "hadoopfile", TextOutputFormat.class, Text.class, IntWritable.class);

                FileInputFormat.addInputPath(job, new Path(in));
                FileOutputFormat.setOutputPath(job, new Path(out));
                job.waitForCompletion(true);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (URISyntaxException e) {
                e.printStackTrace();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public void removeDir(String filePath) throws IOException, URISyntaxException {
            String url = "hdfs://localhost:9000";
            FileSystem fs = FileSystem.get(new URI(url), new Configuration());
            fs.delete(new Path(filePath), true); // recursive delete
        }
    }

    /**
     * Overrides the map method used by the map tasks.
     * @author nange
     */
    class mapperString extends Mapper<LongWritable, Text, Text, IntWritable> {

        // pre-compiled regular expression used to split each input line into words
        public static Pattern PATTERN = Pattern.compile(" ");
        Text K = new Text();
        IntWritable V = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] words = PATTERN.split(value.toString());
            System.out.println("********" + value.toString());
            for (String word : words) {
                K.set(word);
                context.write(K, V);
            }
        }
    }

    /**
     * Tallies the count for each word.
     * @author nange
     */
    class reduceStatistics extends Reducer<Text, IntWritable, Text, IntWritable> {

        // writes results to multiple files or directories
        private MultipleOutputs<Text, IntWritable> mos;

        // create the MultipleOutputs object
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs<Text, IntWritable>(context);
        }

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            IntWritable V = new IntWritable();
            int sum = 0;
            for (IntWritable value : values) {
                sum = sum + value.get();
            }
            System.out.println("word:" + key.toString() + " sum = " + sum);
            V.set(sum);

            // write the data through the MultipleOutputs object
            if (key.toString().equals("hello")) {
                mos.write("hello", key, V);
            } else if (key.toString().equals("world")) {
                mos.write("world", key, V);
            } else if (key.toString().equals("hadoop")) {
                // write to the hadoop/hadoopfile-r-00000 file
                mos.write("hadoopfile", key, V, "hadoop/hadoopfile");
            }
        }

        // close the MultipleOutputs object
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }
    }
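Given input containing the words hello, world and hadoop, the output directory should end up looking roughly like this (part numbers depend on the number of reduce tasks):

    out/_SUCCESS
    out/part-r-00000                 (empty default output)
    out/hello-r-00000
    out/world-r-00000
    out/hadoop/hadoopfile-r-00000

A named output written without an explicit baseOutputPath uses its own name as the file prefix, which is why hello and world appear directly under the output directory.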