Hadoop入门之几个Demo的加强版本
2017-08-25 19:19
295 查看
昨天写了三个MR的代码Demo,今天再对其加强一下:
1.在使用序列化统计流程的基础上,再次进行MR操作
2.对文件进行字符统计操作过程设置多个自定义过程(Shuffle过程中的combiner合并操作,分片过程中合并小文件到一个分片中)
1.在使用序列化统计流程的基础上,再次进行MR操作
package com.demo.flowsumsort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Second MR pass over the flow-sum output: sorts records by total flow, descending.
 *
 * The map side emits {@link Flow} as the KEY so the shuffle's sort phase orders the
 * records by {@code Flow.compareTo}; the reduce side flips key and value back so the
 * final output is "phone \t uFlow \t dFlow \t sFlow".
 *
 * @author songqinghu
 * @since 2017-08-24
 */
public class FlowSumSort {

    /**
     * Configures and submits the sort job.
     *
     * @param args args[0] = input path (output of the first flow-sum job,
     *             lines of "phone \t uFlow \t dFlow \t sFlow"); args[1] = output path
     * @throws Exception if job submission or execution fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowSumSort.class);

        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        // Map output is (Flow, phone) so the framework sorts by Flow;
        // final output swaps back to (phone, Flow).
        job.setMapOutputKeyClass(Flow.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Flow.class);

        // Input/output locations come from the command line.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean flag = job.waitForCompletion(true);
        System.out.println("the job exe is :" + flag);
    }
}

/**
 * Parses each "phone \t uFlow \t dFlow \t sFlow" line and emits (Flow, phone)
 * so the shuffle sorts on the flow totals.
 */
class FlowSumMapper extends Mapper<LongWritable, Text, Flow, Text> {

    // Reusable writables: Hadoop serializes the key/value at write() time,
    // so a single instance can safely be refilled for every record.
    private final Flow flow = new Flow();
    private final Text phone = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Input line layout: phone \t uFlow \t dFlow \t sFlow
        String[] words = value.toString().split("\t");
        // Integer.parseInt avoids the deprecated new Integer(String) boxing ctor.
        flow.setFlow(Integer.parseInt(words[1]), Integer.parseInt(words[2]));
        phone.set(words[0]);
        context.write(flow, phone);
    }
}

/**
 * Swaps (Flow, phone) back to (phone, Flow) for the final sorted output.
 */
class FlowSumReducer extends Reducer<Flow, Text, Text, Flow> {

    @Override
    protected void reduce(Flow flow, Iterable<Text> iters, Context context)
            throws IOException, InterruptedException {
        // Records whose totals compare equal arrive grouped into one call;
        // writing every value ensures no phone number is dropped on a tie.
        // (The original wrote only iterator().next(), losing tied records.)
        for (Text phone : iters) {
            context.write(phone, flow);
        }
    }
}

/**
 * Hadoop Writable carrying upload, download and total flow.
 * Orders descending by total flow when used as a map output key.
 */
class Flow implements WritableComparable<Flow> {

    private int uFlow; // upload
    private int dFlow; // download
    private int sFlow; // total

    /** No-arg constructor required for Hadoop's reflective instantiation. */
    public Flow() {
    }

    public Flow(int uFlow, int dFlow) {
        super();
        this.uFlow = uFlow;
        this.dFlow = dFlow;
        this.sFlow = uFlow + dFlow;
    }

    /** Refills this instance (total is derived), enabling object reuse in the mapper. */
    public void setFlow(int uFlow, int dFlow) {
        this.uFlow = uFlow;
        this.dFlow = dFlow;
        this.sFlow = uFlow + dFlow;
    }

    public int getuFlow() {
        return uFlow;
    }

    public void setuFlow(int uFlow) {
        this.uFlow = uFlow;
    }

    public int getdFlow() {
        return dFlow;
    }

    public void setdFlow(int dFlow) {
        this.dFlow = dFlow;
    }

    public int getsFlow() {
        return sFlow;
    }

    public void setsFlow(int sFlow) {
        this.sFlow = sFlow;
    }

    /** Serialization: field order must match {@link #readFields}. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(uFlow);
        out.writeInt(dFlow);
        out.writeInt(sFlow);
    }

    /** Deserialization: reads fields in the exact order {@link #write} emitted them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.uFlow = in.readInt();
        this.dFlow = in.readInt();
        this.sFlow = in.readInt();
    }

    /** Format used when this object is printed as job output. */
    @Override
    public String toString() {
        return uFlow + "\t" + dFlow + "\t" + sFlow;
    }

    /**
     * Descending order by total flow.
     *
     * Uses Integer.compare (arguments reversed for descending) instead of the
     * original "sFlow > other.sFlow ? -1 : 1", which never returned 0 and
     * violated the Comparable contract — both a.compareTo(b) and b.compareTo(a)
     * could return 1 for equal totals, which can break the framework's sort.
     */
    @Override
    public int compareTo(Flow flow) {
        return Integer.compare(flow.sFlow, this.sFlow);
    }
}
2.对文件进行字符统计操作过程设置多个自定义过程(Shuffle过程中的combiner合并操作,分片过程中合并小文件到一个分片中)
package com.demo.wordcountfile;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Word-count MR job demonstrating two customizations:
 * a map-side combiner (shuffle-phase pre-aggregation) and
 * CombineTextInputFormat to pack many small files into fewer splits.
 *
 * @author songqinghu
 * @since 2017-08-24
 */
public class WordCount {

    /**
     * Configures and submits the word-count job.
     *
     * @param args args[0] = input path; args[1] = output path (must not exist)
     * @throws Exception if job submission or execution fails
     */
    public static void main(String[] args) throws Exception {
        // On a cluster node this picks up the configuration under $HADOOP_HOME.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCount.class); // locate/ship the job jar

        // Map & reduce wiring.
        job.setMapperClass(WordCountMap.class);
        job.setReducerClass(WordCountReduce.class);
        // Combiner pre-aggregates map output locally, shrinking shuffle traffic.
        job.setCombinerClass(WordCountCombiner.class);

        // Pack small files together: each split is between 1 MB and 10 MB.
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMinInputSplitSize(job, 1048576);   // 1 MB
        CombineTextInputFormat.setMaxInputSplitSize(job, 10485760);  // 10 MB

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input/output locations come from the command line.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean flag = job.waitForCompletion(true);
        System.out.println("the job exe is :" + flag);
    }
}

/**
 * Tokenizes each input line and emits (word, 1) per token.
 */
class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reusable writables: Hadoop serializes at write() time, so one instance
    // can be refilled per token instead of allocating per record.
    private final Text word = new Text();
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split on runs of whitespace; split(" ") would emit empty-string
        // tokens for consecutive spaces/tabs, counting a bogus "" word.
        String[] words = value.toString().split("\\s+");
        for (String token : words) {
            if (token.isEmpty()) {
                continue; // leading whitespace yields one empty leading token
            }
            word.set(token);
            context.write(word, ONE);
        }
    }
}

/**
 * Sums the per-word counts produced by the mappers (and combiner).
 */
class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text text, Iterable<IntWritable> iter, Context context)
            throws IOException, InterruptedException {
        // Accumulate the total occurrences of this word.
        int count = 0;
        for (IntWritable num : iter) {
            count = count + num.get();
        }
        // Done summing — emit (word, total).
        context.write(text, new IntWritable(count));
    }
}

/**
 * Combiner: word-count aggregation is associative and commutative, so the
 * reduce logic can be reused verbatim for map-side pre-aggregation.
 * Extending WordCountReduce replaces the original copy-pasted duplicate body.
 */
class WordCountCombiner extends WordCountReduce {
}
相关文章推荐
- Hadoop入门之Mapreduce过程的几个Demo
- hadoop几个版本区别
- Hadoop MapReduce编程 API入门系列之wordcount版本2(六)
- Hadoop MapReduce编程 API入门系列之挖掘气象数据版本2(十)
- 【新手入门】Hadoop 单机版本重启后hadoop无法启动name node
- 给大家介绍几个Hadoop入门经典教程
- hadoop几个版本区别
- hadoop几个版本区别
- Hadoop MapReduce编程 API入门系列之wordcount版本3(七)
- Hadoop HDFS编程 API入门系列之RPC版本1(八)
- hadoop几个版本区别
- Hadoop MapReduce编程 API入门系列之wordcount版本4(八)
- hadoop入门搭windows eclipse环境时遇到的几个问题以及解决方案
- hadoop几个版本区别
- mapreduce入门--参考雅虎hadoop入门,根据0.20版本修改部分代码
- Hadoop HDFS编程 API入门系列之RPC版本2(九)
- Hadoop入门(八)自定义类型实例-统计手机流量数据Demo
- hadoop几个版本区别
- Hadoop入门之Join的两种实现Demo
- hadoop几个版本区别