您的位置:首页 > 运维架构

【Hadoop】Hadoop MR 自定义排序

2016-09-07 11:24 507 查看
1、概念



2、代码示例

FlowSort

package com.ares.hadoop.mr.flowsort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

import com.ares.hadoop.mr.exception.LineException;

/**
 * MapReduce job that re-emits tab-separated flow records using {@link FlowBean}
 * as the map output key, so that the shuffle phase orders the records by
 * {@code FlowBean}'s natural ordering.
 *
 * <p>Each input line is expected to contain exactly four tab-separated fields:
 * phone number, up flow, down flow and sum flow. Lines that do not match are
 * logged, counted under {@link Counter#LINESKIP} and skipped.
 *
 * <p>Command line: exactly three arguments (after generic Hadoop options have
 * been removed). {@code args[1]} is the input path and {@code args[2]} is the
 * output path; {@code args[0]} is not read by this class.
 */
public class FlowSort extends Configured implements Tool {
    private static final Logger LOGGER = Logger.getLogger(FlowSort.class);

    /** Number of command-line arguments this tool requires. */
    private static final int EXPECTED_ARGS = 3;

    /** Job counters maintained by the mapper. */
    enum Counter {
        /** Incremented once for every input line that is skipped. */
        LINESKIP
    }

    /**
     * Parses one input line into a {@link FlowBean} and emits it as the key
     * with a {@link NullWritable} value.
     */
    public static class FlowSortMapper extends Mapper<LongWritable, Text,
            FlowBean, NullWritable> {
        private static final char SEPARATOR = '\t';
        private static final int FIELD_COUNT = 4;

        // Reused across map() calls; every field is overwritten before each write.
        private final FlowBean flowBean = new FlowBean();
        private final NullWritable nullWritable = NullWritable.get();

        /**
         * Parses {@code value} and writes the resulting bean. Malformed lines
         * are logged, counted and skipped; failures raised by
         * {@code context.write} are propagated to the framework.
         *
         * @param key     value supplied by the input format; used only in error messages
         * @param value   one line of input text
         * @param context task context used for output and counters
         * @throws IOException          if writing the output record fails
         * @throws InterruptedException if writing the output record is interrupted
         */
        @Override
        protected void map(
                LongWritable key,
                Text value,
                Mapper<LongWritable, Text, FlowBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            String line = null;
            String phoneNum;
            long upFlow;
            long downFlow;
            long sumFlow;

            // Only parsing is guarded: a bad line must not fail the task, but an
            // I/O failure or interruption while writing must not be mistaken for one.
            try {
                line = value.toString();
                String[] fields = StringUtils.split(line, SEPARATOR);
                if (fields.length != FIELD_COUNT) {
                    throw new LineException(key.get() + ", " + line + " LENGTH INVALID, IGNORE...");
                }

                phoneNum = fields[0];
                upFlow = Long.parseLong(fields[1]);
                downFlow = Long.parseLong(fields[2]);
                sumFlow = Long.parseLong(fields[3]);
            } catch (LineException e) {
                skipLine(context, e);
                return;
            } catch (NumberFormatException e) {
                String errMsg = key.get() + ", " + line + " FLOW DATA INVALID, IGNORE...";
                // Pass the exception as well so the offending token is in the log.
                LOGGER.error(errMsg, e);
                System.out.println(errMsg);
                context.getCounter(Counter.LINESKIP).increment(1);
                return;
            } catch (RuntimeException e) {
                // Best effort: any other parsing failure also skips just this line.
                skipLine(context, e);
                return;
            }

            flowBean.setPhoneNum(phoneNum);
            flowBean.setUpFlow(upFlow);
            flowBean.setDownFlow(downFlow);
            flowBean.setSumFlow(sumFlow);

            context.write(flowBean, nullWritable);
        }

        /** Logs {@code cause} with its stack trace and bumps the skip counter. */
        private void skipLine(
                Mapper<LongWritable, Text, FlowBean, NullWritable>.Context context,
                RuntimeException cause) {
            LOGGER.error(cause.toString(), cause);
            System.out.println(cause);
            context.getCounter(Counter.LINESKIP).increment(1);
        }
    }

