您的位置:首页 > 运维架构

Hadoop multipleoutputs使用<转>

2014-06-27 18:13 393 查看
package com.loganalysis;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import java.util.HashMap;

import java.util.Iterator;

import java.util.Map;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.io.WritableComparator;

import org.apache.hadoop.mapred.FileInputFormat;

import org.apache.hadoop.mapred.FileOutputFormat;

import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.Mapper;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Partitioner;

import org.apache.hadoop.mapred.Reducer;

import org.apache.hadoop.mapred.Reporter;

import org.apache.hadoop.mapred.TextInputFormat;

import org.apache.hadoop.mapred.TextOutputFormat;

import org.apache.hadoop.mapred.lib.MultipleOutputs;

public class CK {

public static class Key implements WritableComparable<Key> {

public String plate_id = "";

public String hour_id = "";



// public Key(String p, String h){

// this.plate_id = p;

// this.hour_id = h;

// }



public String getPlateid(){

return this.plate_id;

}



public String getHourid(){

return this.hour_id;

}



public void set(String p, String h){

this.plate_id = p;

this.hour_id = h;

}



@Override

public void write(DataOutput out) throws IOException {

out.writeUTF(this.plate_id);

out.writeUTF(this.hour_id);

}



@Override

public void readFields(DataInput in) throws IOException {

this.plate_id = in.readUTF();

this.hour_id = in.readUTF();

}



@Override

public boolean equals(Object k) {

if (k instanceof Key) {

Key r = (Key) k;

return this.plate_id.equals(((Key) k).plate_id) && this.hour_id.equals(((Key) k).hour_id);

} else {

return false;

}

}



public String toString(){

return this.plate_id +"\t"+ this.hour_id;

}



@Override

public int compareTo(Key k) {

int cmp = this.plate_id.compareTo(k.plate_id);



if(cmp != 0){

return cmp;

}



if(this.hour_id.compareTo(k.hour_id) > 0){

return 1;

}else if(this.hour_id.compareTo(k.hour_id) < 0){

return -1;

}else{

return 0;

}



}

}



public static class MapClass extends MapReduceBase implements

Mapper<LongWritable, Text, Key, Text> {

//private Key plateHour = new Key();

private Text word = new Text();

private Key plateHour = new Key();

@Override

public void map(LongWritable key,

Text value,

OutputCollector<Key, Text> output,

Reporter reporter) throws IOException {

String [] line = value.toString().split("\t");

String cookieId = line[0];

String hourId = line[1].substring(11, 13);

String plateId = line[3];



plateHour.set(plateId,hourId);

word.set(hourId + "\t" + cookieId);

output.collect(plateHour, word);

}





}



public static class Reduce extends MapReduceBase implements Reducer<Key, Text, Text, Text> {

private Text word = new Text();

private Text hour_word = new Text();

private Text hour_total = new Text();

private Text total_total = new Text();

private Map<String, Integer> hour_map = new HashMap<String, Integer>();

private Map<String, Integer> total_map = new HashMap<String, Integer>();



private MultipleOutputs mop;



@Override

public void configure(JobConf conf){

mop = new MultipleOutputs(conf);

}



@Override

public void close() throws IOException{

mop.close();

}



@Override

public void reduce(Key key,

Iterator<Text> values,

OutputCollector<Text, Text> output,

Reporter reporter) throws IOException {

String newHour = "";

int count = 0;



total_map.clear();

while(values.hasNext()) {

String [] value = values.next().toString().split("\t");



//按小時UV輸出

if(!newHour.equals(value[0])){

if(count != 0){

hour_total.set(Integer.toString(hour_map.size()));

hour_word.set(key.getPlateid() + "\t" + newHour);

//output.collect(hour_word, hour_total);

mop.getCollector("fix1",
"fix1", reporter).collect(hour_word, hour_total);

}

newHour = value[0];

hour_map.clear();

count = 0;

hour_map.put(value[1], 0);

count ++;

}else{

hour_map.put(value[1], 0);

count ++;

}



total_map.put(value[1], 0);



}

//按小時UV輸出

hour_total.set(Integer.toString(hour_map.size()));

hour_word.set(key.getPlateid() + "\t" + newHour);

//output.collect(hour_word, hour_total);

mop.getCollector("fix1",
"fix1", reporter).collect(hour_word, hour_total);



//總UV輸出

total_total.set(Integer.toString(total_map.size()));

word.set(key.getPlateid());

//output.collect(word, total_total);

mop.getCollector("fix2",
"fix2", reporter).collect(word, total_total);



}

}



public static class FirstGroupingComparator extends WritableComparator {

protected FirstGroupingComparator() {

super(Key.class,true);

}

@Override

public int compare(WritableComparable a, WritableComparable b) {

return ((Key)a).getPlateid().compareTo(((Key)b).getPlateid());

//一定通过第一个key进行分组

}



}

public static class FirstPartitioner implements Partitioner<Key, Text> {

@Override

public int getPartition(Key key, Text value, int numPartitions) {

return key.getPlateid().hashCode()&Integer.MAX_VALUE%numPartitions;

}

@Override

public void configure(JobConf job) {

// TODO Auto-generated method stub



}

}

public static void main(String[] args) throws Exception {

JobConf conf = new JobConf(CK.class);

conf.setJobName("Composite key");

// 设置Map输出的key和value的类型

conf.setMapOutputKeyClass(Key.class);

conf.setMapOutputValueClass(Text.class);

// 设置Reduce输出的key和value的类型

conf.setOutputKeyClass(Text.class);

conf.setOutputValueClass(Text.class);

// 设置Mapper和Reducer

conf.setMapperClass(MapClass.class);

conf.setReducerClass(Reduce.class);

// 设置group函数和分区函数

conf.setOutputValueGroupingComparator(FirstGroupingComparator.class);

conf.setPartitionerClass(FirstPartitioner.class);

conf.setInputFormat(TextInputFormat.class);

conf.setOutputFormat(TextOutputFormat.class);

// conf.setOutputFormat(SequenceFileOutputFormat.class);

MultipleOutputs.addMultiNamedOutput(conf,
"fix1", TextOutputFormat.class, Text.class, Text.class);

MultipleOutputs.addMultiNamedOutput(conf, "fix2", TextOutputFormat.class, Text.class, Text.class);



// 如果输出目录已经存在,那么先将其删除

FileSystem fstm = FileSystem.get(conf);

Path outDir = new Path(args[1]);

fstm.delete(outDir, true);

// 设置输入输出目录

FileInputFormat.setInputPaths(conf, new Path(args[0]));

FileOutputFormat.setOutputPath(conf, outDir);

JobClient.runJob(conf);



}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: