Lecture 148: Spark RDD Transformations combineByKey and reduceByKey Explained
2017-05-06 08:54
Let's look at reduceByKey in PairRDDFunctions.scala. Similar to a combiner in Hadoop, reduceByKey merges values locally on each mapper and only then sends the results to the reducer. Internally, it simply delegates to combineByKeyWithClassTag:
/**
* Merge the values for each key using an associative and commutative reduce function. This will
* also perform the merging locally on each mapper before sending results to a reducer, similarly
* to a "combiner" in MapReduce.
*/
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
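The map-side-combine behavior described in that doc comment can be illustrated without Spark. The sketch below uses plain Scala collections and a hypothetical helper name (`reduceByKeyLocal` is not a Spark API): each inner Seq plays the role of one partition, each partition is reduced locally first, and only the pre-combined results are merged across partitions.

```scala
// A plain-Scala sketch of reduceByKey's semantics (no Spark required).
// Each inner Seq plays the role of one partition.
object ReduceByKeySketch {
  def reduceByKeyLocal[K, V](partitions: Seq[Seq[(K, V)]],
                             func: (V, V) => V): Map[K, V] = {
    // Map-side combine: reduce each partition locally first,
    // like a MapReduce combiner.
    val combined: Seq[(K, V)] = partitions.flatMap { part =>
      part.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).reduce(func) }
    }
    // "Shuffle" + final merge of the already-combined per-partition results.
    combined.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).reduce(func) }
  }
}

// Word-count-style usage: "a" -> 1+3+4 = 8, "b" -> 2+5 = 7
// ReduceByKeySketch.reduceByKeyLocal(
//   Seq(Seq(("a", 1), ("b", 2), ("a", 3)), Seq(("a", 4), ("b", 5))),
//   (x: Int, y: Int) => x + y)
```

Because `func` is associative and commutative, reducing inside each partition first and then across partitions yields the same answer as a single global reduce, which is why the map-side combine is safe.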
The source of combineByKeyWithClassTag is shown below. combineByKeyWithClassTag is experimental; it combines the values for each key using a set of custom aggregation functions, turning an RDD[(K, V)] into an RDD[(K, C)] for a "combined type" C.
It takes three functions:
- `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
- `mergeValue`, which merges a V into a C (e.g., appends the value to the end of a list)
- `mergeCombiners`, which merges two Cs into a single C.
In addition, we can control the partitioning of the output RDD and whether to perform map-side aggregation (i.e., pre-combining values with the same key on each mapper before the shuffle).
Note: V and C can be different types; for example, an RDD of (Int, Int) pairs can be turned into an RDD of (Int, Seq[Int]) pairs.
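The interplay of the three functions can be sketched with plain Scala collections (a sketch only; `combineByKeyLocal` is a hypothetical helper, not a Spark API). Within a partition, the first value seen for a key goes through createCombiner and later values through mergeValue; across partitions, mergeCombiners joins the per-partition combiners:

```scala
// Plain-Scala sketch of the three combineByKeyWithClassTag functions.
// Each inner Seq plays the role of one partition.
object CombineSketch {
  def combineByKeyLocal[K, V, C](
      partitions: Seq[Seq[(K, V)]],
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Within each partition: the first value for a key creates a combiner,
    // later values for that key are folded in with mergeValue.
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.updated(k, acc.get(k) match {
          case Some(c) => mergeValue(c, v)
          case None    => createCombiner(v)
        })
      }
    }
    // Across partitions: combiners for the same key are merged.
    perPartition.flatten.groupBy(_._1).map { case (k, cs) =>
      k -> cs.map(_._2).reduce(mergeCombiners)
    }
  }
}
```

For example, this turns (Int, Int) pairs into (Int, List[Int]) pairs, exactly the V-to-Seq[V] type change the note above mentions: createCombiner wraps a value in a one-element list, mergeValue appends to it, and mergeCombiners concatenates lists.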
@Experimental
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}
The original post shows a diagram of combineByKey at this point (not reproduced here).
In reduceByKey, the values of the result RDD keep the original type V:
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
combineByKeyWithClassTag introduces a type parameter C, so the values of the result RDD become type C:
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)]
Here is an example from the web: using combineByKey to compute per-key averages.
val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))
val d1 = sc.parallelize(initialScores)
type MVType = (Int, Double) // a tuple of (course counter, score sum)
d1.combineByKey(
score => (1, score),
(c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),
(c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)
).map { case (name, (num, score)) => (name, score / num) }.collect
What the three arguments mean:
a. `score => (1, score)` takes a score and wraps it in a tuple. Using "Fred" as an example, his first score 88.0 becomes (1, 88.0); the 1 is the course counter, since so far we have seen only one course.
b. `(c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore)`: here c1 is the (1, 88.0) that createCombiner produced. Within the same partition we meet another of "Fred"'s scores, say 91.0; we add it to the running sum (c1._2 + newScore) and increment the course counter (c1._1 + 1).
c. `(c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)`: "Fred" may be a top student who takes so many courses that his records are spread across partitions. After mergeValue has run inside every partition, the per-partition combiners are merged: counters are added to counters and sums to sums, yielding the total score and total course count per key.
Result:
res1: Array[(String, Double)] = Array((Wilma,95.33333333333333), (Fred,91.33333333333333))
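To sanity-check that output, the same averages can be computed with plain Scala collections (a sketch of the logic only, no Spark required); the (count, sum) pair plays the role of the MVType combiner, and the division happens at the end:

```scala
// Plain-Scala check of the average example's arithmetic.
val initialScores = Seq(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0),
                        ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))
val averages: Map[String, Double] = initialScores
  .groupBy(_._1)
  .map { case (name, pairs) =>
    // The fold mirrors mergeValue: bump the counter, add the score.
    val (num, sum) = pairs.foldLeft((0, 0.0)) {
      case ((n, s), (_, score)) => (n + 1, s + score)
    }
    name -> sum / num
  }
// Fred: (88.0 + 95.0 + 91.0) / 3, Wilma: (93.0 + 95.0 + 98.0) / 3
```

This reproduces the collected result above: 274.0 / 3 ≈ 91.333 for Fred and 286.0 / 3 ≈ 95.333 for Wilma.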