您的位置:首页 > 运维架构

Hadoop Streaming 实战: aggregate

2012-10-26 17:18 489 查看
1. aggregate概述

aggregate是Hadoop提供的一个软件包,其用来做一些通用的计算和聚合。

Generally speaking, in order to implement an application using Map/Reduce model, the developer needs to implement Map and Reduce functions (and possibly Combine function). However, for a lot of applications related to counting and statistics computing, these functions have very similarcharacteristics. This provides a package implementing those patterns. In particular,the package provides a generic mapper class,a reducer class and a combiner class, and a set of built-in value aggregators.It also provides a generic utility class, ValueAggregatorJob, that offers a static function that creates map/reduce jobs。

在Streaming中通常使用Aggregate包作为reducer来做聚合统计。

2. aggregate class summary
DoubleValueSum

This class implements a value aggregator that sums up a sequence of double values.

LongValueMax

This class implements a value aggregator that maintain the maximum of a sequence of long values.

LongValueMin

This class implements a value aggregator that maintain the minimum of a sequence of long values.

LongValueSum

This class implements a value aggregator that sums up a sequence of long values.

StringValueMax

This class implements a value aggregator that maintain the biggest of a sequence of strings.

StringValueMin

This class implements a value aggregator that maintain the smallest of a sequence of strings.

UniqValueCount

This class implements a value aggregator that dedupes a sequence of objects.

UserDefinedValueAggregatorDescriptor

This class implements a wrapper for a user defined value aggregator descriptor.

ValueAggregatorBaseDescriptor

This class implements the common functionalities of the subclasses of ValueAggregatorDescriptor class.

ValueAggregatorCombiner<K1 extends WritableComparable,V1 extends Writable>

This class implements the generic combiner of Aggregate.

ValueAggregatorJob

This is the main class for creating a map/reduce job using Aggregate framework.

ValueAggregatorJobBase<K1 extends WritableComparable,V1 extends Writable>

This abstract class implements some common functionalities of the the generic mapper, reducer and combiner classes of Aggregate.

ValueAggregatorMapper<K1 extends WritableComparable,V1 extends Writable>

This class implements the generic mapper of Aggregate.

ValueAggregatorReducer<K1 extends WritableComparable,V1 extends Writable>

This class implements the generic reducer of Aggregate.

ValueHistogram

This class implements a value aggregator that computes the histogram of a sequence of strings

3. streaming中使用aggregate

在mapper任务的输出中添加控制,如下:

function:key\tvalue

eg:

LongValueSum:key\tvalue

此外,置-reducer = aggregate。此时,Reducer使用aggregate中对应的function类对相同key的value进行操作,例如,设置function为LongValueSum则将对每个键值对应的value求和。
4. 实例1(value求和)

测试文件test.txt
a 15 1

a 17 1

a 18 1

a 19 1

a 19 1

a 19 1

a 19 1

b 20 1

c 15 1

c 15 1

d 16 1

a 16 1

mapper程序:

#include <iostream>

#include <string>

using namespace std;

int main(int argc, char** argv)

{

string a,b,c;

while(cin >> a >> b >> c)

{

cout << "LongValueSum:"<< a << "\t" << b << endl;

}

return 0;

}

运行:

$hadoop streaming -input /app/test.txt -output /app/test -mapper ./mapper -reducer aggregate -file mapper -jobconf mapred.reduce.tasks=1 -jobconf mapre.job.name="test"

输出:

a 142

b 20

c 30

d 16

5. 实例2(强大ValueHistogram)

ValueHistogram是aggregate package中最强大的类,基于每个键,对其value做以下统计

1)唯一值个数

2)最小值个数

3)中位置个数

4)最大值个数

5)平均值个数

6)标准方差

上述例子基础上修改mapper.cpp为:
#include <iostream>

#include <string>

using namespace std;

int main(int argc, char** argv)

{

string a,b,c;

while(cin >> a >> b >> c)

{

cout << "ValueHistogram:"<< a << "\t" << b << endl;

}

return 0;

}

运行命令同上

运行结果:

a 5 1 1 4 1.6 1.2

b 1 1 1 1 1.0 0.0

c 1 2 2 2 2.0 0.0

d 1 1 1 1 1.0 0.0
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: