Spark编程之基本的RDD算子-aggregate和aggregateByKey
2017-07-20 14:51
615 查看
spark基本的RDD算子:
在学习spark的过程中,有这样几个算子非常重要,但是却容易混淆。在这里我想做一下记录.1) 第一个是aggregate算子.
我们首先可以看看aggregate算子的api,
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
这个算子接收三个参数,第一个是初始值,zeroValue,类型为U。
第二个接收一个函数: seqOp,这个函数接收的参数的类型为(U,T)类型,返回值为一个U类型。
第三个参数是一个函数:comOp,这个函数接收两个参数(U,U),返回值为U类型。
最后整个算子返回的类型为U类型。
这个算子将两个不同的reduce类型的函数应用到了RDD。第一个reduce类型的函数作用于每一个数据分区。
第二个reduce类型的函数将上一个不同的分区处理后的结果进行汇总。然后返回一个共同的结果.
这样极大的增加了灵活性,比如说第一个函数可以为一个max函数,求出每个分区的最大值。
第二个分区可以是一个求和函数,比如sum,这样子返回的最终结果就是每个分区最大值的求和。
这个算子有以下几点需要注意:
1)初始值需要作用于两个函数。即在第一个阶段的reduce过程需要应用到zerovalue,第二个阶段的结果也需要考虑到
zeroValue。
2) 不同的分区在执行的时候没有什么顺序。
来看一些比较基本的例子:
val z = sc.parallelize(List(1,2,3,4,5,6), 2) def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = { iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator } //这个函数用于查看数据存储在了那个分区 。 z.mapPartitionsWithIndex(myfunc).collect res28: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6]) z.aggregate(0)(math.max(_, _), _ + _) res40: Int = 9 //结果分析.从上一个我们可以看到先求出了第一个分区的最大值为3,第二个的最大值为6.然后调用了sum函数进行累加。 //结果返回9. //如果我们将初始值设置为5的话,那么返回值的最终结果为16. //第一个分区的max函数的最大值的结果(5,1,2,3)=5 // 第2个分区的max函数的最大值的结果 max(5, 4, 5, 6) = 6 // 最后调用sum函数,其结果为 5 + 5 + 6 = 16。可以看到第二个函数调用的时候,同样应用到了初始值。 z.aggregate(5)(math.max(_, _), _ + _) res29: Int = 16 //第二个例子: val z = sc.parallelize(List("a","b","c","d","e","f"),2) z.mapPartitionsWithIndex(myfunc).collect res31: Array[String] = Array([partID:0, val: a], [partID:0, val: b], [partID:0, val: c], [partID:1, val: d], [partID:1, val: e], [partID:1, val: f]) z.aggregate("")(_ + _, _+_) res115: String = abcdef //同样跟刚才类似,这个算子将以上的结果进行了累加,最后返回的结果是一个字符串 ///结果是abcdef z.aggregate("x")(_ + _, _+_) res116: String = xxdefxabc //当加入初始值x的时候,我们可以看到首先在第一个分区a,b,c加入了一个x,xabc。 //紧接着在第二个分区d,e,f加入了第二个xdef。最后是一个汇总函数xxdefxabc。总共用了三次 //再看几个例子: val z = sc.parallelize(List("12","23","345","4567"),2) z.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y) res141: String = 42 //注意这个时候传入的初始值为空字符串。第一个函数调用的是求每个分区的字符串长度的最大值。可以看到第一个分区的 //结果为2.然后是2.toString,所以其结果为"2",同样的"345", "4567"和"", 所以其结果为 //"4".接下来在此汇总的话相当于得到个结果"42"或者是"24".注意和顺序是没有关系的。 z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y) res142: String = 11 //注意当max变为min的时候,首先在第一个分区"","12"。这两个相比的时候""长度为0。然后调用 //toString,结果为“0”。然后这个 “0”又和“23”进行比较,最小的长度为1,toString之后变 //为“1”。同样的也适用于第二个分区,所以最后汇总之后结果为11.这个11 是一个字符串 val z = sc.parallelize(List("12","23","345",""),2) z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y) res143: String = 10 //这个问题的分析与刚才类似。只不过第二个分区有一个空字符串,所以其返回结果是10或01. // 注意这个10也是字符串。
2) aggregateByKey算子:
首先看看api定义:
def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
def aggregateByKey[U](zeroValue: U, numPartitions: Int)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
def aggregateByKey[U](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
以上三个重载的算子共同实现了aggregateByKey算子。只不过后面的两个参数加入了分区数和Partitioner。
注意: 和刚才aggregate不同的是,aggregateByKey算子作用的是一个(key, value)类型的键值对。它只作用于key相同的情况。还有一点需要注意的是aggregateByKey的初始值不作用于第二个函数。
其他的和aggregate类似。
我们来看看基本的用法:
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2) def myfunc(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = { iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator } pairRDD.mapPartitionsWithIndex(myfunc).collect res2: Array[String] = Array([partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)], [partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)]) //通过这个可以看到每个数据的不同的分区。 pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect res3: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6)) //根据相同的key进行math.max的value值的操作。比如说在第一个分区里。 //("cat",2), ("cat", 5), ("mouse", 4).则cat的最大的value值为5。mouse的最大的值为4. //第二个分区里,("cat", 12), ("dog", 12), ("mouse", 2)。cat的最大值为12,dog的最大值为12,mouse的最大值为2. //接下来进行第二个函数的操作。将第一个分区和第二个分区的key相同的值进行汇总。结果为: //(cat,17), (dog,12), (mouse,6)。即上述的结果 pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect res4: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200)) //如果加入初始值,比如说是100.在第一个分区里。 //("cat",2), ("cat", 5), ("mouse", 4).则cat的最大的value值为100。mouse的最大的值为100. //第二个分区里,("cat", 12), ("dog", 12), ("mouse", 2)。cat的最大值为100,dog的最大值为100,mouse的最大值为100. //接下来进行第二个函数的操作。将第一个分区和第二个分区的key相同的值进行汇总。结果为: //(cat,200), (dog,100), (mouse,200)。即上述的结果
相关文章推荐
- Spark编程之基本的RDD算子之fold,foldByKey,treeAggregate, treeReduce
- Spark编程之基本的RDD算子之fold,foldByKey,treeAggregate, treeReduce
- Spark编程之基本的RDD算子之cogroup,groupBy,groupByKey
- Spark编程之基本的RDD算子之glom,substract,substractByKey,intersection,distinct,union
- Spark编程之基本的RDD算子count, countApproxDistinct, countByValue等
- Spark编程的基本的算子之:combineByKey,reduceByKey,groupByKey
- Spark编程之基本的RDD算子sparkContext,foreach,foreachPartition, collectAsMap
- Spark算子:RDD键值转换操作(5)–leftOuterJoin、rightOuterJoin、subtractByKey
- Spark RDD aggregateByKey
- spark RDD算子(六)之键值对聚合操作reduceByKey,foldByKey,排序操作sortByKey
- Spark算子:RDD键值转换操作(3)–groupBy、keyBy、groupByKey、reduceByKey、reduceByKeyLocally
- Spark算子:RDD键值转换操作(5)–leftOuterJoin、rightOuterJoin、subtractByKey
- Spark算子之aggregate和aggregateByKey
- Spark编程之基本的RDD算子之zip,zipPartitions,zipWithIndex,zipWithUniqueId
- Spark RDD/Core 编程 API入门系列 之rdd案例(map、filter、flatMap、groupByKey、reduceByKey、join、cogroupy等)(四)
- Spark算子:RDD键值转换操作(2)–combineByKey、foldByKey
- Spark编程之基本的RDD算子coalesce, repartition, checkpoint
- Spark算子[09]:aggregateByKey、aggregate详解
- spark中算子详解:aggregateByKey
- 大数据Spark “蘑菇云”行动第40课:Spark编程实战之aggregateByKey、reduceByKey、groupByKey、sortByKey深度解密