您的位置:首页 > 其它

reduce 与 reduceByKey 区别

2016-12-13 13:54 197 查看
reduce 是一种聚合函数,可以把各个任务的执行结果汇集到一个节点,还可以指定自定义的函数传入 reduce 执行。==reduce 把同一个任务内的结果现在本地 Worker 节点进行聚合,再把结果传给 Driver 执行聚合。但最终数据还是要汇总到主节点,而且 reduce 会把接受到的数据保存在内存中,直到所有任务都完成为止==。因此,当任务很多,任务的结果数据有比较大时,Driver 容易造成性能瓶颈,这样就应该考虑尽量避免 reduce 的使用,而将数据转换为 key-value 对,并使用 reduceByKey 实现逻辑,是计算变为分布式计算。

* Reduces the elements of this RDD using the specified commutative and
* associative binary operator.
def reduce(f: (T, T) => T): T = withScope {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
} else {
var jobResult: Option[T] = None
val mergeResult = (index: Int, taskResult: Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
case None => taskResult
sc.runJob(this, reducePartition, mergeResult)
// Get the final result out of our Option, or throw an exception if the RDD was empty
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))

reduceByKey 在每个 mapper 把数据发送给 reducer 前,会在 Map 端本地先合并(类似与 MapReduce
中的 Combiner)。与 reduce 不同的是,reduceByKey 不会把数据汇集到 Driver 节点,是分布式进行的,因此不会存在 reduce 那样的性能瓶颈。

* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce.
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息