您的位置:首页 > 编程语言 > Java开发

(13) Hadoop Java 实现MapReduce HelloWord 单词统计 更新版 2

2018-03-17 11:53 781 查看
添加了:setup方法  和 cleanup 方法   setup是在reduce之前做一些动作  cleanup 是在reduce之后做一些动作
添加了shuffle内容介绍package com.my.hadoop.hadoophdfs.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* 单词统计类
* @author liming
*
*/
public class ModuleMapReduce extends Configured implements Tool {
/**
* TODO Map 开发时修改四个参数
* @author liming
*
*/
public static class ModuleMapper extends Mapper<LongWritable ,Text, Text, IntWritable>{

//1
public void setup(Context context) throws IOException,
InterruptedException {
//Nothing
}
//2
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//TODO 实现业务逻辑
}
//3
public void cleanup(Context context) throws IOException,
InterruptedException {
//Nothing
}

}
/**
* TODO Reduce 开发时修改四个参数
* @author liming
*
*/
public static class ModuleReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

//1
protected void setup(Context context)
throws IOException, InterruptedException {
//Nothing
}
//2
protected void reduce(Text key, Iterable<IntWritable> values,
Context content)
throws IOException, InterruptedException {
//TODO 业务逻辑
}
//3
protected void cleanup(Context context)
throws IOException, InterruptedException {
//Nothing
}
}

/**
* Driver
*/
// run 是 Tool中的方法
public int run(String[] args) throws Exception {
// 获取configuration 从继承的Configured类中获取
Configuration cf = getConf();
// 创建job
try {
// 配置文件 job名称
Job job = Job.getInstance(cf, this.getClass().getSimpleName());
// 设置运行类的类型
job.setJarByClass(this.getClass());

/**** input ******/
// input map reduce output 串起来
Path inPath = new Path(args[0]);
FileInputFormat.addInputPath(job, inPath);
// TextInputFormat

/**** map ******/
// map 方法类型嗯
job.setMapperClass(ModuleMapper.class);
// map 输出key value 类型
job.setMapOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
/**************************shuffle*******************************************************/
//总:
//1.分区 partitioner
//2.排序 sort 根据key
//3.复制 copy 用户无法干涉
//4.分组 group 也是根据key
//5.压缩 compress -- 可设置
//6.合并 combiner <a,1> <a,1> ---> <a,2> 不是所有程序都可做 -- 可设置

//shuffle 是贯穿 map阶段 和 reduce阶段 它是在map结尾+reduce头
//partition
//job.setPartitionerClass(cls);

//sort 排序
// job.setCombinerClass(cls);

//optional 可选 combiner
// job.setCombinerClass(cls);

//group 分组
// job.setGroupingComparatorClass(cls);

//压缩 可以通过mapreduce配置文件进行配置 也可以通过configuration设置 看main方法

/**************************shuffle*******************************************************/

/**** reduce ******/
// reduce类型
job.setReducerClass(ModuleReducer.class);

// TODO reduce 输出 也就是job输出的类型 开发时需要修改
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

//调优
//设置 reduce个数 默认是1 可以在配置文件中设置mapreduce.job.reduces
// job.setNumReduceTasks(2);

/**** output ******/
Path outPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outPath);
// TextOutputFormat 每个<key,value> 对,输出一行 key与value中间分隔符为\t 默认调用key和value的toString() 方法
/**** 提交job ******/
// 返回布尔类型 这里设置true是打印日志信息 设置false是不打印日志
boolean isSucc = job.waitForCompletion(true);

return isSucc ? 0 : 1;
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}

public static void main(String[] args) throws Exception {
//运行
Configuration conf = new Configuration();

//压缩 这里可以设置多个属性 覆盖默认属性
conf.set("mapreduce.map.output.compress", "true");
//压缩格式
conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");

//在这个里边 设置了传递的参数conf 然后在run方法中获取 都是父类的方法
int status= ToolRunner.run(conf, new WordCountMapReduce(), args);
System.exit(status);

//
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: