您的位置:首页 > 其它

reduce 与 reduceByKey 区别

2016-12-13 13:54 197 查看
reduce 是一种聚合函数,可以把各个任务的执行结果汇集到一个节点,还可以指定自定义的函数传入 reduce 执行。==reduce 把同一个任务内的结果现在本地 Worker 节点进行聚合,再把结果传给 Driver 执行聚合。但最终数据还是要汇总到主节点,而且 reduce 会把接受到的数据保存在内存中,直到所有任务都完成为止==。因此,当任务很多,任务的结果数据有比较大时,Driver 容易造成性能瓶颈,这样就应该考虑尽量避免 reduce 的使用,而将数据转换为 key-value 对,并使用 reduceByKey 实现逻辑,是计算变为分布式计算。

/**
* Reduces the elements of this RDD using the specified commutative and
* associative binary operator.
*/
def reduce(f: (T, T) => T): T = withScope {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
var jobResult: Option[T] = None
val mergeResult = (index: Int, taskResult: Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
case None => taskResult
}
}
}
sc.runJob(this, reducePartition, mergeResult)
// Get the final result out of our Option, or throw an exception if the RDD was empty
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}


reduceByKey 在每个 mapper 把数据发送给 reducer 前,会在 Map 端本地先合并(类似与 MapReduce
4000
中的 Combiner)。与 reduce 不同的是,reduceByKey 不会把数据汇集到 Driver 节点,是分布式进行的,因此不会存在 reduce 那样的性能瓶颈。

/**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce.
*/
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: