MapReduce 天气数据处理:自定义分区、分组与排序
2015-08-20 16:48
309 查看
package com.zjs.mr2;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the temperature job: configures the mapper, reducer, partitioner,
 * sort comparator and grouping comparator, then submits the job and waits for it.
 */
public class JobRun {

    /**
     * Runs the job and prints whether it succeeded. Any exception thrown while
     * configuring or running the job is printed and otherwise ignored.
     */
    public static void main(String[] args) {
        JobRun jr = new JobRun();
        try {
            System.out.println(jr.run() ? "执行成功" : "执行失败");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds and submits the job.
     *
     * <p>Input is read from {@code /usr/input/hot}; output goes to
     * {@code /usr/output/hot}. An existing output directory is deleted
     * recursively first, because the job refuses to start if it already exists.
     *
     * @return the result of {@code Job.waitForCompletion(true)}
     * @throws Exception if the file system cannot be reached or the job fails to run
     */
    public boolean run() throws Exception {
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "hdfs://node6:8020");
        config.set("yarn.resourcemanager.hostname", "node7");

        // Obtain the file system described by the configuration.
        FileSystem fs = FileSystem.get(config);
        Job job = Job.getInstance(config);

        // NOTE(review): setJarByClass was disabled in the original; it is needed
        // when the job is submitted to the cluster as a packaged jar.
        // job.setJarByClass(JobRun.class);

        job.setMapperClass(Mymapper.class);
        job.setReducerClass(Myreducer.class);
        job.setPartitionerClass(MyParttioner.class);
        job.setSortComparatorClass(Mysort.class);
        job.setGroupingComparatorClass(MyGroup.class);

        job.setMapOutputKeyClass(Mykey.class);
        job.setMapOutputValueClass(Text.class);

        // Number of reduce tasks.
        job.setNumReduceTasks(66);

        // Input data for the job.
        FileInputFormat.addInputPath(job, new Path("/usr/input/hot"));

        // The output directory must not exist when the job starts, so remove it.
        Path outpath = new Path("/usr/output/hot");
        if (fs.exists(outpath)) {
            fs.delete(outpath, true);
        }
        FileOutputFormat.setOutputPath(job, outpath);

        // Submit the job and block until it finishes.
        return job.waitForCompletion(true);
    }

    /**
     * Turns each input line into a {@link Mykey} (year, month, temperature) and
     * emits the untouched line as the value.
     *
     * <p>A line is split on tabs: field 0 is a timestamp in
     * {@code yyyy-MM-dd HH:mm:ss} format, field 1 is a temperature whose numeric
     * part is everything before the last {@code 'c'}.
     */
    public static class Mymapper extends Mapper<LongWritable, Text, Mykey, Text> {

        /**
         * Created once per mapper instead of once per record. SimpleDateFormat is
         * not thread-safe; this assumes each mapper instance is driven by a single
         * thread — TODO confirm if a multithreaded mapper runner is ever used.
         */
        private final SimpleDateFormat timestampFormat =
                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            Mykey outKey;
            try {
                outKey = parseKey(value.toString());
            } catch (ParseException | RuntimeException e) {
                // Best effort: skip lines that cannot be parsed, but make the
                // drop visible in the job counters as well as in the task log.
                context.getCounter("JobRun", "MALFORMED_RECORDS").increment(1);
                e.printStackTrace();
                return;
            }
            // Deliberately outside the try block so that failures while emitting
            // the record are propagated rather than mistaken for bad input.
            context.write(outKey, value);
        }

        /** Builds the composite key from one tab-separated input line. */
        private Mykey parseKey(String line) throws ParseException {
            String[] fields = line.split("\t");

            Date date = timestampFormat.parse(fields[0]);
            Calendar calendar = Calendar.getInstance();
            calendar.setTime(date);

            String reading = fields[1];
            Mykey parsed = new Mykey();
            parsed.setYear(calendar.get(Calendar.YEAR));
            // Calendar.MONTH is zero-based (January == 0); store the 1-12 value.
            parsed.setMonth(calendar.get(Calendar.MONTH) + 1);
            parsed.setHot(Double.parseDouble(reading.substring(0, reading.lastIndexOf("c"))));
            return parsed;
        }
    }

    /**
     * Writes at most the first ten values of every group, in the order in which
     * the framework delivers them, using a {@link NullWritable} output key.
     */
    public static class Myreducer extends Reducer<Mykey, Text, NullWritable, Text> {

        /** Upper bound on the number of records written per group. */
        private static final int MAX_RECORDS_PER_GROUP = 10;

        NullWritable outkey = NullWritable.get();

        @Override
        protected void reduce(Mykey mykey, Iterable<Text> iteger, Context context)
                throws IOException, InterruptedException {
            System.out.println(mykey.getYear() + "*********" + mykey.getMonth());

            int written = 0;
            for (Text value : iteger) {
                if (written == MAX_RECORDS_PER_GROUP) {
                    break;
                }
                context.write(outkey, value);
                written++;
            }
        }
    }
}
package com.zjs.mr2; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class MyGroup extends WritableComparator{ public MyGroup(){ super(Mykey.class,true); } public int compare(WritableComparable a, WritableComparable b) { Mykey o1 =(Mykey) a; Mykey o2 =(Mykey) b; int v =Integer.compare(o1.getYear(), o2.getYear()); if(v==0){ return Integer.compare(o1.getMonth(), o2.getMonth()); } return v; } }
package com.zjs.mr2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Composite map-output key made of a year, a month and a temperature.
 *
 * <p>The natural order is year, then month, then temperature, all ascending.
 * {@link #equals(Object)} and {@link #hashCode()} are consistent with that order.
 */
public class Mykey implements WritableComparable<Mykey> {

    private int year;   // year
    private int month;  // month
    private double hot; // temperature

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public double getHot() {
        return hot;
    }

    public void setHot(double hot) {
        this.hot = hot;
    }

    /**
     * Serializes the key as int year, int month, double temperature.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeDouble(hot);
    }

    /**
     * Restores the fields in the same order in which {@link #write} emits them.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.month = in.readInt();
        this.hot = in.readDouble();
    }

    /**
     * Compares by year, then month, then temperature, each ascending.
     */
    @Override
    public int compareTo(Mykey a) {
        int byYear = Integer.compare(this.year, a.getYear());
        if (byYear != 0) {
            return byYear;
        }
        int byMonth = Integer.compare(this.month, a.getMonth());
        if (byMonth != 0) {
            return byMonth;
        }
        return Double.compare(this.hot, a.getHot());
    }

    /**
     * Two keys are equal exactly when {@link #compareTo} returns zero for them.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof Mykey)) {
            return false;
        }
        Mykey other = (Mykey) obj;
        return year == other.year
                && month == other.month
                && Double.compare(hot, other.hot) == 0;
    }

    /**
     * Hash over the same three fields that {@link #equals(Object)} inspects.
     */
    @Override
    public int hashCode() {
        // doubleToLongBits matches Double.compare: NaNs collapse, 0.0 and -0.0 differ.
        long hotBits = Double.doubleToLongBits(hot);
        int result = 31 * year + month;
        result = 31 * result + (int) (hotBits ^ (hotBits >>> 32));
        return result;
    }
}
package com.zjs.mr2;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Chooses the partition for a record from the year stored in its key, so every
 * record of one year lands in the same partition.
 */
public class MyParttioner extends Partitioner<Mykey, Text> {

    /**
     * @param key       map-output key; only its year is consulted
     * @param value     map-output value; not used
     * @param reduceNum number of partitions to choose from
     * @return the year, with its sign bit cleared, modulo {@code reduceNum}
     */
    @Override
    public int getPartition(Mykey key, Text value, int reduceNum) {
        // Clearing the sign bit keeps the remainder from being negative.
        final int nonNegativeYear = key.getYear() & Integer.MAX_VALUE;
        return nonNegativeYear % reduceNum;
    }
}
package com.zjs.mr2; import org.apache.hadoop.io.WritableComparator; public class Mysort extends WritableComparator{ public Mysort(){ super(Mykey.class,true); } @Override public int compare(Object a, Object b) { Mykey s = (Mykey)a; Mykey w = (Mykey)b; int v = Integer.compare(s.getYear(), w.getYear()); if(v==0){ int v2 = Integer.compare(s.getMonth(), w.getMonth()); if(v2==0){ return -Double.compare(s.getHot(), w.getHot());//降序 } return v2; } return v; } }
相关文章推荐
- Tomcat 7服务器线程模型
- 安卓使用mapbox加载离线地图及获取位置信息
- DOM(二)-01-(示例-新闻字体)
- js获取Html元素的实际宽度高度
- PAT 1003. Emergency (25)
- Struts2(二)——配置文件struts2.xml的编写
- POJ 3984 迷宫问题(BFS)
- 第四章 变量、作用域和内存问题
- 优先队列
- hdcpc
- androidstudio--gsonformat--超爽的数据解析方式
- CSS Hack技术介绍及常用的Hack技巧集锦
- 内存泄漏检查工具valgrind的安装与使用
- Confirmation on Leaving the Current Page in an Angular.js App
- 总结
- Activity生命周期
- yum命令详解
- jquery选择器
- Android实时获取音量(单位:分贝)
- 如何建立一个“绑定友好的”usercontrol--wpf