
Statistics and TopKey

2015-11-16 18:35
The key and value written by TextOutputFormat are separated by a tab by default; that is why the mappers below split their input lines on "\t" (the input here is the output of an earlier word-count job). The separator can be changed through a job configuration property.
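A minimal sketch of changing it, assuming Hadoop 2.x property names (on older releases the property is mapred.textoutputformat.separator):

Configuration conf = new Configuration();
// separator TextOutputFormat writes between key and value (tab by default)
conf.set("mapreduce.output.textoutputformat.separator", ",");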

Program 1: maximum value in a single file

package org.conan.myhadoop.TopKey;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Maximum value in a single file (map-only; each map task emits the max of its split)
public class TopKMapReduce {

    static class TopKMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {
        // output key
        private Text mapOutputKey = new Text();
        // output value
        private LongWritable mapOutputValue = new LongWritable();

        // running maximum, seeded with the smallest possible value
        long topkValue = Long.MIN_VALUE;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");
            // value of the current line
            long tempValue = Long.valueOf(strs[1]);

            if (topkValue < tempValue) {
                topkValue = tempValue;
                mapOutputKey.set(strs[0]);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            // emit the maximum exactly once, after all lines have been seen
            mapOutputValue.set(topkValue);
            context.write(mapOutputKey, mapOutputValue);
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }
    }

    public int run(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduce.class.getSimpleName());
        job.setJarByClass(TopKMapReduce.class);
        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(TopKMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // map-only job: no reducer
        job.setNumReduceTasks(0);

        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);
        boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        args = new String[] { "hdfs://hadoop-master:9000/data/wcoutput",
                "hdfs://hadoop-master:9000/data/topkoutput" };
        int status = new TopKMapReduce().run(args);
        System.exit(status);
    }
}
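To run it, a hedged sketch (topk.jar is a hypothetical jar name; the input and output paths are the ones hardcoded in main()):

hadoop jar topk.jar org.conan.myhadoop.TopKey.TopKMapReduce

Because the job is map-only, each map task writes the maximum of its own split; with one small input file there is a single split and therefore a single result line.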


Program 2: single-file top N with a TreeMap

package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.Set;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Single-file top N, implemented with a TreeMap (map-only)
public class TopKMapReduceV2 {

    static class TopKMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {

        public static final int K = 3; // top three

        // sorted by key (the count) in ascending order; note that two words
        // with the same count collide on the same key, so only one survives
        TreeMap<LongWritable, Text> topMap = new TreeMap<LongWritable, Text>();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");

            long tempValue = Long.valueOf(strs[1]);
            String tempKey = strs[0];

            // fresh Writable instances for every entry: a TreeMap key must
            // never be mutated after insertion, so the usual Hadoop
            // object-reuse pattern cannot be used here (see the note below)
            topMap.put(new LongWritable(tempValue), new Text(tempKey));

            if (topMap.size() > K) {
                // evict the entry with the smallest count
                topMap.remove(topMap.firstKey());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            Set<LongWritable> keySet = topMap.keySet();
            for (LongWritable k : keySet) {
                context.write(topMap.get(k), k);
            }
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }
    }

    public int run(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduceV2.class.getSimpleName());
        job.setJarByClass(TopKMapReduceV2.class);
        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(TopKMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // map-only job: no reducer
        job.setNumReduceTasks(0);

        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);
        boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        args = new String[] { "hdfs://hadoop-master:9000/data/wcoutput",
                "hdfs://hadoop-master:9000/data/topkoutput2" };
        int status = new TopKMapReduceV2().run(args);
        System.exit(status);
    }
}
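Why map() above allocates new LongWritable and Text objects on every call rather than reusing two fields: a minimal plain-Java sketch of what goes wrong with a reused, mutable key (ReusedKeyDemo is a hypothetical class name; only the Hadoop client jars are needed on the classpath):

import java.util.TreeMap;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class ReusedKeyDemo {
    public static void main(String[] args) {
        TreeMap<LongWritable, Text> map = new TreeMap<LongWritable, Text>();
        LongWritable k = new LongWritable();

        k.set(1);
        map.put(k, new Text("a"));

        k.set(2); // mutates the key object already stored inside the tree
        map.put(k, new Text("b")); // compares equal to itself: replaces, never grows

        System.out.println(map.size()); // prints 1, not 2
    }
}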


Program 3: single-file top N with a TreeSet

package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.Comparator;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Single-file top N, implemented with a TreeSet (map-only)
public class TopKMapReduceV3 {

    static class TopKMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {

        public static final int K = 3; // top three

        // ordered by count, ascending, so first() is always the smallest;
        // elements that compare as equal are duplicates to a TreeSet
        // (see the note after this program)
        TreeSet<TopKWritable> topSet = new TreeSet<TopKWritable>(
                new Comparator<TopKWritable>() {
                    @Override
                    public int compare(TopKWritable o1, TopKWritable o2) {
                        return o1.getCount().compareTo(o2.getCount());
                    }
                });

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");

            long tempValue = Long.valueOf(strs[1]);

            topSet.add(new TopKWritable(strs[0], tempValue));

            if (topSet.size() > K) {
                // evict the element with the smallest count
                topSet.remove(topSet.first());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            for (TopKWritable top : topSet) {
                context.write(new Text(top.getWord()),
                        new LongWritable(top.getCount()));
            }
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }
    }

    public int run(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduceV3.class.getSimpleName());
        job.setJarByClass(TopKMapReduceV3.class);
        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(TopKMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // map-only job: no reducer
        job.setNumReduceTasks(0);

        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);
        boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        args = new String[] { "hdfs://hadoop-master:9000/data/wcoutput",
                "hdfs://hadoop-master:9000/data/topkoutput3" };
        int status = new TopKMapReduceV3().run(args);
        System.exit(status);
    }
}
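One caveat with this comparator: a TreeSet treats elements that compare as equal as duplicates, so two words with the same count keep only one entry. A minimal sketch (TieDemo is a hypothetical class name):

package org.conan.myhadoop.TopKey;

import java.util.Comparator;
import java.util.TreeSet;

public class TieDemo {
    public static void main(String[] args) {
        TreeSet<TopKWritable> set = new TreeSet<TopKWritable>(
                new Comparator<TopKWritable>() {
                    @Override
                    public int compare(TopKWritable o1, TopKWritable o2) {
                        return o1.getCount().compareTo(o2.getCount());
                    }
                });
        set.add(new TopKWritable("hadoop", 5L));
        set.add(new TopKWritable("spark", 5L)); // same count: compares as equal, silently dropped
        System.out.println(set.size()); // prints 1
    }
}

If ties must be kept, extend the comparator to fall back to comparing the word when the counts are equal.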


Program 4: custom data type plus comparator — with multiple input files, a reduce step computes the global top N

package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.Comparator;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Multiple input files: a reduce step is needed to compute the top N
public class TopKMapReduceV4 {

    static class TopKMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");

            long tempValue = Long.valueOf(strs[1]);

            // pass every (word, count) pair through to the reducer
            context.write(new Text(strs[0]), new LongWritable(tempValue));
        }

        @Override
        public void cleanup(Context context) throws IOException,
                InterruptedException {
            super.cleanup(context);
        }

        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }
    }

    public static class TopKReducer extends
            Reducer<Text, LongWritable, Text, LongWritable> {

        public static final int K = 3; // top three

        // ordered by count, ascending; equal counts are deduplicated
        // (see the TreeSet caveat under Program 3)
        TreeSet<TopKWritable> topSet = new TreeSet<TopKWritable>(
                new Comparator<TopKWritable>() {
                    @Override
                    public int compare(TopKWritable o1, TopKWritable o2) {
                        return o1.getCount().compareTo(o2.getCount());
                    }
                });

        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }

        @Override
        public void reduce(Text key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            // sum this word's counts across all input files
            long count = 0;
            for (LongWritable value : values) {
                count += value.get();
            }

            topSet.add(new TopKWritable(key.toString(), count));
            if (topSet.size() > K) {
                topSet.remove(topSet.first());
            }
        }

        @Override
        public void cleanup(Context context) throws IOException,
                InterruptedException {
            for (TopKWritable top : topSet) {
                context.write(new Text(top.getWord()),
                        new LongWritable(top.getCount()));
            }
        }
    }

    public int run(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduceV4.class.getSimpleName());
        job.setJarByClass(TopKMapReduceV4.class);
        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(TopKMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setReducerClass(TopKReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // a single reducer sees every key, so its top N is global
        job.setNumReduceTasks(1);

        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);
        boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        args = new String[] { "hdfs://hadoop-master:9000/data/wcoutput",
                "hdfs://hadoop-master:9000/data/topkoutput4" };
        int status = new TopKMapReduceV4().run(args);
        System.exit(status);
    }
}
TopKWritable, the custom data type used by Programs 3 and 4:

package org.conan.myhadoop.TopKey;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Custom data type: a (word, count) pair
public class TopKWritable implements WritableComparable<TopKWritable> {

    private String word;
    private Long count;

    public TopKWritable() {
    }

    public TopKWritable(String word, Long count) {
        this.set(word, count);
    }

    public void set(String word, Long count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public Long getCount() {
        return count;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeLong(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.word = in.readUTF();
        this.count = in.readLong();
    }

    @Override
    public int compareTo(TopKWritable o) {
        // natural order: by word first, then by count; the top-K jobs above
        // supply their own Comparator (count only) instead
        int cmp = this.word.compareTo(o.getWord());
        if (0 != cmp) {
            return cmp;
        }
        return this.count.compareTo(o.getCount());
    }

    @Override
    public String toString() {
        return word + "\t" + count;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((count == null) ? 0 : count.hashCode());
        result = prime * result + ((word == null) ? 0 : word.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        TopKWritable other = (TopKWritable) obj;
        if (count == null) {
            if (other.count != null)
                return false;
        } else if (!count.equals(other.count))
            return false;
        if (word == null) {
            if (other.word != null)
                return false;
        } else if (!word.equals(other.word))
            return false;
        return true;
    }
}
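A quick way to sanity-check a custom Writable is a serialization round trip in plain Java; a minimal sketch (WritableRoundTrip is a hypothetical class name):

package org.conan.myhadoop.TopKey;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class WritableRoundTrip {
    public static void main(String[] args) throws Exception {
        TopKWritable original = new TopKWritable("hadoop", 42L);

        // serialize through the Writable interface
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // deserialize into a fresh instance
        TopKWritable copy = new TopKWritable();
        copy.readFields(new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy);                  // hadoop<tab>42
        System.out.println(original.equals(copy)); // true
    }
}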


Program 5: a classic example

package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Input format (tab-separated):
 *   language type, song name, favorite count, play count, artist name
 *
 * Goal: the ten most-played songs, with their names and play counts
 */
public class TopKeyMapReduce {

    public static final int K = 10;

    static class TopKeyMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");
            if (strs.length != 5) {
                return; // skip malformed lines
            }

            String languageType = strs[0];
            String singName = strs[1];
            String playTimes = strs[3];
            // key = language + song, value = play count of this record
            context.write(new Text(languageType + "\t" + singName),
                    new LongWritable(Long.valueOf(playTimes)));
        }

        @Override
        public void cleanup(Context context) throws IOException,
                InterruptedException {
            super.cleanup(context);
        }

        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }
    }

    public static class TopKeyReducer extends
            Reducer<Text, LongWritable, TopKeyWritable, NullWritable> {

        // natural ordering of TopKeyWritable is descending by play count, so
        // first() is the most played and last() the least played entry;
        // songs whose play counts tie compare as equal and are dropped
        TreeSet<TopKeyWritable> topSet = new TreeSet<TopKeyWritable>();

        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }

        @Override
        public void reduce(Text key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {

            String[] splited = key.toString().split("\t");
            if (splited.length < 2) {
                return; // malformed key
            }

            String languageType = splited[0];
            String singName = splited[1];

            // total play count for this song
            Long playTimes = 0L;
            for (LongWritable value : values) {
                playTimes += value.get();
            }

            topSet.add(new TopKeyWritable(languageType, singName, playTimes));

            if (topSet.size() > K) {
                // evict the entry with the smallest play count
                topSet.remove(topSet.last());
            }
        }

        @Override
        public void cleanup(Context context) throws IOException,
                InterruptedException {
            for (TopKeyWritable top : topSet) {
                context.write(top, NullWritable.get());
            }
        }
    }

    public int run(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKeyMapReduce.class.getSimpleName());
        job.setJarByClass(TopKeyMapReduce.class);
        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKeyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(TopKeyReducer.class);
        job.setOutputKeyClass(TopKeyWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // a single reducer makes the top ten global
        job.setNumReduceTasks(1);

        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);
        boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        args = new String[] { "hdfs://hadoop-master:9000/data/topkey/input",
                "hdfs://hadoop-master:9000/data/topkey/output" };
        int status = new TopKeyMapReduce().run(args);
        System.exit(status);
    }
}
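For reference, a hypothetical input fragment in the expected five-column format, and the output the job would produce for it (all names and numbers invented for illustration):

input (language, song, favorites, plays, artist; tab-separated):
pop    SongA    120    600     ArtistX
pop    SongA    50     300     ArtistX
rock   SongB    80     1500    ArtistY
pop    SongC    40     300     ArtistZ

output (top songs by total plays, descending; SongA's two records sum to 900):
rock   SongB    1500
pop    SongA    900
pop    SongC    300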
TopKeyWritable, the custom data type for Program 5:

package org.conan.myhadoop.TopKey;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Custom data type: (language type, song name, play count)
public class TopKeyWritable implements WritableComparable<TopKeyWritable> {

    String languageType;
    String singName;
    Long playTimes;

    public TopKeyWritable() {
    }

    public TopKeyWritable(String languageType, String singName, Long playTimes) {
        this.set(languageType, singName, playTimes);
    }

    public void set(String languageType, String singName, Long playTimes) {
        this.languageType = languageType;
        this.singName = singName;
        this.playTimes = playTimes;
    }

    public String getLanguageType() {
        return languageType;
    }

    public String getSingName() {
        return singName;
    }

    public Long getPlayTimes() {
        return playTimes;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.languageType = in.readUTF();
        this.singName = in.readUTF();
        this.playTimes = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(languageType);
        out.writeUTF(singName);
        out.writeLong(playTimes);
    }

    @Override
    public int compareTo(TopKeyWritable o) {
        // negated to sort in descending order of play count; note this is
        // inconsistent with equals(), so a TreeSet treats songs with tied
        // play counts as duplicates
        return -(this.getPlayTimes().compareTo(o.getPlayTimes()));
    }

    @Override
    public String toString() {
        return languageType + "\t" + singName + "\t" + playTimes;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result
                + ((languageType == null) ? 0 : languageType.hashCode());
        result = prime * result
                + ((playTimes == null) ? 0 : playTimes.hashCode());
        result = prime * result
                + ((singName == null) ? 0 : singName.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        TopKeyWritable other = (TopKeyWritable) obj;
        if (languageType == null) {
            if (other.languageType != null)
                return false;
        } else if (!languageType.equals(other.languageType))
            return false;
        if (playTimes == null) {
            if (other.playTimes != null)
                return false;
        } else if (!playTimes.equals(other.playTimes))
            return false;
        if (singName == null) {
            if (other.singName != null)
                return false;
        } else if (!singName.equals(other.singName))
            return false;
        return true;
    }
}
Tags: topkey