您的位置:首页 > 运维架构

MapReduce之气温计算

2017-05-27 11:27 190 查看
Step 1:

导入Hadoop和MapReduce的所有jar包

Step 2:WeatherMapper

/**
 * Mapper that converts one weather record into a composite {@link MyKey}
 * (year, month, temperature) while passing the raw temperature text through
 * as the value.
 *
 * <p>The input key is parsed as a timestamp of the form
 * {@code yyyy-MM-dd HH:mm:ss}; the input value is expected to be an integer
 * followed by the letter {@code c} (for example {@code 37c}).
 */
public class WeatherMapper extends Mapper<Text, Text, MyKey, Text>{

    /**
     * Parses a single record and emits {@code (MyKey, value)}.
     *
     * <p>Records whose timestamp or temperature cannot be parsed are reported
     * on standard error and skipped. Failures raised while writing the output
     * are propagated to the framework instead of being swallowed.
     *
     * @param key     timestamp text of the record
     * @param value   temperature text of the record, e.g. {@code 37c}
     * @param context task context used to emit the output pair
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {

        MyKey mykey;
        try {
            // Sample record: 1950-10-01 12:21:02  37c
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date date = sdf.parse(key.toString());
            Calendar c = Calendar.getInstance();
            c.setTime(date);
            int year = c.get(Calendar.YEAR);
            // Calendar.MONTH is zero-based (January == 0); shift to the 1-12 calendar month.
            int month = c.get(Calendar.MONTH) + 1;

            // Strip the trailing unit marker: everything before the last 'c' is the number.
            String reading = value.toString();
            int temp = Integer.parseInt(reading.substring(0, reading.lastIndexOf("c")));

            mykey = new MyKey();
            mykey.setYear(year);
            mykey.setMonth(month);
            mykey.setTemp(temp);
        } catch (java.text.ParseException | RuntimeException e) {
            // Malformed record: report it and skip, keeping the job running.
            e.printStackTrace();
            return;
        }

        // Kept outside the try block so that output failures are not silently dropped.
        context.write(mykey, value);
    }
}


Step 3:Shuffle阶段自定义Group

/**
 * Comparator that treats two {@link MyKey} instances as equal when they share
 * the same year and month; the temperature field is not consulted.
 */
public class MyGroup extends WritableComparator{

    /** Registers {@link MyKey} as the compared type and asks the parent to create instances. */
    public MyGroup() {
        super(MyKey.class,true);
    }

    /**
     * Orders by year first, then by month.
     *
     * @param a first key, must be a {@link MyKey}
     * @param b second key, must be a {@link MyKey}
     * @return negative, zero or positive following the (year, month) ordering
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        MyKey left = (MyKey) a;
        MyKey right = (MyKey) b;

        int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear;
        }
        return Integer.compare(left.getMonth(), right.getMonth());
    }
}


Step 4:shuffle阶段自定义key

/**
 * Composite key made of a year, a month and a temperature.
 *
 * <p>The natural ordering is ascending by year, then month, then temperature.
 * {@link #equals(Object)} and {@link #hashCode()} are consistent with that
 * ordering. The three fields are serialized as three consecutive ints, in the
 * order year, month, temp.
 */
public class MyKey implements WritableComparable<MyKey>{

    private int year;
    private int month;
    private int temp;

    /** @return the year component */
    public int getYear() {
        return year;
    }

    /** @param year the year component */
    public void setYear(int year) {
        this.year = year;
    }

    /** @return the month component */
    public int getMonth() {
        return month;
    }

    /** @param month the month component */
    public void setMonth(int month) {
        this.month = month;
    }

    /** @return the temperature component */
    public int getTemp() {
        return temp;
    }

    /** @param temp the temperature component */
    public void setTemp(int temp) {
        this.temp = temp;
    }

    /**
     * Restores the fields from {@code in}; must read in the same order that
     * {@link #write(DataOutput)} writes.
     *
     * @param in source of the serialized fields
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.month = in.readInt();
        this.temp = in.readInt();
    }

    /**
     * Serializes the fields as three ints: year, month, temp.
     *
     * @param out destination for the serialized fields
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeInt(temp);
    }

    /**
     * Compares by year, then month, then temperature, all ascending.
     *
     * @param o the key to compare against
     * @return negative, zero or positive per the ordering above
     */
    @Override
    public int compareTo(MyKey o) {
        int byYear = Integer.compare(this.year, o.getYear());
        if (byYear != 0) {
            return byYear;
        }
        int byMonth = Integer.compare(this.month, o.getMonth());
        if (byMonth != 0) {
            return byMonth;
        }
        // The fields are ints, so compare them as ints rather than widening to double.
        return Integer.compare(this.temp, o.getTemp());
    }

    /** Two keys are equal when year, month and temperature all match. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof MyKey)) {
            return false;
        }
        MyKey other = (MyKey) obj;
        return year == other.year && month == other.month && temp == other.temp;
    }

    /** Hash derived from the same three fields used by {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        int result = year;
        result = 31 * result + month;
        result = 31 * result + temp;
        return result;
    }

}


Step 5:shuffle阶段自定义partition

/**
 * Partitioner that routes records by the year of their {@link MyKey}, so that
 * all records of one year reach the same partition.
 */
public class MyPartitioner extends HashPartitioner<MyKey, Text>{

