MapReduce Study Notes (2)
2017-01-30 10:28
Yesterday I typed out WordCount again from scratch; typing the code by hand is good practice. I woke up this morning and carried on with the review. MapReduce really is convenient to use. This morning I worked through three small exercises: a simple data deduplication, an average-score calculation, and a sort that numbers each value in order. The code is below (a minimal WordCount sketch comes first, for reference). The three programs are all quite similar; after typing them a few times, the pattern gradually sinks in.
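Since the post mentions re-typing WordCount but does not include it, here is a minimal sketch of the usual WordCount pattern for reference. The class and variable names (WordCount, WcMap, WcReduce) are my own, not from the original post; it uses the same Hadoop MapReduce API as the exercises below.

package demos;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/* A minimal WordCount sketch, for reference only; not the code from the original post. */
public class WordCount {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("usage: WordCount <inpath> <outpath>");
            System.exit(-1);
        }
        @SuppressWarnings("deprecation")
        Job job = new Job(new Configuration(), "wordcount");
        job.setJarByClass(WordCount.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(WcMap.class);
        job.setReducerClass(WcReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.waitForCompletion(true);
    }

    // Map: split each line on whitespace and emit <word, 1>.
    public static class WcMap extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String word : value.toString().split("\\s+")) {
                if (!word.isEmpty()) {
                    context.write(new Text(word), ONE);
                }
            }
        }
    }

    // Reduce: sum the 1s that the shuffle phase grouped under each word.
    public static class WcReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
}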
package demos;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/* Sort numbers and add a sequence number.
   Sample input (one number per line):
   2 32 654 32 15 756 65223 5956 22 650 92 26 54 6 */
public class AddNums {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("usage: AddNums <inpath> <outpath>");
            System.exit(-1);
        }
        @SuppressWarnings("deprecation")
        Job job = new Job(new Configuration(), "Paixu");
        job.setJarByClass(AddNums.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(saMaps.class);
        job.setReducerClass(saReduce.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        job.waitForCompletion(true);
    }

    // Map: parse each line into an int and emit it as the key; the value carries no information.
    public static class saMaps extends Mapper<LongWritable, Text, IntWritable, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, IntWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {
            String nums = value.toString();
            int n = Integer.parseInt(nums);
            context.write(new IntWritable(n), NullWritable.get());
        }
    }

    // The shuffle phase sorts keys by itself: numeric keys ascend from small to large,
    // and text keys are ordered lexicographically.
    public static class saReduce extends Reducer<IntWritable, NullWritable, IntWritable, IntWritable> {
        int Nums = 0; // sequence counter; works because the job uses the default single reducer

        @Override
        protected void reduce(IntWritable key, Iterable<NullWritable> value,
                Reducer<IntWritable, NullWritable, IntWritable, IntWritable>.Context context)
                throws IOException, InterruptedException {
            Nums++;
            context.write(new IntWritable(Nums), key);
        }
    }
}
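For reference, running the job on the sample numbers in the header comment should produce tab-separated lines like the following (assuming one number per input line and the default single reducer). Note that reduce is called once per distinct key, so the repeated 32 collapses into a single output line:

1    2
2    6
3    15
4    22
5    26
6    32
7    54
8    92
9    650
10   654
11   756
12   5956
13   65223

If duplicates should be kept, the reduce body could instead loop over the values, something like:

for (NullWritable n : value) {
    Nums++;
    context.write(new IntWritable(Nums), key);
}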
package demos;

/* Compute each student's average score.
   Sample input (name and score, tab-separated, one record per line):
   张三 98
   李四 96
   王五 95
   张三 90
   李四 92
   王五 99
   张三 80
   李四 90
   王五 94
   张三 82
   李四 92 */

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Avgs {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("usage: Avgs <inpath> <outpath>");
            System.exit(-1);
        }
        @SuppressWarnings("deprecation")
        Job job = new Job(new Configuration(), "savg");
        job.setJarByClass(Avgs.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // input path
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output path
        job.setMapperClass(SortMap.class);
        job.setReducerClass(scReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.waitForCompletion(true);
    }

    // Map: split each line into name and score, emit <name, score>.
    public static class SortMap extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            String[] lines = value.toString().split("\t");
            String name = lines[0].trim();
            String score = lines[1].trim();
            int sc = Integer.parseInt(score);
            context.write(new Text(name), new IntWritable(sc));
        }
    }

    // Reduce: sum all scores for one name and divide by the count.
    public static class scReduce extends Reducer<Text, IntWritable, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> value,
                Reducer<Text, IntWritable, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            int i = 0;
            for (IntWritable sc : value) {
                sum += sc.get();
                i++;
            }
            // Cast to double before dividing; plain int division would truncate the average.
            double avgs = (double) sum / i;
            context.write(key, new DoubleWritable(avgs));
        }
    }
}
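With the cast to double in place, the sample scores in the header comment should give output along these lines (assuming tab-separated input and the default text output format; the row order follows Text's byte-wise key sorting):

张三    87.5
李四    92.5
王五    96.0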
package demos;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/* Simple data deduplication.
   Sample input:
   2012-3-1 a   2012-3-2 b   2012-3-3 c   2012-3-4 d
   2012-3-5 a   2012-3-6 b   2012-3-7 c   2012-3-3 c
   2012-3-1 b   2012-3-2 a   2012-3-3 b   2012-3-4 d
   2012-3-5 a   2012-3-6 c   2012-3-7 d   2012-3-3 c
   Final result:
   2012-3-1 a   2012-3-1 b   2012-3-2 a   2012-3-2 b
   2012-3-3 b   2012-3-3 c   2012-3-4 d   2012-3-5 a
   2012-3-6 b   2012-3-6 c   2012-3-7 c   2012-3-7 d */
public class DatatoHeavy {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("usage: DatatoHeavy <inpath> <outpath>");
            System.exit(-1);
        }
        @SuppressWarnings("deprecation")
        Job job = new Job(new Configuration(), "quchong");
        job.setJarByClass(DatatoHeavy.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(hmap.class);
        job.setReducerClass(hreduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.waitForCompletion(true);
    }

    // Map: emit the whole line as the key; the value carries no information.
    public static class hmap extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    // The shuffle phase already does the deduplication: identical keys are merged into one group.
    // Reduce: write each distinct key exactly once.
    public static class hreduce extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> value,
                Reducer<Text, NullWritable, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}
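All three drivers are launched the same way with hadoop jar. A hypothetical example for the dedup job (the jar name and HDFS paths below are placeholders, not from the original post):

hadoop jar demos.jar demos.DatatoHeavy /input/dates /output/dates-dedup

Keep in mind that the output directory must not exist before the job runs, otherwise Hadoop refuses to start the job.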