您的位置:首页 > 其它

MapReduce 天气排序:分区、分组、排序

2015-08-20 16:48 309 查看
package com.zjs.mr2;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the weather-sorting MapReduce job: maps each input line to a
 * composite key (year, month, temperature), partitions by year, sorts by
 * temperature descending within (year, month), groups by (year, month),
 * and emits the top 10 hottest records per group.
 *
 * Input lines are expected as: "yyyy-MM-dd HH:mm:ss\t<temperature>c"
 * (e.g. "1949-10-01 14:21:02\t34c") — assumed from the parsing code; confirm
 * against the actual input files.
 */
public class JobRun {

    public static void main(String[] args) {
        JobRun jr = new JobRun();
        try {
            System.out.println(jr.run() ? "执行成功" : "执行失败");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Configures and submits the job, blocking until it completes.
     *
     * @return {@code true} if the job succeeded
     * @throws Exception on HDFS access or job-submission failure
     */
    public boolean run() throws Exception {
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "hdfs://node6:8020");
        config.set("yarn.resourcemanager.hostname", "node7");

        FileSystem fs = FileSystem.get(config);
        Job job = Job.getInstance(config);
        // Without this the job jar containing these classes is not shipped to
        // the cluster and tasks fail with ClassNotFoundException.
        job.setJarByClass(JobRun.class);
        job.setMapperClass(Mymapper.class);
        job.setReducerClass(Myreducer.class);
        job.setPartitionerClass(MyParttioner.class);
        job.setSortComparatorClass(Mysort.class);
        job.setGroupingComparatorClass(MyGroup.class);
        job.setMapOutputKeyClass(Mykey.class);
        job.setMapOutputValueClass(Text.class);
        // One reducer per partition slot; MyParttioner routes by year % 66.
        job.setNumReduceTasks(66);
        FileInputFormat.addInputPath(job, new Path("/usr/input/hot"));

        // MR refuses to start if the output directory already exists, so
        // delete it up front to make reruns possible.
        Path outpath = new Path("/usr/output/hot");
        if (fs.exists(outpath)) {
            fs.delete(outpath, true);
        }
        FileOutputFormat.setOutputPath(job, outpath);
        return job.waitForCompletion(true);
    }

    /**
     * Parses each line into a {@link Mykey}(year, month, temperature) and
     * re-emits the original line as the value. Malformed records are skipped.
     */
    public static class Mymapper extends Mapper<LongWritable, Text, Mykey, Text> {

        // Hoisted out of map(): one formatter/calendar per mapper instance
        // instead of one per record. SimpleDateFormat is not thread-safe, but
        // each map task gets its own Mapper instance, so this is safe here.
        private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        private final Calendar calendar = Calendar.getInstance();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] args = line.split("\t");
            if (args.length < 2) {
                return; // malformed record: no tab-separated temperature field
            }
            // Guard: without a trailing "c" the original code called
            // substring(0, -1) and threw StringIndexOutOfBoundsException.
            int unitIdx = args[1].lastIndexOf("c");
            if (unitIdx < 0) {
                return; // malformed record: missing temperature unit suffix
            }
            try {
                Date date = sdf.parse(args[0]);
                calendar.setTime(date);
                Mykey a = new Mykey();
                // NOTE: Calendar.MONTH is zero-based (January == 0); the value
                // is only used for grouping/sorting, so it stays consistent.
                a.setMonth(calendar.get(Calendar.MONTH));
                a.setYear(calendar.get(Calendar.YEAR));
                a.setHot(Double.parseDouble(args[1].substring(0, unitIdx)));
                context.write(a, value);
            } catch (Exception e) {
                // Best-effort: log and skip unparseable records rather than
                // failing the whole task.
                e.printStackTrace();
            }
        }
    }

    /**
     * Because values arrive sorted by temperature descending (Mysort) and are
     * grouped by (year, month) (MyGroup), emitting the first 10 values of each
     * group yields the 10 hottest records per month.
     */
    public static class Myreducer extends Reducer<Mykey, Text, NullWritable, Text> {

        private final NullWritable outkey = NullWritable.get();

        @Override
        protected void reduce(Mykey mykey, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int emitted = 0;
            // Debug trace of the group key being reduced.
            System.out.println(mykey.getYear() + "*********" + mykey.getMonth());
            for (Text value : values) {
                if (emitted == 10) {
                    break; // keep only the top 10 per (year, month) group
                }
                emitted++;
                context.write(outkey, value);
            }
        }
    }
}


