您的位置:首页 > 编程语言

Hadoop之深入MapReduce编程

2016-02-22 16:26 381 查看
前面已经介绍个几个MapReduce的例子,那个Hello world是最基础的,MapReduce Join篇写了怎么实现Map端和Reduce端的做法,还有个semi-join没有写出来,其实semi-join可以看做是两者的结合,所以没有做说明。MapReduce编程模型需要多写,多实践,毕竟多写笔下生花,只有遇到的坑多了,就没那么容易掉到坑里面,正所谓常在坑里走,哪有不被坑呢,

。这不,咱们就以Hadoop里面的MapReduce
examples为入口,多到坑里面走走看看。

1. Word序列

WordCount,WordMean,WordMedian,WordStandardDeviation

WordCount不说了,直接跳过。先看看WordMean,里面的map和reduce方法也是很简单,和WordCount基本差不多,不过这里是以count和length两个字符作为key,记录每个单词的次数和长度,到了reduce的时候,根据key,把Iterable<LongWritable> values加起来,其实map()和reduce()这里就做完了,不过这个例子多了一个方法readAndCalcMean,在输出的结果里面再来读取结果文件,我的结果文件类似这样:part-r-00000

count 5

length 23

分别读出count和length,然后将length/count就得到mean,然后打印出来。

在本地Eclipse运行的时候,遇到错误:

Exception in thread "main" DEBUG - stopping client from cache: org.apache.hadoop.ipc.Client@12998f87

java.lang.IllegalArgumentException: Wrong FS: hdfs://Hadoop-02:9000/output/part-r-00000, expected: file:///

at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)

at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:80)

at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:529)

at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:747)

at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:524)

at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)

at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)

at com.gl.test.WordMean.readAndCalcMean(WordMean.java:122)

at com.gl.test.WordMean.run(WordMean.java:186)

其实,reduce结果已经做完了,保存到hdfs,只是做readAndCalcMean出错。将代码稍作修改,便可以运行正常。

//FileSystem fs = FileSystem.get(conf);

Path file = new Path(path, "part-r-00000");

FileSystem fs = file.getFileSystem(conf);

WordMedian,这个和上面的也类似,我输入的文件是:

hello

world test

hello test

输出的结果很明显:

4 2

5 3

最后打印的结果是" The median is: 5"

这个例子里面有特别的地方是用到一个Counter(org.apache.hadoop.mapreduce.TaskCounter)

long totalWords = job.getCounters()

.getGroup(TaskCounter.class.getCanonicalName())

.findCounter("MAP_OUTPUT_RECORDS", "Map output records").getValue();

WordStandardDeviation,过程同上面的,唯一不同的是,这里的计算公式稍微多点,也不复杂,不过我还是看了半天。

Std_dev=sqrt([l1*l1+l2*l2...+ln*ln-n*mean*mean]/n) n为count,mean为上面计算的均值,这里标准差公式根号内除以n,没有使用n-1。

这个例子里面很重要的是使用了之前map计算的结果,count计算总数字个数,length计算总字符串长度,便于后面计算均值,square为每个长度的平方,也是为后面计算的公式前面部分用,这样在最后计算的时候,只需要用前面的结果来提高效率。

2. 聚合

AggregateWordCount



这个例子关键是运行的时候加了这个一句:

Job job = ValueAggregatorJob.createValueAggregatorJob(args

, new Class[] {WordCountPlugInClass.class});

具体实现的时候是集成了ValueAggregatorBaseDescriptor,Mapreduce里面已经包含了各种数据类型的求和最大值,最小值的算法UniqValueCount,LongValueSum等等,本例就是LONG_VALUE_SUM

public static class WordCountPlugInClass extends

ValueAggregatorBaseDescriptor {

@Override

public ArrayList<Entry<Text, Text>> generateKeyValPairs(Object key,

Object val) {

String countType = LONG_VALUE_SUM;

ArrayList<Entry<Text, Text>> retv = new ArrayList<Entry<Text, Text>>();

String line = val.toString();

StringTokenizer itr = new StringTokenizer(line);

while (itr.hasMoreTokens()) {

Entry<Text, Text> e = generateEntry(countType, itr.nextToken(), ONE);

if (e != null) {

retv.add(e);

}

}

return retv;

}

}

