您的位置:首页 > 编程语言 > PHP开发

mapreduce练习:多文件输出对象MultipleOutputs

2018-03-08 09:34 351 查看
一、需求:词频统计,根据单词的首字符的不同输出到不同的结果文件中去。输入文件内容如下:hello world
hello world
hello world
123 123 456
123 123 456
123 123
HELLO WORLD
HU HA HELLO
hello tom
jack 123 678
腾讯 百度 阿里
天猫 腾讯 百度
hello jack 456 678二、为了实现上述需求,单单使用mapreduce显然不能完成;这里就需要用到一个类:org.apache.hadoop.mapreduce.lib.output.MultipleOutputs该类中也有一个write的方法,方法定义为:mos.write(String nameOutput, K k,V v)
其中第一个参数用于指定输出文件的前缀,通过指定不同的前缀就可将不同的结果保存到不同的结果文件中。第二三个参数分别是要输出的key和value。

三、实现代码:

/**
* 多文件输出对象MultipleOutputs
*
*/
public class MultiOutputDemo implements Tool{

public static class MyMapper extends Mapper<LongWritable,Text,Text,Text>{
Text k = new Text();
Text v =new Text();

@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
//截取字符串
String lines = value.toString();
String[] fields = lines.split(" ");
for(String Word : fields){
k.set(Word);
v.set("1");
context.write(k, v);
}
}
}

public static class MyReducer extends Reducer<Text,Text,Text,Text>{

private MultipleOutputs<Text,Text> mos = null;

@Override
protected void setup(Context context)
throws IOException, InterruptedException {
//创建多文件输出对象
mos = new MultipleOutputs<Text, Text>(context);
}

@Override
protected void reduce(Text key, Iterable<Text> value,Context context)
throws IOException, InterruptedException {
//词频统计
int counter = 0;
for(Text w : value){
counter += Integer.parseInt(w.toString());
}
//根据首字符判断输出
if(key != null && key.toString().matches("^[a-z].*$")){
mos.write("littlecase", key,new Text(counter+""));
}else if(key != null && key.toString().matches("^[A-Z].*$")){
mos.write("uppercase", key,new Text(counter+""));
}else if(key != null && key.toString().matches("^[0-9].*$")){
mos.write("numbers", key,new Text(counter+""));
}else{
mos.write("others", key,new Text(counter+""));
}
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
//关闭多文件输出对象,刷新缓存数据
mos.close();
}
}

public void setConf(Configuration conf) {
//对conf的属性设置
//hadoop集群为高可用模式时的配置
conf.set("fs.defaultFS", "hdfs://mycluster");
conf.set("dfs.nameservices", "mycluster");
conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
conf.set("dfs.namenode.rpc-address.mycluster.nn1", "hadoop01:9000");
conf.set("dfs.namenode.rpc-address.mycluster.nn2", "hadoop02:9000");
conf.set("dfs.client.failover.proxy.provider.mycluster", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
}
public Configuration getConf() {
return new Configuration();
}
public int run(String[] args) throws Exception {
//创建配置对象
Configuration conf = getConf();
FileSystem fs = FSUtil.getFS();
//创建作业对象
Job job = new Job(conf,"multi output");

//设置运行主类
job.setJarByClass(MultiOutputDemo.class);

//设置mapper参数
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

        //设置输入参数
    FileInputFormat.addInputPath(job,new Path(args[0]));

//设置多文件输出相关配置
MultipleOutputs.addNamedOutput(job,"littlecase",TextOutputFormat.class,Text.class,Text.class);
MultipleOutputs.addNamedOutput(job,"uppercase",TextOutputFormat.class,Text.class,Text.class);
MultipleOutputs.addNamedOutput(job,"numbers",TextOutputFormat.class,Text.class,Text.class);
MultipleOutputs.addNamedOutput(job,"others",TextOutputFormat.class,Text.class,Text.class);

//设置reducer
job.setReducerClass(MyReducer.class);
        //设置输出参数
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
//提交作业
int res = job.waitForCompletion(true) ? 0:1;
return res;
}

public static void main(String[] args) throws Exception{
int res = ToolRunner.run(new Configuration(),new MultiOutputDemo(),args);
System.exit(res);
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: