您的位置:首页 > 其它

RDD的依赖内部解密

2016-01-30 18:36 459 查看
RDD的依赖内部解密


视频学习来源:DT-大数据梦工厂 IMF传奇行动视频(后附王家林老师联系方式)

本期内容:

RDD依赖关系的本质

依赖关系下的数据流程图

窄依赖:每个父RDD的Partition最多被子RDD的一个Partition所使用(例如map和filter、union);

宽依赖:每个父RDD中的Partition会被多个子RDD中的Partition所使用(groupByKey和ReduceByKey等作)。

总结:如果父RDD的一个Partition被一个子RDD的Partition所使用就是窄依赖,否则的话就是宽依赖。

注:对join操作有两种情况,若果join的时候,每个Partition仅仅和已知的Partition进行join,此时的join的窄依赖并不会产生shuffle,其他情况的操作就是宽依赖。如果子RDD中的Patition对父RDD的Partiton依赖的数量
不会随着RDD数据规模的改变而改变的话,就是窄依赖,否则的话就是宽宽依赖

因为是确定的partition数量的依赖关系,所有就是窄依赖,得出一个推论,窄依赖不仅包含一对一的窄依赖,还包含一对固定个数的窄依赖(也就是说对父RDD的依赖的Partition的数量不会随着RDD数据规模的改变而改变)



将所有RDD都看做在一个Stage,每个箭头都生成一个task,每一步都产生新的RDD,但有一个弊端就是需要挨个执行,这样会产生大量的中间数据(中间数据会存储起来,下一步才会执行,而内存不会释放),RDD内部的Partition数据不会受干扰,若将G看做最后一个RDD,final级别的RDD,有三个Partition,为每个Partition分配一个Task,此时会产生,第一个数据分片,数据来自B的一个分片,B的分片来自A的三个分片,同时来自F的四个分片,会导致Task太大,此时遇到shuffle级别的依赖关系,必须计算依赖的RDD的所有的Partition,并且都发生在一个Task中计算。

回溯和血统 pipeline。

上面两种假设的核心问题都是在遇到shuffle依赖(宽依赖)的时候无法进行pipeline,退而求其次,在有shuffle依赖的时候断开,此时窄依赖的计算链条,

1、每个stage里边的task的数量是由该stage中最后一个RDD的partition的数量所决定的。

2、从后往前推理,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入该Stage中

遇到shuffle的时候断开因为数据量太大

3、上图中C、D、E、F构成了pipeline,最后一个Stage里边的任务的类型是ResultTask,前面其他Stage里边的任务的类型多是shuffleMapTask。

4、代表当前Stage的算子一定是该Stage的最后一个计算步骤!!!

启动 集群进程和historyServer进程,查看历史运行任务:



DAG逻辑图:



表面上看,数据在流动,实质是算子在流动,背后核心是函数式编程,包含两层意思:

1、数据不动代码动(从集群计算的角度);

2、在一个Stage内部,算子为何会流动进行pipeline?,首先是算子合并,也就是函数式编程执行的时候最终进行函数的展开,从而把一个stage内部的多个算子合并成为一个大算子(其内部包含了当前stage所有数据的计算逻辑)。其次是由于transformation的lazy特性,所以才能最后进行合并(不加lazy会产生中间结果);在具体算子交给Executor计算之前,首先会通过SparkFrameWork(DAGScheduler)进行算子的优化(基于数据本地性的pipeline),产生pipeline不会产生中间结果,框架帮我们管理了算子,引擎将所以算子产生一个大算子(若是进行cache或者checkpoint可以添加标记进行cache和checkpoint)。

(coGroupRDD类似于二维表)
/**
* Generic function to combine the elements for each key using a custom set of aggregation聚合
* functions. This method is here for backward compatibility. It does not provide combiner
* classtag information to the shuffle.
*
* @see [[combineByKeyWithClassTag]]
*/
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
partitioner, mapSideCombine, serializer)(null)
}
/**
* Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
* This method is here for backward compatibility. It does not provide combiner
* classtag information to the shuffle.
*
* @see [[combineByKeyWithClassTag]]
*/
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numPartitions: Int): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
}


产生shuffle依赖,key不可以是数组,ReduceByKey边抓数据边计算提高效率,其参数createCombiner初始化抓进来的数据,mergeValue,V变成C的算子,mergeCombiner将相同的C加起来。

depandecy中的窄依赖:
@DeveloperApi
abstract class Dependency[T] extends Serializable {
def rdd: RDD[T]
}

/**
* :: DeveloperApi ::
* Base class for dependencies where each partition of the child RDD depends on a small number
* of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
*/
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
/**
* Get the parent partitions for a child partition.
* @param partitionId a partition of the child RDD
* @return the partitions of the parent RDD that the child partition depends upon
*/
def getParents(partitionId: Int): Seq[Int]
override def rdd: RDD[T] = _rdd
}
两种RDD的窄依赖:

/**
* :: DeveloperApi ::
* Represents a one-to-one dependency between partitions of the parent and child RDDs.
*/
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
/**
* :: DeveloperApi ::
* Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
* @param rdd the parent RDD
* @param inStart the start of the range in the parent RDD
* @param outStart the start of the range in the child RDD
* @param length the length of the range
*/
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = {
if (partitionId >= outStart && partitionId < outStart + length) {
List(partitionId - outStart + inStart)
} else {
Nil
}
}
}


oneToone一对一传递Partition

RangeDependency,进行join操作

shuffleDependency:
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] {
注:Hadoop中的MapReduce中的mapper和reducer相当于Spark中map和ReduceByKey算子。

王家林老师是大数据技术集大成者,中国Spark第一人:

DT大数据梦工厂

新浪微博:www.weibo.com/ilovepains/

微信公众号:DT_Spark

博客:http://.blog.sina.com.cn/ilovepains

TEL:18610086859

Email:18610086859@vip.126.com
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: