hadoop1.2.1 MultipleOutputs将结果输出到多个文件或文件夹
2015-11-02 21:59
501 查看
hadoop1.2.1 MultipleOutputs将结果输出到多个文件或文件夹
博客分类:hadoop
hadoop1.2.1中使用MultipleOutputs将结果输出到多个文件或文件夹
使用步骤主要有三步:
1、在reduce或map类中声明MultipleOutputs对象,在setup()中创建它,并在cleanup()中调用close()关闭所有已打开的输出
Java代码
class reduceStatistics extends Reducer<Text, IntWritable, Text, IntWritable>{
//将结果输出到多个文件或多个文件夹
private MultipleOutputs<Text,IntWritable> mos;
//创建对象
protected void setup(Context context) throws IOException,InterruptedException {
mos = new MultipleOutputs<Text, IntWritable>(context);
}
//关闭对象
protected void cleanup(Context context) throws IOException,InterruptedException {
mos.close();
}
}
2、在map或reduce方法中使用MultipleOutputs对象输出数据,代替context.write()
Java代码
/**
 * Sums the counts for one word and routes the total to a named output.
 *
 * <p>Only "hello", "world" and "hadoop" are written; any other word is not written
 * to any output.
 *
 * @param key     the word
 * @param values  the partial counts emitted for that word
 * @param context the task context (unused here; all output goes through {@code mos})
 */
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {
        sum += value.get();
    }
    IntWritable total = new IntWritable(sum);

    // Convert once instead of once per comparison.
    String word = key.toString();
    if ("hello".equals(word)) {
        // Written to hello-r-NNNNN in the job output directory.
        mos.write("hello", key, total);
    } else if ("world".equals(word)) {
        // Written to world-r-NNNNN in the job output directory.
        mos.write("world", key, total);
    } else if ("hadoop".equals(word)) {
        // baseOutputPath is a file-name prefix, not a directory: the framework appends
        // -r-NNNNN, so "hadoop/hadoopfile" yields hadoop/hadoopfile-r-NNNNN
        // (a bare "hadoop/" would yield hadoop/-r-NNNNN).
        mos.write("hadoopfile", key, total, "hadoop/hadoopfile");
    }
}
3、在创建job时,用MultipleOutputs.addNamedOutput()注册附加的命名输出(named output),这里注册的名称必须与第二步中mos.write()的第一个参数完全一致
Java代码
// Register the additional named outputs. Each name must equal the first argument
// passed to mos.write(...) in the reducer; all three use plain-text key/count lines.
for (String namedOutput : new String[] {"hello", "world", "hadoopfile"}) {
    MultipleOutputs.addNamedOutput(job, namedOutput, TextOutputFormat.class, Text.class, IntWritable.class);
}
完整代码:
Java代码
package com.ru.hadoop.wordcount;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.lib.MultipleOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Progressable;
/**
 * Driver for a word-count job whose reducer routes selected words to separate
 * files through {@link MultipleOutputs}.
 *
 * <p>Usage: {@code WordCount2 [inputPath [outputPath]]}. Omitted arguments fall back
 * to the hard-coded demo paths below.
 */
public class WordCount2 extends Configured{
    /** Input directory used when no first argument is supplied. */
    private static final String DEFAULT_IN = "/home/nange/work/test/word/";
    /** Output directory used when no second argument is supplied; it is deleted before each run. */
    private static final String DEFAULT_OUT = "hdfs://localhost:9000/hdfs/test/wordcount/out/";
    /** File system that {@link #removeDir(String)} uses for paths that carry no scheme. */
    private static final String DEFAULT_FS = "hdfs://localhost:9000";

    /**
     * Configures and runs the job, then exits with status 0 on success and 1 on failure
     * (including any exception while setting up or running the job).
     *
     * @param args optional {@code [inputPath [outputPath]]}
     */
    public static void main(String[] args) {
        String in = (args != null && args.length > 0) ? args[0] : DEFAULT_IN;
        String out = (args != null && args.length > 1) ? args[1] : DEFAULT_OUT;
        int exitCode = 1;
        try {
            // The job fails at submission if its output directory already exists, so clear it first.
            WordCount2 wc2 = new WordCount2();
            wc2.removeDir(out);

            Job job = new Job(new Configuration(), "wordcount Job");
            // Lets Hadoop locate the jar containing this driver and its mapper/reducer on a cluster.
            job.setJarByClass(WordCount2.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.setMapperClass(mapperString.class);
            // reduceStatistics is not set as a combiner: it emits only through MultipleOutputs and
            // never calls context.write(), so as a combiner it would pass nothing to the reduce phase.
            job.setReducerClass(reduceStatistics.class);

            // Register the additional named outputs; each name must equal the first argument
            // of the matching mos.write(...) call in the reducer.
            MultipleOutputs.addNamedOutput(job, "hello", TextOutputFormat.class, Text.class, IntWritable.class);
            MultipleOutputs.addNamedOutput(job, "world", TextOutputFormat.class, Text.class, IntWritable.class);
            MultipleOutputs.addNamedOutput(job, "hadoopfile", TextOutputFormat.class, Text.class, IntWritable.class);

            FileInputFormat.addInputPath(job, new Path(in));
            FileOutputFormat.setOutputPath(job, new Path(out));
            exitCode = job.waitForCompletion(true) ? 0 : 1;
        } catch (IOException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for anything still running in this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        System.exit(exitCode);
    }

    /**
     * Recursively deletes {@code filePath} if it exists.
     *
     * <p>A path that carries its own scheme (e.g. {@code hdfs://host:port/...}) is deleted on
     * that file system. A path without a scheme is resolved against {@code hdfs://localhost:9000}.
     *
     * @param filePath file or directory to delete
     * @throws IOException        on file-system errors
     * @throws URISyntaxException if the default file-system URI cannot be parsed
     */
    public void removeDir(String filePath) throws IOException, URISyntaxException{
        Path target = new Path(filePath);
        URI targetUri = target.toUri();
        URI fsUri = (targetUri.getScheme() != null) ? targetUri : new URI(DEFAULT_FS);
        // Not closed on purpose: FileSystem.get returns a shared, cached instance.
        FileSystem fs = FileSystem.get(fsUri, new Configuration());
        // Explicit recursive flag; the single-argument delete(Path) is deprecated.
        fs.delete(target, true);
    }
}
/**
 * Map task for the word count: splits each input line on whitespace and emits
 * {@code (word, 1)} for every non-empty token.
 *
 * @author nange
 */
class mapperString extends Mapper<LongWritable, Text, Text, IntWritable>{
    /**
     * Token separator: one or more whitespace characters. Splitting on a single space
     * produced empty tokens for leading or repeated spaces, which were then counted as
     * the word "", and it did not treat tabs as separators.
     */
    public static final Pattern PATTERN = Pattern.compile("\\s+");
    /** Reused output key, so no Text is allocated per token. */
    final Text K = new Text();
    /** Constant count emitted for every token. */
    final IntWritable V = new IntWritable(1);

    /**
     * Emits {@code (word, 1)} for each whitespace-separated word in the line.
     *
     * @param key     byte offset of the line (unused)
     * @param value   the input line
     * @param context sink for the emitted pairs
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String word : PATTERN.split(value.toString())) {
            // A leading separator (or an empty line) still yields one empty token; skip it.
            if (word.isEmpty()) {
                continue;
            }
            K.set(word);
            context.write(K, V);
        }
    }
}
/**
 * Word-count reducer that sums the counts for each word and routes the totals for
 * "hello", "world" and "hadoop" to separate named outputs.
 *
 * <p>Nothing is written through {@code context.write()}, so the default part-r-* output
 * receives no records. Words other than the three above are not written anywhere.
 *
 * @author nange
 */
class reduceStatistics extends Reducer<Text, IntWritable, Text, IntWritable>{
    /** Sends results to several files or directories instead of the single part file. */
    private MultipleOutputs<Text,IntWritable> mos;

    /** Creates the MultipleOutputs wrapper once per task, bound to this task's context. */
    @Override
    protected void setup(Context context) throws IOException,InterruptedException {
        mos = new MultipleOutputs<Text, IntWritable>(context);
    }

    /**
     * Sums the counts for one word and writes the total to that word's named output.
     *
     * @param key     the word
     * @param values  the partial counts emitted for that word
     * @param context the task context (unused here; all output goes through {@code mos})
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        IntWritable total = new IntWritable(sum);

        // Convert once instead of once per comparison.
        String word = key.toString();
        if ("hello".equals(word)) {
            // Written to hello-r-NNNNN in the job output directory.
            mos.write("hello", key, total);
        } else if ("world".equals(word)) {
            // Written to world-r-NNNNN in the job output directory.
            mos.write("world", key, total);
        } else if ("hadoop".equals(word)) {
            // baseOutputPath is a file-name prefix, not a directory: the framework appends
            // -r-NNNNN, so "hadoop/hadoopfile" yields hadoop/hadoopfile-r-NNNNN
            // (a bare "hadoop/" would yield hadoop/-r-NNNNN).
            mos.write("hadoopfile", key, total, "hadoop/hadoopfile");
        }
    }

    /**
     * Closes every output opened through {@code mos}; the MultipleOutputs documentation
     * requires this call from the task's cleanup method.
     */
    @Override
    protected void cleanup(Context context) throws IOException,InterruptedException {
        mos.close();
    }
}
相关文章推荐
- OpenSSL Heartbleed "心脏滴血"漏洞简单攻击示例
- CentOS下配置ssh免密码登录
- ActiveMQ源码架构解析第一节
- Java运行linux和windows系统命令
- Linux关闭防火墙
- cscope配置
- JSP 资源与网站
- Linux Bash Shell入门教程
- android架构
- Lesson 10 Other array coefficient-wise operations
- Linux有问必答:如何找出Linux中内置模块的信息
- linux的僵尸进程和孤儿进程及解决方法
- linux新装的系统下su命令不能使用的解决办法
- Linux 键盘模拟测试
- Opencv (Opencv2)结合MFC学习数字图像处理---显示图片
- vs2010 + opencv保存图像模块
- 5-26 Population (35分) -- 待解决
- Linux内存子系统基础及常用调优参数
- 【Opencv初学者笔记】-vs2010的配置和简单例程demo
- PowerShell 在hyper-v中创建虚拟机