您的位置:首页 > 其它

pyspark aggregate函数使用问题(aggregate重写,aggregate中的函数参数限制)

2017-11-07 14:32 411 查看
今天在学习spark python 编程中使用aggregate 出现几个需要注意的问题,在这里分享一下!!!

需求目标:

将一个int 类型RDD 中各个分区中的数据中最大数字拿出来,拼接成一个字符串

实现代码:

l1 = [1,2,3,4,5,6,7,8,9]
rdd1 = sc.parallelize(l1,2)
rdd1.aggregate(0,lambda a,b: str(max(a,b)),lambda a,b:a+b)


在spark 环境中上面代码报错:TypeError: unorderable types: int() > str()

从百度得知这个错误是,类型错误,应该传入是int 类型,确传入了str ,一开始的时候一脸懵逼。。。

分析aggregate 源码: def aggregate(self, zeroValue, seqOp, combOp):

"""
Aggregate the elements of each partition, and then the results for all
the partitions, using a given combine functions and a neutral "zero
value."

The functions C{op(t1, t2)} is allowed to modify C{t1} and return it
as its result value to avoid object allocation; however, it should not
modify C{t2}.

The first function (seqOp) can return a different result type, U, than
the type of this RDD. Thus, we need one operation for merging a T into
an U and one operation for merging two U

>>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
>>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
>>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
(10, 4)
>>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
(0, 0)
"""
def func(iterator):
acc = zeroValue
for obj in iterator:
acc = seqOp(acc, obj)
yield acc
# collecting result of mapPartitions here ensures that the copy of
# zeroValue provided to each partition is unique from the one provided
# to the final reduce call
vals = self.mapPartitions(func).collect()
return reduce(combOp, v
bc12
als, zeroValue)


追踪:Traceback 到:acc = seqOp(acc, obj)

seqOp 需要传入两个int 类型数据 ,并将返回值做为下一个传参数,在我的代码中 seqOp = lambda a,b: str(max(a,b)) 返回值为str ,豁然开朗!!1

修改:

l1 = [1,2,3,4,5,6,7,8,9]
rdd1 = sc.parallelize(l1,2)
rdd1.aggregate(0,max,lambda a,b:str(a)+str(b))


这时又出现了一个问题:

上述代码返回值是的:049 或 094

分析aggregate 的执行过程:

1. reduce of partition 0 will be max(0, 1, 2, 3,4) = 4

2. reduce of partition 1 will be max(0, 5, 6,7,8,9) = 9

3. final reduce across partitions will be ‘0’ + ‘4’ + ‘9’ = ‘094’

问题出在这个初始值,这个初始值不论说什么都会报错(试过:None, ‘’)

都不能得到正确的值‘49’

所以只能重写aggregate 方法:

from functools import reduce
class MyRDD(RDD):
def __init__(self):
RDD.__init__(self)
def aggregate(self, seqOp, combOp):
def func(iterator):
acc = 0
for obj in iterator:
acc = seqOp(acc, obj)
yield acc

vals = self.mapPartitions(func).collect()
return reduce(combOp, vals)

rdd1.__class__ = MyRDD ##将父类实例转化为子类实例的
rdd1.aggregate(max,lambda a,b:str(a)+str(b)) ##结果25


结果:‘49’

终于得到了想要的结果!!!!!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: