mapreduce练习:多文件输出对象MultipleOutputs
2018-03-08 09:34
351 查看
一、需求:词频统计,根据单词的首字符的不同输出到不同的结果文件中去。输入文件内容如下:hello world
hello world
hello world
123 123 456
123 123 456
123 123
HELLO WORLD
HU HA HELLO
hello tom
jack 123 678
腾讯 百度 阿里
天猫 腾讯 百度
hello jack 456 678二、为了实现上述需求,单单使用mapreduce显然不能完成;这里就需要用到一个类:org.apache.hadoop.mapreduce.lib.output.MultipleOutputs该类中也有一个write的方法,方法定义为:mos.write(String nameOutput, K k,V v)
其中第一个参数用于指定输出文件的前缀,通过指定不同的前缀就可将不同的结果保存到不同的结果文件中。第二三个参数分别是要输出的key和value。
三、实现代码:
/**
* 多文件输出对象MultipleOutputs
*
*/
public class MultiOutputDemo implements Tool{
public static class MyMapper extends Mapper<LongWritable,Text,Text,Text>{
Text k = new Text();
Text v =new Text();
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
//截取字符串
String lines = value.toString();
String[] fields = lines.split(" ");
for(String Word : fields){
k.set(Word);
v.set("1");
context.write(k, v);
}
}
}
public static class MyReducer extends Reducer<Text,Text,Text,Text>{
private MultipleOutputs<Text,Text> mos = null;
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
//创建多文件输出对象
mos = new MultipleOutputs<Text, Text>(context);
}
@Override
protected void reduce(Text key, Iterable<Text> value,Context context)
throws IOException, InterruptedException {
//词频统计
int counter = 0;
for(Text w : value){
counter += Integer.parseInt(w.toString());
}
//根据首字符判断输出
if(key != null && key.toString().matches("^[a-z].*$")){
mos.write("littlecase", key,new Text(counter+""));
}else if(key != null && key.toString().matches("^[A-Z].*$")){
mos.write("uppercase", key,new Text(counter+""));
}else if(key != null && key.toString().matches("^[0-9].*$")){
mos.write("numbers", key,new Text(counter+""));
}else{
mos.write("others", key,new Text(counter+""));
}
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
//关闭多文件输出对象,刷新缓存数据
mos.close();
}
}
public void setConf(Configuration conf) {
//对conf的属性设置
//hadoop集群为高可用模式时的配置
conf.set("fs.defaultFS", "hdfs://mycluster");
conf.set("dfs.nameservices", "mycluster");
conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
conf.set("dfs.namenode.rpc-address.mycluster.nn1", "hadoop01:9000");
conf.set("dfs.namenode.rpc-address.mycluster.nn2", "hadoop02:9000");
conf.set("dfs.client.failover.proxy.provider.mycluster", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
}
public Configuration getConf() {
return new Configuration();
}
public int run(String[] args) throws Exception {
//创建配置对象
Configuration conf = getConf();
FileSystem fs = FSUtil.getFS();
//创建作业对象
Job job = new Job(conf,"multi output");
//设置运行主类
job.setJarByClass(MultiOutputDemo.class);
//设置mapper参数
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//设置输入参数
FileInputFormat.addInputPath(job,new Path(args[0]));
//设置多文件输出相关配置
MultipleOutputs.addNamedOutput(job,"littlecase",TextOutputFormat.class,Text.class,Text.class);
MultipleOutputs.addNamedOutput(job,"uppercase",TextOutputFormat.class,Text.class,Text.class);
MultipleOutputs.addNamedOutput(job,"numbers",TextOutputFormat.class,Text.class,Text.class);
MultipleOutputs.addNamedOutput(job,"others",TextOutputFormat.class,Text.class,Text.class);
//设置reducer
job.setReducerClass(MyReducer.class);
//设置输出参数
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//提交作业
int res = job.waitForCompletion(true) ? 0:1;
return res;
}
public static void main(String[] args) throws Exception{
int res = ToolRunner.run(new Configuration(),new MultiOutputDemo(),args);
System.exit(res);
}
}
hello world
hello world
123 123 456
123 123 456
123 123
HELLO WORLD
HU HA HELLO
hello tom
jack 123 678
腾讯 百度 阿里
天猫 腾讯 百度
hello jack 456 678二、为了实现上述需求,单单使用mapreduce显然不能完成;这里就需要用到一个类:org.apache.hadoop.mapreduce.lib.output.MultipleOutputs该类中也有一个write的方法,方法定义为:mos.write(String nameOutput, K k,V v)
其中第一个参数用于指定输出文件的前缀,通过指定不同的前缀就可将不同的结果保存到不同的结果文件中。第二三个参数分别是要输出的key和value。
三、实现代码:
/**
* 多文件输出对象MultipleOutputs
*
*/
public class MultiOutputDemo implements Tool{
public static class MyMapper extends Mapper<LongWritable,Text,Text,Text>{
Text k = new Text();
Text v =new Text();
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
//截取字符串
String lines = value.toString();
String[] fields = lines.split(" ");
for(String Word : fields){
k.set(Word);
v.set("1");
context.write(k, v);
}
}
}
public static class MyReducer extends Reducer<Text,Text,Text,Text>{
private MultipleOutputs<Text,Text> mos = null;
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
//创建多文件输出对象
mos = new MultipleOutputs<Text, Text>(context);
}
@Override
protected void reduce(Text key, Iterable<Text> value,Context context)
throws IOException, InterruptedException {
//词频统计
int counter = 0;
for(Text w : value){
counter += Integer.parseInt(w.toString());
}
//根据首字符判断输出
if(key != null && key.toString().matches("^[a-z].*$")){
mos.write("littlecase", key,new Text(counter+""));
}else if(key != null && key.toString().matches("^[A-Z].*$")){
mos.write("uppercase", key,new Text(counter+""));
}else if(key != null && key.toString().matches("^[0-9].*$")){
mos.write("numbers", key,new Text(counter+""));
}else{
mos.write("others", key,new Text(counter+""));
}
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
//关闭多文件输出对象,刷新缓存数据
mos.close();
}
}
public void setConf(Configuration conf) {
//对conf的属性设置
//hadoop集群为高可用模式时的配置
conf.set("fs.defaultFS", "hdfs://mycluster");
conf.set("dfs.nameservices", "mycluster");
conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
conf.set("dfs.namenode.rpc-address.mycluster.nn1", "hadoop01:9000");
conf.set("dfs.namenode.rpc-address.mycluster.nn2", "hadoop02:9000");
conf.set("dfs.client.failover.proxy.provider.mycluster", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
}
public Configuration getConf() {
return new Configuration();
}
public int run(String[] args) throws Exception {
//创建配置对象
Configuration conf = getConf();
FileSystem fs = FSUtil.getFS();
//创建作业对象
Job job = new Job(conf,"multi output");
//设置运行主类
job.setJarByClass(MultiOutputDemo.class);
//设置mapper参数
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//设置输入参数
FileInputFormat.addInputPath(job,new Path(args[0]));
//设置多文件输出相关配置
MultipleOutputs.addNamedOutput(job,"littlecase",TextOutputFormat.class,Text.class,Text.class);
MultipleOutputs.addNamedOutput(job,"uppercase",TextOutputFormat.class,Text.class,Text.class);
MultipleOutputs.addNamedOutput(job,"numbers",TextOutputFormat.class,Text.class,Text.class);
MultipleOutputs.addNamedOutput(job,"others",TextOutputFormat.class,Text.class,Text.class);
//设置reducer
job.setReducerClass(MyReducer.class);
//设置输出参数
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//提交作业
int res = job.waitForCompletion(true) ? 0:1;
return res;
}
public static void main(String[] args) throws Exception{
int res = ToolRunner.run(new Configuration(),new MultiOutputDemo(),args);
System.exit(res);
}
}
相关文章推荐
- MapReduce处理输出多文件格式(MultipleOutputs)
- 在MapReduce中利用MultipleOutputs输出多个文件
- MapReduce处理输出多文件格式(MultipleOutputs)
- MapReduce 如何输出多个文件:MultipleOutputs 运用可行
- 输出文件中不包含指定的多个排除词的行内容(双重遍历) 分类: python 小练习 2013-11-22 15:20 605人阅读 评论(0) 收藏
- MapReduce中<k3,v3>输出到文件中的分隔符究竟是什么
- C程序实现快速从文件输入和输出到文件(ACM练习好帮手)
- Hadoop学习笔记———《MultipleOutputs———将结果输出到指定的多个文件或文件夹》
- 《C++ Primer Plus(第六版)》(41)(第十七章 输入、输出和文件 编程练习和答案1)
- 文件输出流FileOutputStream练习
- mapreduce读取文件内容输出到hbase
- Hadoop MultipleOutputs 结果输出到多个文件夹 出现数据不全,部分文件为空
- mapreduce的reduce输出文件进行压缩
- MapReduce多文件输出
- MultipleOutputs实战:结果输出到多个文件夹或者文件中
- Hadoop的MapReduce中多文件输出
- 安卓小日记练习(9)序列化对象List,通过文件读取在活动间传递数据
- android 对象输出到文件
- 2013年3月算法练习(五) 文件输入、输出 .
- C Primer Plus (第五版) 第十三章 文件输入/输出 编程练习