Spark函数讲解:aggregateByKey
2016-05-22 21:20
495 查看
该函数和aggregate类似,但操作的RDD是Pair类型的。Spark 1.1.0版本才正式引入该函数。官方文档定义:
Aggregate the values of each key, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation
for merging two U's, as in scala.TraversableOnce. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and
return their first argument instead of creating a new U.
aggregateByKey函数对PairRDD中相同Key的值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey函数最终返回的类型还是Pair RDD,对应的结果是Key和聚合好的值;而aggregate函数直接是返回非RDD的结果,这点需要注意。在实现过程中,定义了三个aggregateByKey函数原型,但最终调用的aggregateByKey函数都一致。
第二个aggregateByKey函数可以设置分区的个数(numPartitions),最终用的是HashPartitioner。
最后一个aggregateByKey实现先会判断当前RDD是否定义了分区函数,如果定义了则用当前RDD的分区;如果当前RDD并未定义分区 ,则使用HashPartitioner。
Aggregate the values of each key, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation
for merging two U's, as in scala.TraversableOnce. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and
return their first argument instead of creating a new U.
aggregateByKey函数对PairRDD中相同Key的值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey函数最终返回的类型还是Pair RDD,对应的结果是Key和聚合好的值;而aggregate函数直接是返回非RDD的结果,这点需要注意。在实现过程中,定义了三个aggregateByKey函数原型,但最终调用的aggregateByKey函数都一致。
函数原型
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner) (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int) (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U) (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]第一个aggregateByKey函数我们可以自定义Partitioner。除了这个参数之外,其函数声明和aggregate很类似;其他的aggregateByKey函数实现最终都是调用这个。
第二个aggregateByKey函数可以设置分区的个数(numPartitions),最终用的是HashPartitioner。
最后一个aggregateByKey实现先会判断当前RDD是否定义了分区函数,如果定义了则用当前RDD的分区;如果当前RDD并未定义分区 ,则使用HashPartitioner。
实例
scala> var data = sc.parallelize(List((1,3),(1,2),(1, 4),(2,3))) scala> def seq(a:Int, b:Int) : Int ={ | println("seq: " + a + "\t " + b) | math.max(a,b) | } seq: (a: Int, b: Int)Int scala> def comb(a:Int, b:Int) : Int ={ | println("comb: " + a + "\t " + b) | a + b | } comb: (a: Int, b: Int)Int scala> data.aggregateByKey(1)(seq, comb).collect seq: 1 3 seq: 1 2 seq: 1 4 seq: 1 3 comb: 3 2 comb: 5 4 res62: Array[(Int, Int)] = Array((1,9), (2,3))
注意
细心的读者肯定发现了aggregateByKey和aggregate结果有点不一样。如果用aggregate函数对含有3、2、4三个元素的RDD进行计算,初始值为1的时候,计算的结果应该是10,而这里是9,这是因为aggregate函数中的初始值需要和reduce函数以及combine函数结合计算,而aggregateByKey中的初始值只需要和reduce函数计算,不需要和combine函数结合计算,所以导致结果有点不一样。相关文章推荐
- HDU 2066 一个人的旅行
- 模板收集
- Hadoop中reduce端shuffle过程及源码解析
- const与#define的区别
- 欢迎使用CSDN-markdown编辑器
- 正则表达式常用方法
- nginx rewrite uri地址重写
- PS 的小应用
- NVIDIA详细解读游戏中DX9与DX11差别
- APP安全之APK完整性校验
- Oracle数据库—— 游标的创建和应用
- android:theme 与 setTheme()设置透明效果并不同
- Android View源码解读:浅谈DecorView与ViewRootImpl
- uBuntu make xconfig Linux内核配置 问题
- Spark函数讲解:cache
- construct-binary-tree-from-preorder-and-inorder-traversal
- 【软考总结】——正视自己的不足
- LeetCode-77.Combinations
- IPSEC与SSL/TLS的比较
- threejs(一) 点,线,网格