运行的时候,可以试试:

./bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.1.jar aggregatewordcount /input /wordtest 2 textinputformat

结果类似这样:(part-r-00000)

record_count 6

AggregateWordHistogram



这个例子也差不多,就是最后处理有些不同

public ArrayList<Entry<Text, Text>> generateKeyValPairs(Object key,

Object val) {

String words[] = val.toString().split(" |\t");

ArrayList<Entry<Text, Text>> retv = new ArrayList<Entry<Text, Text>>();

for (int i = 0; i < words.length; i++) {

Text valCount = new Text(words[i] + "\t" + "1");

Entry<Text, Text> en = generateEntry(VALUE_HISTOGRAM, "WORD_HISTOGRAM",

valCount);

retv.add(en);

}

return retv;

}

运行下面的命令:

./bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.1.jar aggregatewordhist /input /wordtest 2 textinputformat

结果类似这样:record_count 3

3. 排序Sort和SecondarySort

本来examples里面带了一个Sort的例子,也很好,可是命令行直接运行还是有些麻烦,出了几次错误,里面的inFomat outKey等默认的参数格式比较难获得,生存那些格式的有些麻烦,这里还是自己仿照hello world写了个排序的,很简单,练练手。

我运行里面自带的例子的命令: ./bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.1.jar sort -inFormat org.apache.hadoop.mapreduce.lib.input.TextInputFormat -outKey org.apache.hadoop.io.Text -outValue org.apache.hadoop.io.Text /testdata /testresult

自己实现mapper和reducer,这里是利用mapreduce会对结果的key进行排序,这样刚好是我们想要的,唯一要注意的就是key的类型,是Text,IntWritable,还是其他自己定义的。这里我们用个IntWritable来排序。

public static class MyMapper extends Mapper<Object, Text, IntWritable, IntWritable> {

private static IntWritable data=new IntWritable();

private final static IntWritable one = new IntWritable(1);

@Override

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

String str = value.toString();

data.set(Integer.parseInt(str));

context.write(data, one);

}

}

public static class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

@Override

public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)

throws IOException, InterruptedException {

context.write(key, null);//只需要写入key,value不需要写入,这样看到的结果就是排序的结果,一行一个,就像输入一样

}

}

//main运行函数

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

if (otherArgs.length < 2) {

System.err.println("Usage: Sort <in> [<in>...] <out>");

System.exit(2);

}

Job job = new Job(conf, "test sort");

job.setJarByClass(Sort.class);

job.setMapperClass(MyMapper.class);

job.setReducerClass(MyReducer.class);

//job.setNumReduceTasks(1);

job.setOutputKeyClass(IntWritable.class);

job.setOutputValueClass(IntWritable.class);

for (int i = 0; i < otherArgs.length - 1; ++i) {

FileInputFormat.addInputPath(job, new Path(otherArgs[i]));

}

long t = System.currentTimeMillis();

FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1] + "/" + t));

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

SecondarySort 二次排序的要求是按照第一字段排序之后,如果第一字段相同的,继续按照第二字段排序,当然不能破坏第一字段的排序。整个过程的基本思路是实现自定义格式的数据的排序,例子里面定义了一个IntPair实现了WritableComparable接口,提供了对字段的序列化,比较等方法。在mapper读取两个字段的数据之后,定义了一个PartitionerClass,主要依据第一个字段来Partitioner,这样保证第一字段相同的在同一个分区。分区函数类。这是key的第一次比较。key比较函数类,这是key的第二次比较。这是一个比较器,需要继承WritableComparator(这就是所谓的二次排序)。然后就是reducer,在不同的第一字段加了个SEPARATOR。例子里面有个FirstGroupingComparator,注意是构造一个key对应的value迭代器的时候,只要first相同就属于同一个组,放在一个value迭代器。。二次排序需要制定这两个。