package com.zjs.mr2;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyGroup extends WritableComparator{

public MyGroup(){
super(Mykey.class,true);
}

public int compare(WritableComparable a, WritableComparable b) {
Mykey o1 =(Mykey) a;
Mykey o2 =(Mykey) b;
int v =Integer.compare(o1.getYear(), o2.getYear());
if(v==0){
return Integer.compare(o1.getMonth(), o2.getMonth());
}
return v;
}

}


package com.zjs.mr2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Composite map-output key: (year, month, temperature). Natural order is
 * ascending by year, then month, then temperature; the job's custom sort
 * comparator (Mysort) overrides the temperature order to descending.
 */
public class Mykey implements WritableComparable<Mykey> {

    private int year;     // four-digit year
    private int month;    // zero-based month as produced by Calendar.MONTH
    private double hot;   // temperature reading

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public double getHot() {
        return hot;
    }

    public void setHot(double hot) {
        this.hot = hot;
    }

    /** Serializes the fields; order must match {@link #readFields}. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeDouble(hot);
    }

    /** Deserializes the fields in the same order as {@link #write}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.month = in.readInt();
        this.hot = in.readDouble();
    }

    /** Ascending by year, then month, then temperature. */
    @Override
    public int compareTo(Mykey a) {
        int v = Integer.compare(this.year, a.getYear());
        if (v == 0) {
            int v2 = Integer.compare(this.month, a.getMonth());
            if (v2 == 0) {
                return Double.compare(this.hot, a.getHot());
            }
            return v2;
        }
        return v;
    }

    // equals/hashCode added so the key honors the compareTo-consistent-with-
    // equals contract and hashes correctly in any hash-based partitioning.
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof Mykey)) {
            return false;
        }
        Mykey other = (Mykey) obj;
        return year == other.year
                && month == other.month
                && Double.compare(hot, other.hot) == 0;
    }

    @Override
    public int hashCode() {
        int result = Integer.hashCode(year);
        result = 31 * result + Integer.hashCode(month);
        result = 31 * result + Double.hashCode(hot);
        return result;
    }

    @Override
    public String toString() {
        return year + "-" + month + "\t" + hot;
    }
}


package com.zjs.mr2;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitions map output by year so that every record of a given year is
 * handled by the same reducer. The temperature and month play no part in
 * partitioning.
 */
public class MyParttioner extends Partitioner<Mykey, Text> {

    @Override
    public int getPartition(Mykey key, Text value, int reduceNum) {
        // Masking with Integer.MAX_VALUE clears the sign bit, keeping the
        // modulo result non-negative even for a negative year value.
        int nonNegativeYear = key.getYear() & Integer.MAX_VALUE;
        return nonNegativeYear % reduceNum;
    }
}


package com.zjs.mr2;

import org.apache.hadoop.io.WritableComparator;

/**
 * Shuffle sort comparator: orders keys ascending by year, then month, then
 * temperature DESCENDING, so that the hottest records of each month arrive
 * first at the reducer.
 */
public class Mysort extends WritableComparator {

    public Mysort() {
        // true: create Mykey instances so compare() receives deserialized keys.
        super(Mykey.class, true);
    }

    /**
     * Overrides the canonical WritableComparable hook (consistent with
     * MyGroup) instead of compare(Object, Object); the base class routes the
     * Object overload here after casting.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Mykey s = (Mykey) a;
        Mykey w = (Mykey) b;
        int v = Integer.compare(s.getYear(), w.getYear());
        if (v == 0) {
            int v2 = Integer.compare(s.getMonth(), w.getMonth());
            if (v2 == 0) {
                // Arguments reversed on purpose: descending temperature order.
                return Double.compare(w.getHot(), s.getHot());
            }
            return v2;
        }
        return v;
    }
}




                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: