您的位置:首页 > 其它


2016-05-22 21:20 495 查看
该函数和aggregate类似,但操作的RDD是Pair类型的。Spark 1.1.0版本才正式引入该函数。官方文档定义:

Aggregate the values of each key, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation
for merging two U's, as in scala.TraversableOnce. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and
return their first argument instead of creating a new U.

  aggregateByKey函数对PairRDD中相同Key的值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey函数最终返回的类型还是Pair RDD,对应的结果是Key和聚合好的值;而aggregate函数直接是返回非RDD的结果,这点需要注意。在实现过程中,定义了三个aggregateByKey函数原型,但最终调用的aggregateByKey函数都一致。


def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)
    (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)
    (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U)
    (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]


  最后一个aggregateByKey实现先会判断当前RDD是否定义了分区函数,如果定义了则用当前RDD的分区;如果当前RDD并未定义分区 ,则使用HashPartitioner。


scala> var data = sc.parallelize(List((1,3),(1,2),(1, 4),(2,3)))
scala> def seq(a:Int, b:Int) : Int ={
| println("seq: " + a + "\t " + b)
| math.max(a,b)
| }
seq: (a: Int, b: Int)Int

scala> def comb(a:Int, b:Int) : Int ={
| println("comb: " + a + "\t " + b)
| a + b
| }
comb: (a: Int, b: Int)Int
scala> data.aggregateByKey(1)(seq, comb).collect
seq: 1	 3
seq: 1	 2
seq: 1	 4
seq: 1	 3
comb: 3	 2
comb: 5	 4
res62: Array[(Int, Int)] = Array((1,9), (2,3))


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息