// group and partition by the first int in the pair

job.setPartitionerClass(FirstPartitioner.class);

job.setGroupingComparatorClass(FirstGroupingComparator.class);

./bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.1.jar secondarysort hdfs://Hadoop-02:9000/testdata hdfs://Hadoop-02:9000/testresult

输入数据:(以tab分隔)

43 12

43 3

43 15

25 4

25 10

25 2

最后的结果是:

------------------------------------------------

25 2

25 4

25 10

------------------------------------------------

43 3

43 12

43 15

这里总结下,排序是MapReduce的很重要的技术,尽管应用程序本身不需要对数据排序,但可以使用MapReduce的排序功能来组织数据。默认情况下,MapReduce根据输入记录的键对数据排序。键的排列顺序是由RawComparator控制的,排序规则如下:

1)若属性mapred.output.key.comparator.class已设置,则使用该类的实例;

2)否则键必须是WritableComparable的子类,并使用针对该键类的已登记的comparator;(例如上例中用到的static块)

static { // register this comparator

WritableComparator.define(IntPair.class, new Comparator());

}

3)如果还没有已登记的comparator,则使用RawComparator将字节流反序列化为一个对象,再由WritableComparable的compareTo()方法进行操作。

4. 倒排文档索引 (TF-IDX)

具体的思路不多说,直接上代码,引自网上,稍作修改(实际运行通过)
import java.io.IOException;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class InvertedIndex {

public static class Map extends Mapper<Object, Text, Text, Text> {

private Text keyInfo = new Text(); // 存储单词和URL组合

private Text valueInfo = new Text(); // 存储词频

private FileSplit split; // 存储Split对象

public void map(Object key, Text value, Context context)

throws IOException, InterruptedException {

// 获得<key,value>对所属的FileSplit对象

String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();

split = (FileSplit) context.getInputSplit();

StringTokenizer itr = new StringTokenizer(value.toString()," ");

while (itr.hasMoreTokens()) {

// key值由单词和URL组成,如"MapReduce:file1.txt"

// 获取文件的完整路径

// keyInfo.set(itr.nextToken()+":"+split.getPath().toString());

// 这里为了好看,只获取文件的名称。

int splitIndex = split.getPath().toString().lastIndexOf("/");

keyInfo.set(itr.nextToken() + ":"

+ split.getPath().toString().substring(splitIndex+1));

// 词频初始化为1

valueInfo.set("1");

context.write(keyInfo, valueInfo);

}

}

}

public static class Combine extends Reducer<Text, Text, Text, Text> {

private Text info = new Text();

// 实现reduce函数

public void reduce(Text key, Iterable<Text> values, Context context)

throws IOException, InterruptedException {

// 统计词频

int sum = 0;

for (Text value : values) {

sum += Integer.parseInt(value.toString());

}

int splitIndex = key.toString().indexOf(":");

// 重新设置value值由URL和词频组成

info.set(key.toString().substring(splitIndex + 1) + ":" + sum);

// 重新设置key值为单词

key.set(key.toString().substring(0, splitIndex));

context.write(key, info);

}

}

public static class Reduce extends Reducer<Text, Text, Text, Text> {

private Text result = new Text();

// 实现reduce函数

public void reduce(Text key, Iterable<Text> values, Context context)

throws IOException, InterruptedException {

// 生成文档列表

String fileList = new String();

for (Text value : values) {

fileList += value.toString() + ";";

}

result.set(fileList);

context.write(key, result);

}

}

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

if (otherArgs.length != 2) {

System.err.println("Usage: InvertedIndex <in> <out>");

System.exit(2);

}

Job job = new Job(conf, "Inverted Index");

job.setJarByClass(InvertedIndex.class);

// 设置Map、Combine和Reduce处理类

job.setMapperClass(Map.class);

job.setCombinerClass(Combine.class);

job.setReducerClass(Reduce.class);

// 设置Map输出类型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(Text.class);

// 设置Reduce输出类型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

// 设置输入和输出目录

FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

long t = System.currentTimeMillis();

FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]+ "/" + t));

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: