深入理解groupByKey、reduceByKey
2018-01-16 11:32
447 查看
下面来看看groupByKey和reduceByKey的区别:
借助下图可以理解在reduceByKey里发生了什么。 在数据对被搬移前,同一机器上同样的
image另一方面,当调用 groupByKey时,所有的键值对(key-value pair) 都会被移动,在网络上传输这些数据非常没必要,因此避免使用 GroupByKey。
为了确定将数据对移到哪个主机,Spark会对数据对的
image你可以想象一个非常大的数据集,在使用 reduceByKey 和 groupByKey 时他们的差别会被放大更多倍。
我们来看看两个函数的实现:
他们都调用了
combineByKey是将RDD[(K,V)]combine为RDD[(K,C)],因此,首先需要提供一个函数,能够完成从V到C的combine,称之为combiner。如果V和C类型一致,则函数为V => V。倘若C是一个集合,例如Iterable[V],则createCombiner为V => Iterable[V]。
mergeValue则是将原RDD中Pair的Value合并为操作后的C类型数据。合并操作的实现决定了结果的运算方式。所以,mergeValue更像是声明了一种合并方式,它是由整个combine运算的结果来导向的。函数的输入为原RDD中Pair的V,输出为结果RDD中Pair的C。
最后的mergeCombiners则会根据每个Key所对应的多个C,进行归并。
例如:
有许多函数比goupByKey好:
当你combine元素时,可以使用
作者:jacksu在简书
链接:https://www.jianshu.com/p/0c6705724cff
來源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
val conf = new SparkConf().setAppName("GroupAndReduce").setMaster("local") val sc = new SparkContext(conf) val words = Array("one", "two", "two", "three", "three", "three") val wordsRDD = sc.parallelize(words).map(word => (word, 1)) val wordsCountWithReduce = wordsRDD. reduceByKey(_ + _). collect(). foreach(println) val wordsCountWithGroup = wordsRDD. groupByKey(). map(w => (w._1, w._2.sum)). collect(). foreach(println)虽然两个函数都能得出正确的结果, 但reduceByKey函数更适合使用在大数据集上。 这是因为Spark知道它可以在每个分区移动数据之前将输出数据与一个共用的
key结合。
借助下图可以理解在reduceByKey里发生了什么。 在数据对被搬移前,同一机器上同样的
key是怎样被组合的( reduceByKey中的 lamdba 函数)。然后 lamdba 函数在每个分区上被再次调用来将所有值 reduce成最终结果。整个过程如下:
image另一方面,当调用 groupByKey时,所有的键值对(key-value pair) 都会被移动,在网络上传输这些数据非常没必要,因此避免使用 GroupByKey。
为了确定将数据对移到哪个主机,Spark会对数据对的
key调用一个分区算法。 当移动的数据量大于单台执行机器内存总量时
Spark会把数据保存到磁盘上。 不过在保存时每次会处理一个
key的数据,所以当单个 key 的键值对超过内存容量会存在内存溢出的异常。 这将会在之后发行的 Spark 版本中更加优雅地处理,这样的工作还可以继续完善。 尽管如此,仍应避免将数据保存到磁盘上,这会严重影响性能。
image你可以想象一个非常大的数据集,在使用 reduceByKey 和 groupByKey 时他们的差别会被放大更多倍。
我们来看看两个函数的实现:
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope { combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner) }
/** * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]]. */ def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope { // groupByKey shouldn't use map side combine because map side combine does not // reduce the amount of data shuffled and requires all map side data be inserted // into a hash table, leading to more objects in the old gen. val createCombiner = (v: V) => CompactBuffer(v) val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2 val bufs = combineByKeyWithClassTag[CompactBuffer[V]]( createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false) bufs.asInstanceOf[RDD[(K, Iterable[V])]] }注意
mapSideCombine=false,partitioner是
HashPartitioner,但是groupByKey对小数据量比较好,一个key对应的个数少于10个。
他们都调用了
combineByKeyWithClassTag,我们再来看看
combineByKeyWithClassTag的定义:
def combineByKeyWithClassTag[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)]combineByKey函数主要接受了三个函数作为参数,分别为createCombiner、mergeValue、mergeCombiners。这三个函数足以说明它究竟做了什么。理解了这三个函数,就可以很好地理解combineByKey。
combineByKey是将RDD[(K,V)]combine为RDD[(K,C)],因此,首先需要提供一个函数,能够完成从V到C的combine,称之为combiner。如果V和C类型一致,则函数为V => V。倘若C是一个集合,例如Iterable[V],则createCombiner为V => Iterable[V]。
mergeValue则是将原RDD中Pair的Value合并为操作后的C类型数据。合并操作的实现决定了结果的运算方式。所以,mergeValue更像是声明了一种合并方式,它是由整个combine运算的结果来导向的。函数的输入为原RDD中Pair的V,输出为结果RDD中Pair的C。
最后的mergeCombiners则会根据每个Key所对应的多个C,进行归并。
例如:
var rdd1 = sc.makeRDD(Array(("A", 1), ("A", 2), ("B", 1), ("B", 2),("B",3),("B",4), ("C", 1))) rdd1.combineByKey( (v: Int) => v + "_", (c: String, v: Int) => c + "@" + v, (c1: String, c2: String) => c1 + "$" + c2 ).collect.foreach(println)result不确定欧,单机执行不会调用mergeCombiners:
(B,1_@2@3@4) (A,1_@2) (C,1_)在集群情况下:
(B,2_@3@4$1_) (A,1_@2) (C,1_) 或者 (B,1_$2_@3@4) (A,1_@2) (C,1_)
mapSideCombine=false时,再体验一下运行结果。
有许多函数比goupByKey好:
当你combine元素时,可以使用
combineByKey,但是输入值类型和输出可能不一样
foldByKey合并每一个 key 的所有值,在级联函数和“零值”中使用。
//使用combineByKey计算wordcount wordsRDD.map(word=>(word,1)).combineByKey( (v: Int) => v, (c: Int, v: Int) => c+v, (c1: Int, c2: Int) => c1 + c2 ).collect.foreach(println) //使用foldByKey计算wordcount println("=======foldByKey=========") wordsRDD.map(word=>(word,1)).foldByKey(0)(_+_).foreach(println) //使用aggregateByKey计算wordcount println("=======aggregateByKey============") wordsRDD.map(word=>(word,1)).aggregateByKey(0)((u:Int,v)=>u+v,_+_).foreach(println)
foldByKey,
aggregateByKey都是由combineByKey实现,并且
mapSideCombine=true,因此可以使用这些函数替代goupByKey。
作者:jacksu在简书
链接:https://www.jianshu.com/p/0c6705724cff
來源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
相关文章推荐
- 深入理解groupByKey、reduceByKey
- 深入理解groupByKey、reduceByKey
- 深入理解groupByKey、reduceByKey
- 深入理解groupByKey、reduceByKey区别——本质就是一个local machine的reduce操作
- 深入理解Spark算子之 reduceByKey
- 对于Spark中groupByKey的深入理解
- 【Spark】Spark-reduceByKey-深入理解
- 【Spark系列2】reduceByKey和groupByKey区别与用法
- [Apache Spark API][GroupByKey Vs ReduceByKey]
- spark RDD,reduceByKey vs groupByKey
- Spark RDD/Core 编程 API入门系列之map、filter、textFile、cache、对Job输出结果进行升和降序、union、groupByKey、join、reduce、look
- Spark RDD/Core 编程 API入门系列之map、filter、textFile、cache、对Job输出结果进行升和降序、union、groupByKey、join、reduce、lookup(一)
- Spark算子:RDD键值转换操作(3)–groupBy、keyBy、groupByKey、reduceByKey、reduceByKeyLocally
- 大数据IMF传奇行动绝密课程第17课:RDD案例(join、cogroup、reduceByKey、groupByKey等)
- Spark算子:RDD键值转换操作(3)–groupByKey、reduceByKey、reduceByKeyLocally
- spark python初学(一)对于reduceByKey的理解
- Spark RDD/Core 编程 API入门系列 之rdd案例(map、filter、flatMap、groupByKey、reduceByKey、join、cogroupy等)(四)
- 【Spark系列2】reduceByKey和groupByKey区别与用法
- RDD键值转换操作(3)–groupByKey、reduceByKey、reduceByKeyLocally
- Spark API编程动手实战-04-以在Spark 1.2版本实现对union、groupByKey、join、reduce、lookup等操作实践