    /**
     * Writes the incoming keys back out unchanged; all ordering has already
     * been applied by the time the reducer is called.
     */
    public static class FlowSortReducer extends Reducer<FlowBean, NullWritable,
            FlowBean, NullWritable> {
        /**
         * Emits {@code key} once for every value grouped under it, so that
         * keys which compare equal are not collapsed into a single output row.
         *
         * @param key     the flow record
         * @param values  placeholder values grouped under {@code key}
         * @param context task context used for output
         * @throws IOException          if writing fails
         * @throws InterruptedException if writing is interrupted
         */
        @Override
        protected void reduce(
                FlowBean key,
                Iterable<NullWritable> values,
                Reducer<FlowBean, NullWritable, FlowBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            for (NullWritable ignored : values) {
                context.write(key, NullWritable.get());
            }
        }
    }

    /**
     * Configures and runs the job, blocking until it completes.
     *
     * @param args command-line arguments remaining after generic option
     *             parsing; must contain exactly three elements, with the input
     *             path at index 1 and the output path at index 2
     * @return {@code 0} if the job succeeded, {@code 1} if it failed, or
     *         {@code -1} if the arguments are invalid
     * @throws Exception if job setup or submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        // Validated here rather than in main() so the count is checked after
        // ToolRunner has removed generic options such as -D key=value.
        if (args == null || args.length != EXPECTED_ARGS) {
            String errMsg = "FlowSort: ARGUMENTS ERROR";
            LOGGER.error(errMsg);
            System.out.println(errMsg);
            return -1;
        }

        String errMsg = "FlowSort: TEST STARTED...";
        LOGGER.debug(errMsg);
        System.out.println(errMsg);

        // Use the configuration injected by ToolRunner so that generic options
        // are honoured; fall back to a default one if run() is called directly.
        Configuration conf = getConf();
        if (conf == null) {
            conf = new Configuration();
        }
        //FOR Eclipse JVM Debug
        //conf.set("mapreduce.job.jar", "flowsum.jar");
        Job job = Job.getInstance(conf);

        // JOB NAME
        job.setJobName("FlowSort");

        // JOB MAPPER & REDUCER
        job.setJarByClass(FlowSort.class);
        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class);

        // MAP & REDUCE
        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(NullWritable.class);
        // MAP
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // JOB INPUT & OUTPUT PATH
        // NOTE(review): args[0] is intentionally left unread here, as in the
        // original driver — confirm what callers pass in that position.
        FileInputFormat.setInputPaths(job, args[1]);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        // VERBOSE OUTPUT
        if (job.waitForCompletion(true)) {
            errMsg = "FlowSort: TEST SUCCESSFULLY...";
            LOGGER.debug(errMsg);
            System.out.println(errMsg);
            return 0;
        } else {
            errMsg = "FlowSort: TEST FAILED...";
            LOGGER.debug(errMsg);
            System.out.println(errMsg);
            return 1;
        }
    }

    /**
     * Command-line entry point; exits with the status returned by
     * {@link #run(String[])}.
     *
     * @param args raw command-line arguments, optionally including generic
     *             Hadoop options
     * @throws Exception if the job cannot be set up or submitted
     */
    public static void main(String[] args) throws Exception {
        int result = ToolRunner.run(new Configuration(), new FlowSort(), args);
        System.exit(result);
    }
}


FlowBean

package com.ares.hadoop.mr.flowsort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Writable record holding a phone number together with its up, down and total
 * flow, usable as a MapReduce key.
 *
 * <p>Natural ordering is by {@code sumFlow} descending. Ties are broken by
 * {@code phoneNum} (ascending, {@code null} first), then {@code upFlow}, then
 * {@code downFlow}, so that the ordering is total and {@link #compareTo}
 * returns {@code 0} exactly when {@link #equals} returns {@code true}.
 *
 * <p>NOTE(review): records that are identical in all four fields now compare
 * equal. A reducer that writes the key once per {@code reduce()} call will
 * therefore emit such duplicates a single time — confirm this is acceptable
 * for the input data, or emit once per value in the reducer.
 */
public class FlowBean implements WritableComparable<FlowBean> {
    private String phoneNum;
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    /** Creates an empty bean; fields are populated via setters or {@link #readFields}. */
    public FlowBean() {
    }

    public String getPhoneNum() {
        return phoneNum;
    }

    public void setPhoneNum(String phoneNum) {
        this.phoneNum = phoneNum;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * Restores all four fields, reading them in the order {@link #write} emits them.
     *
     * @param in source to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNum = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /**
     * Serializes the phone number followed by the three flow values.
     *
     * @param out sink to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNum);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /** Returns the four fields joined by tab characters. */
    @Override
    public String toString() {
        return "" + phoneNum + "\t" + upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    /**
     * Orders beans by total flow, largest first, with deterministic tie-breaks.
     *
     * @param o the bean to compare against
     * @return a negative value, zero, or a positive value as this bean sorts
     *         before, equal to, or after {@code o}
     */
    @Override
    public int compareTo(FlowBean o) {
        // Arguments are swapped on purpose: a larger sumFlow must sort first.
        int result = compareLongs(o.sumFlow, sumFlow);
        if (result != 0) {
            return result;
        }
        result = compareStrings(phoneNum, o.phoneNum);
        if (result != 0) {
            return result;
        }
        result = compareLongs(upFlow, o.upFlow);
        if (result != 0) {
            return result;
        }
        return compareLongs(downFlow, o.downFlow);
    }

    /** Two beans are equal when all four fields are equal. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof FlowBean)) {
            return false;
        }
        return compareTo((FlowBean) obj) == 0;
    }

    /** Hash over the same four fields that {@link #equals} compares. */
    @Override
    public int hashCode() {
        int hash = (phoneNum == null) ? 0 : phoneNum.hashCode();
        hash = 31 * hash + hashLong(upFlow);
        hash = 31 * hash + hashLong(downFlow);
        hash = 31 * hash + hashLong(sumFlow);
        return hash;
    }

    /** Overflow-safe three-way comparison of two longs. */
    private static int compareLongs(long left, long right) {
        if (left < right) {
            return -1;
        }
        return (left == right) ? 0 : 1;
    }

    /** Null-tolerant string comparison; {@code null} sorts before any non-null value. */
    private static int compareStrings(String left, String right) {
        if (left == null) {
            return (right == null) ? 0 : -1;
        }
        if (right == null) {
            return 1;
        }
        return left.compareTo(right);
    }

    /** Folds a long into an int by XOR-ing its high and low 32 bits. */
    private static int hashLong(long value) {
        return (int) (value ^ (value >>> 32));
    }
}


LineException

package com.ares.hadoop.mr.exception;

/**
 * Unchecked exception raised for an input line that cannot be processed.
 *
 * <p>Offers the same four constructor forms as {@link RuntimeException}.
 */
public class LineException extends RuntimeException {
    private static final long serialVersionUID = 2536144005398058435L;

    /** Creates an exception with neither a detail message nor a cause. */
    public LineException() {
    }

    /**
     * Creates an exception carrying only a detail message.
     *
     * @param message description of the problem
     */
    public LineException(String message) {
        super(message);
    }

    /**
     * Creates an exception wrapping an underlying cause.
     *
     * @param cause the underlying failure
     */
    public LineException(Throwable cause) {
        super(cause);
    }

    /**
     * Creates an exception carrying both a detail message and a cause.
     *
     * @param message description of the problem
     * @param cause   the underlying failure
     */
    public LineException(String message, Throwable cause) {
        super(message, cause);
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: