mapreduce,自定义排序,分区,分组实现按照年份升序排序,温度降序排序
2017-06-28 16:11
537 查看
自定义类:
package myhadoop;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class KeyClass implements WritableComparable<KeyClass>{
// Composite map-output key: (year, temperature). Natural order is year asc,
// hot asc; the job replaces the shuffle order via KeySort (hot desc per year).
private int year;
private int hot;
public int getYear() {
return year;
}
public void setYear(int year) {
this.year = year;
}
public int getHot() {
return hot;
}
public void setHot(int hot) {
this.hot = hot;
}
// Deserializes fields in exactly the order write() emits them.
@Override
public void readFields(DataInput in) throws IOException {
this.year = in.readInt();
this.hot = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(year);
out.writeInt(hot);
}
// Natural ordering: year ascending, then temperature ascending.
@Override
public int compareTo(KeyClass o) {
int result = Integer.compare(this.year, o.getYear());
if(result!=0){
return result;
}
return Integer.compare(this.hot, o.hot);
}
@Override
public String toString() {
return year+","+hot;
}
// BUG FIX: the original used Integer.valueOf(year+hot).hashCode(), which is
// just year+hot and collides for any keys with the same sum, e.g. (2010,45)
// and (2011,44). Use the conventional 31-based mix, and override equals()
// so the equals/hashCode contract holds.
@Override
public int hashCode() {
return 31 * year + hot;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof KeyClass)) {
return false;
}
KeyClass other = (KeyClass) obj;
return year == other.year && hot == other.hot;
}
}
自定义排序:
package myhadoop;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class KeySort extends WritableComparator {
// Register KeyClass and ask the framework to instantiate comparable keys.
public KeySort() {
super(KeyClass.class,true);
}
// Shuffle sort order: year ascending, then temperature descending.
@Override
public int compare(WritableComparable a, WritableComparable b) {
KeyClass left = (KeyClass) a;
KeyClass right = (KeyClass) b;
int byYear = Integer.compare(left.getYear(), right.getYear());
if (byYear != 0) {
return byYear;
}
// Swap the operands to reverse the natural (ascending) hot comparison.
return Integer.compare(right.getHot(), left.getHot());
}
}
自定义分区:
package myhadoop;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class KeyPart extends Partitioner<KeyClass, Text>{
// Routes every record of the same year to the same reducer.
// BUG FIX: (year*127)%num can be negative when the product overflows or the
// year is negative, but Hadoop requires a partition index in [0, num).
// Mask off the sign bit before taking the modulus.
@Override
public int getPartition(KeyClass key, Text value, int num) {
return (key.getYear() * 127 & Integer.MAX_VALUE) % num;
}
}
自定义分组:
package myhadoop;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class KeyGroup extends WritableComparator{
public KeyGroup() {
super(KeyClass.class,true);
}
// Grouping comparator: two keys fall into the same reduce() call whenever
// their years match, regardless of temperature.
@Override
public int compare(WritableComparable a, WritableComparable b) {
KeyClass first = (KeyClass) a;
KeyClass second = (KeyClass) b;
return Integer.compare(first.getYear(), second.getYear());
}
}
mapreduce:
package myhadoop;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class KeyMapReduce {
// NOTE(review): SimpleDateFormat is not thread-safe as a shared static; this is
// tolerable only because each map task parses single-threaded — confirm before
// reusing elsewhere.
private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// Parses lines shaped "yyyy-MM-dd HH:mm:ss@NN℃" into a (year, hot) composite
// key; the value is the untouched input line.
static class KeyMapper extends Mapper<LongWritable, Text, KeyClass, Text>{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] lines = line.split("@");
if(lines.length==2){
try {
Date date = sdf.parse(lines[0]);
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
int year = calendar.get(Calendar.YEAR);
// IMPROVED: the original took substring(0, 2), which only handles
// exactly two-digit temperatures; strip everything that is not a
// digit or minus sign so 5℃, 45℃ and -12℃ all parse.
String hot = lines[1].replaceAll("[^0-9-]", "");
KeyClass keyClass = new KeyClass();
keyClass.setYear(year);
keyClass.setHot(Integer.parseInt(hot));
context.write(keyClass, value);
} catch (Exception e) {
// Best-effort: log and skip malformed records instead of failing the task.
e.printStackTrace();
}
}
}
}
// Emits each record of the group; KeySort already ordered the values so the
// hottest record of each year comes first.
static class KeyReduce extends Reducer<KeyClass, Text, KeyClass, Text>{
@Override
protected void reduce(KeyClass key, Iterable<Text> value, Context context)
throws IOException, InterruptedException {
for(Text text:value){
context.write(key, text);
}
}
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
// BUG FIX: the original called Job.getInstance() and silently discarded the
// Configuration it had just created; pass it to the job.
Job job = Job.getInstance(configuration);
job.setJobName("yearAndHot");
job.setJarByClass(KeyMapReduce.class);
job.setMapperClass(KeyMapper.class);
job.setReducerClass(KeyReduce.class);
job.setSortComparatorClass(KeySort.class);      // year asc, hot desc
job.setPartitionerClass(KeyPart.class);         // partition by year
job.setGroupingComparatorClass(KeyGroup.class); // group by year only
job.setNumReduceTasks(3);
job.setOutputKeyClass(KeyClass.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path("/test/data.txt"));
FileOutputFormat.setOutputPath(job, new Path("/test/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
数据:
2010-11-02 12:12:12@45℃
2010-11-03 11:11:11@40℃
2010-11-04 10:10:10@38℃
2011-11-02 12:12:12@45℃
2011-11-03 11:11:11@56℃
2011-11-04 10:10:10@34℃
2012-11-02 12:12:12@54℃
2012-11-03 11:11:11@56℃
2012-11-04 10:10:10@33℃
2012-11-05 10:00:12@23℃
输出:
[hadoop@master hadoop]$ hadoop fs -ls /test/output
Found 4 items
-rw-r--r-- 1 hadoop supergroup 0 2017-06-27 15:53 /test/output/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 105 2017-06-27 15:52 /test/output/part-r-00000
-rw-r--r-- 1 hadoop supergroup 105 2017-06-27 15:52 /test/output/part-r-00001
-rw-r--r-- 1 hadoop supergroup 140 2017-06-27 15:52 /test/output/part-r-00002
[hadoop@master hadoop]$ hadoop fs -cat /test/output/part-r-00000
2010,45 2010-11-02 12:12:12@45℃
2010,40 2010-11-03 11:11:11@40℃
2010,38 2010-11-04 10:10:10@38℃
[hadoop@master hadoop]$ hadoop fs -cat /test/output/part-r-00001
2011,56 2011-11-03 11:11:11@56℃
2011,45 2011-11-02 12:12:12@45℃
2011,34 2011-11-04 10:10:10@34℃
[hadoop@master hadoop]$ hadoop fs -cat /test/output/part-r-00002
2012,56 2012-11-03 11:11:11@56℃
2012,54 2012-11-02 12:12:12@54℃
2012,33 2012-11-04 10:10:10@33℃
2012,23 2012-11-05 10:00:12@23℃
package myhadoop;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class KeyClass implements WritableComparable<KeyClass>{
// Composite map-output key: (year, temperature). Natural order is year asc,
// hot asc; the job replaces the shuffle order via KeySort (hot desc per year).
private int year;
private int hot;
public int getYear() {
return year;
}
public void setYear(int year) {
this.year = year;
}
public int getHot() {
return hot;
}
public void setHot(int hot) {
this.hot = hot;
}
// Deserializes fields in exactly the order write() emits them.
@Override
public void readFields(DataInput in) throws IOException {
this.year = in.readInt();
this.hot = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(year);
out.writeInt(hot);
}
// Natural ordering: year ascending, then temperature ascending.
@Override
public int compareTo(KeyClass o) {
int result = Integer.compare(this.year, o.getYear());
if(result!=0){
return result;
}
return Integer.compare(this.hot, o.hot);
}
@Override
public String toString() {
return year+","+hot;
}
// BUG FIX: the original used Integer.valueOf(year+hot).hashCode(), which is
// just year+hot and collides for any keys with the same sum, e.g. (2010,45)
// and (2011,44). Use the conventional 31-based mix, and override equals()
// so the equals/hashCode contract holds.
@Override
public int hashCode() {
return 31 * year + hot;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof KeyClass)) {
return false;
}
KeyClass other = (KeyClass) obj;
return year == other.year && hot == other.hot;
}
}
自定义排序:
package myhadoop;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class KeySort extends WritableComparator {
// Register KeyClass and ask the framework to instantiate comparable keys.
public KeySort() {
super(KeyClass.class,true);
}
// Shuffle sort order: year ascending, then temperature descending.
@Override
public int compare(WritableComparable a, WritableComparable b) {
KeyClass left = (KeyClass) a;
KeyClass right = (KeyClass) b;
int byYear = Integer.compare(left.getYear(), right.getYear());
if (byYear != 0) {
return byYear;
}
// Swap the operands to reverse the natural (ascending) hot comparison.
return Integer.compare(right.getHot(), left.getHot());
}
}
自定义分区:
package myhadoop;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class KeyPart extends Partitioner<KeyClass, Text>{
// Routes every record of the same year to the same reducer.
// BUG FIX: (year*127)%num can be negative when the product overflows or the
// year is negative, but Hadoop requires a partition index in [0, num).
// Mask off the sign bit before taking the modulus.
@Override
public int getPartition(KeyClass key, Text value, int num) {
return (key.getYear() * 127 & Integer.MAX_VALUE) % num;
}
}
自定义分组:
package myhadoop;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class KeyGroup extends WritableComparator{
public KeyGroup() {
super(KeyClass.class,true);
}
// Grouping comparator: two keys fall into the same reduce() call whenever
// their years match, regardless of temperature.
@Override
public int compare(WritableComparable a, WritableComparable b) {
KeyClass first = (KeyClass) a;
KeyClass second = (KeyClass) b;
return Integer.compare(first.getYear(), second.getYear());
}
}
mapreduce:
package myhadoop;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class KeyMapReduce {
// NOTE(review): SimpleDateFormat is not thread-safe as a shared static; this is
// tolerable only because each map task parses single-threaded — confirm before
// reusing elsewhere.
private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// Parses lines shaped "yyyy-MM-dd HH:mm:ss@NN℃" into a (year, hot) composite
// key; the value is the untouched input line.
static class KeyMapper extends Mapper<LongWritable, Text, KeyClass, Text>{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] lines = line.split("@");
if(lines.length==2){
try {
Date date = sdf.parse(lines[0]);
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
int year = calendar.get(Calendar.YEAR);
// IMPROVED: the original took substring(0, 2), which only handles
// exactly two-digit temperatures; strip everything that is not a
// digit or minus sign so 5℃, 45℃ and -12℃ all parse.
String hot = lines[1].replaceAll("[^0-9-]", "");
KeyClass keyClass = new KeyClass();
keyClass.setYear(year);
keyClass.setHot(Integer.parseInt(hot));
context.write(keyClass, value);
} catch (Exception e) {
// Best-effort: log and skip malformed records instead of failing the task.
e.printStackTrace();
}
}
}
}
// Emits each record of the group; KeySort already ordered the values so the
// hottest record of each year comes first.
static class KeyReduce extends Reducer<KeyClass, Text, KeyClass, Text>{
@Override
protected void reduce(KeyClass key, Iterable<Text> value, Context context)
throws IOException, InterruptedException {
for(Text text:value){
context.write(key, text);
}
}
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
// BUG FIX: the original called Job.getInstance() and silently discarded the
// Configuration it had just created; pass it to the job.
Job job = Job.getInstance(configuration);
job.setJobName("yearAndHot");
job.setJarByClass(KeyMapReduce.class);
job.setMapperClass(KeyMapper.class);
job.setReducerClass(KeyReduce.class);
job.setSortComparatorClass(KeySort.class);      // year asc, hot desc
job.setPartitionerClass(KeyPart.class);         // partition by year
job.setGroupingComparatorClass(KeyGroup.class); // group by year only
job.setNumReduceTasks(3);
job.setOutputKeyClass(KeyClass.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path("/test/data.txt"));
FileOutputFormat.setOutputPath(job, new Path("/test/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
数据:
2010-11-02 12:12:12@45℃
2010-11-03 11:11:11@40℃
2010-11-04 10:10:10@38℃
2011-11-02 12:12:12@45℃
2011-11-03 11:11:11@56℃
2011-11-04 10:10:10@34℃
2012-11-02 12:12:12@54℃
2012-11-03 11:11:11@56℃
2012-11-04 10:10:10@33℃
2012-11-05 10:00:12@23℃
输出:
[hadoop@master hadoop]$ hadoop fs -ls /test/output
Found 4 items
-rw-r--r-- 1 hadoop supergroup 0 2017-06-27 15:53 /test/output/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 105 2017-06-27 15:52 /test/output/part-r-00000
-rw-r--r-- 1 hadoop supergroup 105 2017-06-27 15:52 /test/output/part-r-00001
-rw-r--r-- 1 hadoop supergroup 140 2017-06-27 15:52 /test/output/part-r-00002
[hadoop@master hadoop]$ hadoop fs -cat /test/output/part-r-00000
2010,45 2010-11-02 12:12:12@45℃
2010,40 2010-11-03 11:11:11@40℃
2010,38 2010-11-04 10:10:10@38℃
[hadoop@master hadoop]$ hadoop fs -cat /test/output/part-r-00001
2011,56 2011-11-03 11:11:11@56℃
2011,45 2011-11-02 12:12:12@45℃
2011,34 2011-11-04 10:10:10@34℃
[hadoop@master hadoop]$ hadoop fs -cat /test/output/part-r-00002
2012,56 2012-11-03 11:11:11@56℃
2012,54 2012-11-02 12:12:12@54℃
2012,33 2012-11-04 10:10:10@33℃
2012,23 2012-11-05 10:00:12@23℃
相关文章推荐
- mapreduce,自定义分区,分组,排序实现join
- 「 Hadoop」mapreduce对温度数据进行自定义排序、分组、分区等
- MapReduce的自定义排序、分区和分组
- 请通过代码实现以下功能:输入一个5X5的二维数组,将数组进行排序,其中一维数组按照平均值降序,一维数组内部升序排列。
- 十一、理解MapReduce的二次排序功能,包括自定义数据类型、分区、分组、排序
- hadoop自定义排序、分组、分区(温度统计)
- MapReduce-WordCount实现按照value降序排序、字符小写、识别不同标点
- 实现按a字段分组后,在组内再按照b字段排序,之后每组取Top n条数据
- Hadoop 自定义排序,自定义分区,自定义分组
- Hadoop Mapreduce分区、分组、二次排序过程详解[转]
- 函数指针作为函数参数,实现冒泡排序的升序排序和降序排序
- OC中超级无敌排序方法,降序 升序 自定义降序升序
- python字典实现按照自定义顺序排序
- LINQ中的OrderBy实现多字段升序、降序排序实现
- Hadoop Mapreduce分区、分组、二次排序过程详解
- 自定义分区、数据类型、排序、分组
- C语言链表中数组实现数据选择排序,升序、降序功能主要难点
- 快速排序:升序+降序----java实现
- 通过orderby关键字,LINQ可以实现升序和降序排序。LINQ还支持次要排序。
- how to 实现一个排序 ,第一排序按状态升序,第二排序状态为2 时,按id 升序,状态为其它时,按状态降序