您的位置:首页 > 其它

MapReduce:通过自定义排序、分区和分组,实现按年份升序、温度降序排序

2017-06-28 16:11 537 查看
自定义类:

package myhadoop;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite MapReduce key holding a year and a temperature reading ({@code hot}).
 *
 * <p>The key is serialized as two consecutive 32-bit integers: the year first,
 * then the temperature. Its natural ordering ({@link #compareTo}) is ascending
 * by year and then ascending by temperature; a job may install a different
 * sort comparator to override that ordering.
 *
 * <p>{@code equals}, {@code hashCode} and {@code compareTo} all agree: two keys
 * are equal exactly when both their year and their temperature are equal.
 */
public class KeyClass implements WritableComparable<KeyClass> {

    private int year;
    private int hot;

    /** @return the year component of this key */
    public int getYear() {
        return year;
    }

    /** @param year the year component to store */
    public void setYear(int year) {
        this.year = year;
    }

    /** @return the temperature component of this key */
    public int getHot() {
        return hot;
    }

    /** @param hot the temperature component to store */
    public void setHot(int hot) {
        this.hot = hot;
    }

    /**
     * Restores both fields from {@code in}, in the order used by {@link #write}.
     *
     * @param in source to read the two ints from
     * @throws IOException if reading from {@code in} fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.hot = in.readInt();
    }

    /**
     * Writes the year followed by the temperature to {@code out}.
     *
     * @param out sink to write the two ints to
     * @throws IOException if writing to {@code out} fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(hot);
    }

    /**
     * Orders keys by year ascending, breaking ties by temperature ascending.
     *
     * @param o the key to compare against
     * @return a negative, zero or positive value as this key is less than,
     *         equal to or greater than {@code o}
     */
    @Override
    public int compareTo(KeyClass o) {
        int result = Integer.compare(this.year, o.getYear());
        if (result != 0) {
            return result;
        }
        return Integer.compare(this.hot, o.hot);
    }

    /**
     * Two keys are equal when both the year and the temperature match.
     * Defined so that it is consistent with {@link #hashCode()} and
     * {@link #compareTo(KeyClass)}.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof KeyClass)) {
            return false;
        }
        KeyClass other = (KeyClass) obj;
        return year == other.year && hot == other.hot;
    }

    /**
     * Hash over both fields. The year is scaled by 31 so that keys such as
     * (2010, 45) and (2011, 44) no longer collide the way a plain sum does.
     */
    @Override
    public int hashCode() {
        return 31 * year + hot;
    }

    /** @return the key rendered as {@code "year,hot"} */
    @Override
    public String toString() {
        return year + "," + hot;
    }

}
自定义排序:
package myhadoop;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Comparator over {@link KeyClass} keys: year ascending, and within the same
 * year temperature ({@code hot}) descending.
 */
public class KeySort extends WritableComparator {

    /**
     * Registers {@link KeyClass} as the compared type; the {@code true} flag is
     * passed through to the {@code WritableComparator} constructor.
     */
    public KeySort() {
        super(KeyClass.class, true);
    }

    /**
     * Compares two {@link KeyClass} keys.
     *
     * @param a first key, must be a {@link KeyClass}
     * @param b second key, must be a {@link KeyClass}
     * @return the year comparison when the years differ, otherwise the
     *         reversed temperature comparison
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        final KeyClass left = (KeyClass) a;
        final KeyClass right = (KeyClass) b;

        final int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear == 0) {
            // Same year: swap the operands so the hotter reading sorts first.
            return Integer.compare(right.getHot(), left.getHot());
        }
        return byYear;
    }

}


自定义分区:
package myhadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitions {@link KeyClass} keys by year only, so every record of a given
 * year is routed to the same partition regardless of its temperature.
 */
public class KeyPart extends Partitioner<KeyClass, Text> {

    /**
     * Computes the partition index for a key.
     *
     * @param key   the composite key; only its year is used
     * @param value the record value (ignored)
     * @param num   the number of partitions
     * @return an index in {@code [0, num)}
     */
    @Override
    public int getPartition(KeyClass key, Text value, int num) {
        // Clear the sign bit before taking the remainder: a negative year, or
        // an int overflow in the multiplication, would otherwise produce a
        // negative partition index. Non-negative products are unaffected.
        return ((key.getYear() * 127) & Integer.MAX_VALUE) % num;
    }

}


自定义分组:
package myhadoop;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Comparator that treats {@link KeyClass} keys as equal whenever their years
 * match, ignoring the temperature component.
 */
public class KeyGroup extends WritableComparator {

    /**
     * Registers {@link KeyClass} as the compared type; the {@code true} flag is
     * passed through to the {@code WritableComparator} constructor.
     */
    public KeyGroup() {
        super(KeyClass.class, true);
    }

    /**
     * Compares two {@link KeyClass} keys by year alone.
     *
     * @param a first key, must be a {@link KeyClass}
     * @param b second key, must be a {@link KeyClass}
     * @return negative, zero or positive as {@code a}'s year is less than,
     *         equal to or greater than {@code b}'s year
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        final KeyClass left = (KeyClass) a;
        final KeyClass right = (KeyClass) b;
        final int leftYear = left.getYear();
        final int rightYear = right.getYear();
        return Integer.compare(leftYear, rightYear);
    }
}


mapreduce:
package myhadoop;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Job driver, mapper and reducer for the "yearAndHot" job.
 *
 * <p>Each input line has the form {@code yyyy-MM-dd HH:mm:ss@<temperature>...},
 * for example {@code 2010-11-02 12:12:12@45℃}. The mapper emits a
 * {@link KeyClass} (year, temperature) key with the original line as value;
 * the reducer writes every (key, line) pair it receives. Ordering, routing and
 * grouping are delegated to {@link KeySort}, {@link KeyPart} and
 * {@link KeyGroup}.
 */
public class KeyMapReduce {

    /** Counter group used to report records the mapper had to skip. */
    private static final String COUNTER_GROUP = "yearAndHot";

    /** Counter name for lines that could not be parsed. */
    private static final String MALFORMED_RECORDS = "malformedRecords";

    /** Turns one input line into a (year, temperature) key plus the raw line. */
    static class KeyMapper extends Mapper<LongWritable, Text, KeyClass, Text> {

        // SimpleDateFormat is not thread-safe, so each mapper instance owns
        // its own formatter instead of sharing a static one.
        private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        /**
         * Parses the line and emits it keyed by (year, temperature).
         * Lines that do not split into exactly two {@code @}-separated fields,
         * or whose timestamp or temperature cannot be parsed, are skipped and
         * counted; failures while writing the output are propagated.
         *
         * @param key     input key supplied by the framework (unused)
         * @param value   the raw input line
         * @param context task context used for output and counters
         * @throws IOException          if writing the output record fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("@");
            if (fields.length != 2) {
                context.getCounter(COUNTER_GROUP, MALFORMED_RECORDS).increment(1);
                return;
            }

            int year;
            int hot;
            try {
                year = parseYear(fields[0]);
                hot = parseTemperature(fields[1]);
            } catch (ParseException | NumberFormatException e) {
                // Only parse problems are treated as bad input. The write
                // below stays outside this try so I/O errors are not hidden.
                context.getCounter(COUNTER_GROUP, MALFORMED_RECORDS).increment(1);
                return;
            }

            KeyClass keyClass = new KeyClass();
            keyClass.setYear(year);
            keyClass.setHot(hot);
            context.write(keyClass, value);
        }

        /** Extracts the calendar year from a {@code yyyy-MM-dd HH:mm:ss} timestamp. */
        private int parseYear(String timestamp) throws ParseException {
            Date date = sdf.parse(timestamp);
            Calendar calendar = Calendar.getInstance();
            calendar.setTime(date);
            return calendar.get(Calendar.YEAR);
        }

        /**
         * Parses the leading integer of a temperature field such as
         * {@code "45℃"}, {@code "-5℃"} or {@code "100℃"}: an optional sign
         * followed by ASCII digits. Anything after the digits (the unit) is
         * ignored.
         *
         * @throws NumberFormatException if the field does not start with an integer
         */
        private static int parseTemperature(String field) {
            String text = field.trim();
            int end = 0;
            if (end < text.length() && (text.charAt(end) == '-' || text.charAt(end) == '+')) {
                end++;
            }
            while (end < text.length() && text.charAt(end) >= '0' && text.charAt(end) <= '9') {
                end++;
            }
            return Integer.parseInt(text.substring(0, end));
        }

    }

    /** Writes every value of a group back out together with the key. */
    static class KeyReduce extends Reducer<KeyClass, Text, KeyClass, Text> {

        /**
         * Emits one output record per value in the group.
         *
         * @param key     the key passed in for this group
         * @param value   the lines belonging to this group
         * @param context task context used for output
         * @throws IOException          if writing an output record fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(KeyClass key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            // NOTE(review): the sample output shows a different temperature on
            // each line of a year group, so `key` presumably tracks the record
            // of the current value while iterating - confirm against Hadoop docs.
            for (Text text : value) {
                context.write(key, text);
            }
        }

    }

    /**
     * Configures and runs the job, exiting with status 0 on success and 1 on
     * failure.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Hand the configuration to the job; it was previously created but unused.
        Job job = Job.getInstance(configuration);
        job.setJobName("yearAndHot");
        job.setJarByClass(KeyMapReduce.class);
        job.setMapperClass(KeyMapper.class);
        job.setReducerClass(KeyReduce.class);

        job.setSortComparatorClass(KeySort.class);
        job.setPartitionerClass(KeyPart.class);
        job.setGroupingComparatorClass(KeyGroup.class);
        job.setNumReduceTasks(3);

        job.setOutputKeyClass(KeyClass.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("/test/data.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/test/output"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


数据:
2010-11-02 12:12:12@45℃
2010-11-03 11:11:11@40℃
2010-11-04 10:10:10@38℃
2011-11-02 12:12:12@45℃
2011-11-03 11:11:11@56℃
2011-11-04 10:10:10@34℃
2012-11-02 12:12:12@54℃
2012-11-03 11:11:11@56℃
2012-11-04 10:10:10@33℃
2012-11-05 10:00:12@23℃

输出:
[hadoop@master hadoop]$ hadoop fs -ls /test/output
Found 4 items
-rw-r--r-- 1 hadoop supergroup 0 2017-06-27 15:53 /test/output/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 105 2017-06-27 15:52 /test/output/part-r-00000
-rw-r--r-- 1 hadoop supergroup 105 2017-06-27 15:52 /test/output/part-r-00001
-rw-r--r-- 1 hadoop supergroup 140 2017-06-27 15:52 /test/output/part-r-00002
[hadoop@master hadoop]$ hadoop fs -cat /test/output/part-r-00000
2010,45 2010-11-02 12:12:12@45℃
2010,40 2010-11-03 11:11:11@40℃
2010,38 2010-11-04 10:10:10@38℃
[hadoop@master hadoop]$ hadoop fs -cat /test/output/part-r-00001
2011,56 2011-11-03 11:11:11@56℃
2011,45 2011-11-02 12:12:12@45℃
2011,34 2011-11-04 10:10:10@34℃
[hadoop@master hadoop]$ hadoop fs -cat /test/output/part-r-00002
2012,56 2012-11-03 11:11:11@56℃
2012,54 2012-11-02 12:12:12@54℃
2012,33 2012-11-04 10:10:10@33℃
2012,23 2012-11-05 10:00:12@23℃
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