    /**
     * Maps the key's year, offset by 1949, onto {@code [0, numReduceTasks)}.
     *
     * <p>NOTE(review): 1949 is presumably the base year of the data set —
     * confirm against the input. Years before the base year would make a plain
     * {@code %} negative, which is not a valid partition index, so the
     * remainder is normalised to be non-negative.
     *
     * @param key            key whose year selects the partition
     * @param value          record value; not used
     * @param numReduceTasks number of partitions
     * @return partition index in {@code [0, numReduceTasks)}
     */
    @Override
    public int getPartition(MyKey key, Text value, int numReduceTasks) {
        int remainder = (key.getYear() - 1949) % numReduceTasks;
        return remainder < 0 ? remainder + numReduceTasks : remainder;
    }

}


Step 6:shuffle阶段自定义排序

/**
 * Comparator that orders {@link MyKey} instances by year ascending, month
 * ascending, and temperature descending.
 */
public class MySort extends WritableComparator{

    /** Registers {@link MyKey} as the compared type and asks the parent to create instances. */
    public MySort() {
        super(MyKey.class, true);
    }

    /**
     * Compares two keys by (year asc, month asc, temp desc).
     *
     * @param a first key, must be a {@link MyKey}
     * @param b second key, must be a {@link MyKey}
     * @return negative, zero or positive per the ordering above
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        MyKey left = (MyKey) a;
        MyKey right = (MyKey) b;

        int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear;
        }

        int byMonth = Integer.compare(left.getMonth(), right.getMonth());
        if (byMonth != 0) {
            return byMonth;
        }

        // Operands swapped so that higher temperatures sort first.
        return Integer.compare(right.getTemp(), left.getTemp());
    }

}


Step 7:shuffle阶段自定义combiner

/**
 * Combiner that forwards only the leading values of each key group, leaving
 * key and value untouched.
 */
public class WeatherCombiner extends Reducer<MyKey, Text, MyKey, Text>{

    /** Upper bound on the number of values forwarded per group. */
    private static final int MAX_FORWARDED = 4;

    /**
     * Writes the first {@code MAX_FORWARDED} values of the group, in iteration
     * order, and stops reading the rest.
     *
     * @param key     key of the group
     * @param values  values of the group
     * @param context task context used to emit the output pairs
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(MyKey key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        int forwarded = 0;
        for (Text value : values) {
            context.write(key, value);
            forwarded++;
            if (forwarded >= MAX_FORWARDED) {
                break;
            }
        }
    }

}


Step 8:weather之reducer阶段

/**
 * Reducer that prints the leading values of each key group as tab-separated
 * lines of the form {@code year<TAB>month<TAB>value}.
 */
public class WeatherReducer extends Reducer<MyKey, Text, Text, NullWritable>{

    /** Upper bound on the number of lines written per group. */
    private static final int MAX_LINES = 4;

    /**
     * Writes one output line for each of the first {@code MAX_LINES} values of
     * the group, in iteration order, then stops reading the group.
     *
     * @param key     key of the group; year and month are read at the time each value is visited
     * @param values  values of the group
     * @param context task context used to emit the output lines
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(MyKey key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        int written = 0;
        for (Text value : values) {
            StringBuilder line = new StringBuilder();
            line.append(key.getYear()).append("\t")
                .append(key.getMonth()).append("\t")
                .append(value);
            context.write(new Text(line.toString()), NullWritable.get());

            written++;
            if (written >= MAX_LINES) {
                break;
            }
        }
    }

}


Step 9:MapReduce之Main——RunJob

/**
 * Driver that configures and submits the "weather" job.
 *
 * <p>Reads input from {@code /data/weather} and writes to {@code /weather},
 * deleting any existing output directory first.
 */
public class RunJob {

    /**
     * Configures the job, submits it and waits for completion.
     *
     * @param args {@code args[0]} is the number of reduce tasks
     */
    public static void main(String[] args) {
        if (args.length < 1) {
            System.err.println("Usage: RunJob <numReduceTasks>");
            return;
        }

        try {
            int numReduceTasks = Integer.parseInt(args[0]);

            Configuration conf = new Configuration();

            // Build the job from the same Configuration that is used for the file system.
            Job job = Job.getInstance(conf);
            job.setJarByClass(RunJob.class);

            job.setJobName("weather");
            job.setInputFormatClass(KeyValueTextInputFormat.class);

            job.setMapOutputKeyClass(MyKey.class);
            job.setMapOutputValueClass(Text.class);

            job.setMapperClass(WeatherMapper.class);
            job.setReducerClass(WeatherReducer.class);
            // WeatherCombiner is not registered (it was commented out in the original driver).

            job.setNumReduceTasks(numReduceTasks);

            job.setGroupingComparatorClass(MyGroup.class);
            job.setPartitionerClass(MyPartitioner.class);
            job.setSortComparatorClass(MySort.class);

            FileInputFormat.addInputPath(job, new Path("/data/weather"));
            Path output = new Path("/weather");

            // Remove stale output so the job does not fail on an existing directory;
            // the dedicated FileSystem instance is closed once that is done.
            FileSystem fs = FileSystem.newInstance(conf);
            try {
                if (fs.exists(output)) {
                    fs.delete(output, true);
                }
            } finally {
                fs.close();
            }
            FileOutputFormat.setOutputPath(job, output);

            boolean flag = job.waitForCompletion(true);
            if (flag) {
                System.out.println("Job finished !");
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息