(13) Hadoop Java 实现MapReduce HelloWord 单词统计 更新版 2
2018-03-17 11:53
781 查看
添加了:setup方法 和 cleanup 方法 setup是在reduce之前做一些动作 cleanup 是在reduce之后做一些动作
添加了shuffle内容介绍package com.my.hadoop.hadoophdfs.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* 单词统计类
* @author liming
*
*/
public class ModuleMapReduce extends Configured implements Tool {
/**
* TODO Map 开发时修改四个参数
* @author liming
*
*/
public static class ModuleMapper extends Mapper<LongWritable ,Text, Text, IntWritable>{
//1
public void setup(Context context) throws IOException,
InterruptedException {
//Nothing
}
//2
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//TODO 实现业务逻辑
}
//3
public void cleanup(Context context) throws IOException,
InterruptedException {
//Nothing
}
}
/**
* TODO Reduce 开发时修改四个参数
* @author liming
*
*/
public static class ModuleReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
//1
protected void setup(Context context)
throws IOException, InterruptedException {
//Nothing
}
//2
protected void reduce(Text key, Iterable<IntWritable> values,
Context content)
throws IOException, InterruptedException {
//TODO 业务逻辑
}
//3
protected void cleanup(Context context)
throws IOException, InterruptedException {
//Nothing
}
}
/**
* Driver
*/
// run 是 Tool中的方法
public int run(String[] args) throws Exception {
// 获取configuration 从继承的Configured类中获取
Configuration cf = getConf();
// 创建job
try {
// 配置文件 job名称
Job job = Job.getInstance(cf, this.getClass().getSimpleName());
// 设置运行类的类型
job.setJarByClass(this.getClass());
/**** input ******/
// input map reduce output 串起来
Path inPath = new Path(args[0]);
FileInputFormat.addInputPath(job, inPath);
// TextInputFormat
/**** map ******/
// map 方法类型嗯
job.setMapperClass(ModuleMapper.class);
// map 输出key value 类型
job.setMapOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
/**************************shuffle*******************************************************/
//总:
//1.分区 partitioner
//2.排序 sort 根据key
//3.复制 copy 用户无法干涉
//4.分组 group 也是根据key
//5.压缩 compress -- 可设置
//6.合并 combiner <a,1> <a,1> ---> <a,2> 不是所有程序都可做 -- 可设置
//shuffle 是贯穿 map阶段 和 reduce阶段 它是在map结尾+reduce头
//partition
//job.setPartitionerClass(cls);
//sort 排序
// job.setCombinerClass(cls);
//optional 可选 combiner
// job.setCombinerClass(cls);
//group 分组
// job.setGroupingComparatorClass(cls);
//压缩 可以通过mapreduce配置文件进行配置 也可以通过configuration设置 看main方法
/**************************shuffle*******************************************************/
/**** reduce ******/
// reduce类型
job.setReducerClass(ModuleReducer.class);
// TODO reduce 输出 也就是job输出的类型 开发时需要修改
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//调优
//设置 reduce个数 默认是1 可以在配置文件中设置mapreduce.job.reduces
// job.setNumReduceTasks(2);
/**** output ******/
Path outPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outPath);
// TextOutputFormat 每个<key,value> 对,输出一行 key与value中间分隔符为\t 默认调用key和value的toString() 方法
/**** 提交job ******/
// 返回布尔类型 这里设置true是打印日志信息 设置false是不打印日志
boolean isSucc = job.waitForCompletion(true);
return isSucc ? 0 : 1;
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}
public static void main(String[] args) throws Exception {
//运行
Configuration conf = new Configuration();
//压缩 这里可以设置多个属性 覆盖默认属性
conf.set("mapreduce.map.output.compress", "true");
//压缩格式
conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
//在这个里边 设置了传递的参数conf 然后在run方法中获取 都是父类的方法
int status= ToolRunner.run(conf, new WordCountMapReduce(), args);
System.exit(status);
//
}
}
添加了shuffle内容介绍package com.my.hadoop.hadoophdfs.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* 单词统计类
* @author liming
*
*/
public class ModuleMapReduce extends Configured implements Tool {
/**
* TODO Map 开发时修改四个参数
* @author liming
*
*/
public static class ModuleMapper extends Mapper<LongWritable ,Text, Text, IntWritable>{
//1
public void setup(Context context) throws IOException,
InterruptedException {
//Nothing
}
//2
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//TODO 实现业务逻辑
}
//3
public void cleanup(Context context) throws IOException,
InterruptedException {
//Nothing
}
}
/**
* TODO Reduce 开发时修改四个参数
* @author liming
*
*/
public static class ModuleReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
//1
protected void setup(Context context)
throws IOException, InterruptedException {
//Nothing
}
//2
protected void reduce(Text key, Iterable<IntWritable> values,
Context content)
throws IOException, InterruptedException {
//TODO 业务逻辑
}
//3
protected void cleanup(Context context)
throws IOException, InterruptedException {
//Nothing
}
}
/**
* Driver
*/
// run 是 Tool中的方法
public int run(String[] args) throws Exception {
// 获取configuration 从继承的Configured类中获取
Configuration cf = getConf();
// 创建job
try {
// 配置文件 job名称
Job job = Job.getInstance(cf, this.getClass().getSimpleName());
// 设置运行类的类型
job.setJarByClass(this.getClass());
/**** input ******/
// input map reduce output 串起来
Path inPath = new Path(args[0]);
FileInputFormat.addInputPath(job, inPath);
// TextInputFormat
/**** map ******/
// map 方法类型嗯
job.setMapperClass(ModuleMapper.class);
// map 输出key value 类型
job.setMapOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
/**************************shuffle*******************************************************/
//总:
//1.分区 partitioner
//2.排序 sort 根据key
//3.复制 copy 用户无法干涉
//4.分组 group 也是根据key
//5.压缩 compress -- 可设置
//6.合并 combiner <a,1> <a,1> ---> <a,2> 不是所有程序都可做 -- 可设置
//shuffle 是贯穿 map阶段 和 reduce阶段 它是在map结尾+reduce头
//partition
//job.setPartitionerClass(cls);
//sort 排序
// job.setCombinerClass(cls);
//optional 可选 combiner
// job.setCombinerClass(cls);
//group 分组
// job.setGroupingComparatorClass(cls);
//压缩 可以通过mapreduce配置文件进行配置 也可以通过configuration设置 看main方法
/**************************shuffle*******************************************************/
/**** reduce ******/
// reduce类型
job.setReducerClass(ModuleReducer.class);
// TODO reduce 输出 也就是job输出的类型 开发时需要修改
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//调优
//设置 reduce个数 默认是1 可以在配置文件中设置mapreduce.job.reduces
// job.setNumReduceTasks(2);
/**** output ******/
Path outPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outPath);
// TextOutputFormat 每个<key,value> 对,输出一行 key与value中间分隔符为\t 默认调用key和value的toString() 方法
/**** 提交job ******/
// 返回布尔类型 这里设置true是打印日志信息 设置false是不打印日志
boolean isSucc = job.waitForCompletion(true);
return isSucc ? 0 : 1;
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}
public static void main(String[] args) throws Exception {
//运行
Configuration conf = new Configuration();
//压缩 这里可以设置多个属性 覆盖默认属性
conf.set("mapreduce.map.output.compress", "true");
//压缩格式
conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
//在这个里边 设置了传递的参数conf 然后在run方法中获取 都是父类的方法
int status= ToolRunner.run(conf, new WordCountMapReduce(), args);
System.exit(status);
//
}
}
相关文章推荐
- (12) Hadoop Java 实现MapReduce HelloWord 单词统计 更新版
- (11) Hadoop Java 实现MapReduce HelloWord 单词统计
- HADOOP(1)__Mapreduce_WordCount统计单词数
- hadoop基础----hadoop实战(三)-----hadoop运行MapReduce---对单词进行统计--经典的自带例子wordcount
- hadoop入门(六)JavaAPI+Mapreduce实例wordCount单词计数详解
- hadoop基础----hadoop实战(三)-----hadoop运行MapReduce---对单词进行统计--经典的自带例子wordcount
- 和我一起学Hadoop(五):MapReduce的单词统计,wordcount
- Hadoop示例程序之单词统计MapReduce
- java实现读取一篇英文文章,统计其中每个单词出现的次数并排序输出
- hadoop的统计单词程序WordCount
- Hadoop MapReduce示例程序WordCount.java手动编译运行解析
- Hadoop开篇之Mapreduce实现多类别流量统计的两种实现方式
- hadoop 1.2.1 Eclipse mapreduce hello word 学习笔记
- Java实现统计一篇文章中每个单词出现的次数
- Hadoop之——以1.x版本和0.x版本分别实现单词统计功能
- Python+Hadoop Streaming实现MapReduce(word count)
- hadoop实例分析之WordCount单词统计分析
- java实现文件单词频率统计
- Hadoop示例程序之单词统计MapReduce