RDD的依赖内部解密
2016-01-30 18:36
459 查看
RDD的依赖内部解密
视频学习来源:DT-大数据梦工厂 IMF传奇行动视频(后附王家林老师联系方式)
本期内容:
RDD依赖关系的本质
依赖关系下的数据流程图
窄依赖:每个父RDD的Partition最多被子RDD的一个Partition所使用(例如map和filter、union);
宽依赖:每个父RDD中的Partition会被多个子RDD中的Partition所使用(groupByKey和ReduceByKey等作)。
总结:如果父RDD的一个Partition被一个子RDD的Partition所使用就是窄依赖,否则的话就是宽依赖。
注:对join操作有两种情况,若果join的时候,每个Partition仅仅和已知的Partition进行join,此时的join的窄依赖并不会产生shuffle,其他情况的操作就是宽依赖。如果子RDD中的Patition对父RDD的Partiton依赖的数量
不会随着RDD数据规模的改变而改变的话,就是窄依赖,否则的话就是宽宽依赖
因为是确定的partition数量的依赖关系,所有就是窄依赖,得出一个推论,窄依赖不仅包含一对一的窄依赖,还包含一对固定个数的窄依赖(也就是说对父RDD的依赖的Partition的数量不会随着RDD数据规模的改变而改变)
将所有RDD都看做在一个Stage,每个箭头都生成一个task,每一步都产生新的RDD,但有一个弊端就是需要挨个执行,这样会产生大量的中间数据(中间数据会存储起来,下一步才会执行,而内存不会释放),RDD内部的Partition数据不会受干扰,若将G看做最后一个RDD,final级别的RDD,有三个Partition,为每个Partition分配一个Task,此时会产生,第一个数据分片,数据来自B的一个分片,B的分片来自A的三个分片,同时来自F的四个分片,会导致Task太大,此时遇到shuffle级别的依赖关系,必须计算依赖的RDD的所有的Partition,并且都发生在一个Task中计算。
回溯和血统 pipeline。
上面两种假设的核心问题都是在遇到shuffle依赖(宽依赖)的时候无法进行pipeline,退而求其次,在有shuffle依赖的时候断开,此时窄依赖的计算链条,
1、每个stage里边的task的数量是由该stage中最后一个RDD的partition的数量所决定的。
2、从后往前推理,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入该Stage中
遇到shuffle的时候断开因为数据量太大
3、上图中C、D、E、F构成了pipeline,最后一个Stage里边的任务的类型是ResultTask,前面其他Stage里边的任务的类型多是shuffleMapTask。
4、代表当前Stage的算子一定是该Stage的最后一个计算步骤!!!
启动 集群进程和historyServer进程,查看历史运行任务:
DAG逻辑图:
表面上看,数据在流动,实质是算子在流动,背后核心是函数式编程,包含两层意思:
1、数据不动代码动(从集群计算的角度);
2、在一个Stage内部,算子为何会流动进行pipeline?,首先是算子合并,也就是函数式编程执行的时候最终进行函数的展开,从而把一个stage内部的多个算子合并成为一个大算子(其内部包含了当前stage所有数据的计算逻辑)。其次是由于transformation的lazy特性,所以才能最后进行合并(不加lazy会产生中间结果);在具体算子交给Executor计算之前,首先会通过SparkFrameWork(DAGScheduler)进行算子的优化(基于数据本地性的pipeline),产生pipeline不会产生中间结果,框架帮我们管理了算子,引擎将所以算子产生一个大算子(若是进行cache或者checkpoint可以添加标记进行cache和checkpoint)。
(coGroupRDD类似于二维表)
产生shuffle依赖,key不可以是数组,ReduceByKey边抓数据边计算提高效率,其参数createCombiner初始化抓进来的数据,mergeValue,V变成C的算子,mergeCombiner将相同的C加起来。
depandecy中的窄依赖:
oneToone一对一传递Partition
RangeDependency,进行join操作
shuffleDependency:
王家林老师是大数据技术集大成者,中国Spark第一人:
DT大数据梦工厂
新浪微博:www.weibo.com/ilovepains/
微信公众号:DT_Spark
博客:http://.blog.sina.com.cn/ilovepains
TEL:18610086859
Email:18610086859@vip.126.com
视频学习来源:DT-大数据梦工厂 IMF传奇行动视频(后附王家林老师联系方式)
本期内容:
RDD依赖关系的本质
依赖关系下的数据流程图
窄依赖:每个父RDD的Partition最多被子RDD的一个Partition所使用(例如map和filter、union);
宽依赖:每个父RDD中的Partition会被多个子RDD中的Partition所使用(groupByKey和ReduceByKey等作)。
总结:如果父RDD的一个Partition被一个子RDD的Partition所使用就是窄依赖,否则的话就是宽依赖。
注:对join操作有两种情况,若果join的时候,每个Partition仅仅和已知的Partition进行join,此时的join的窄依赖并不会产生shuffle,其他情况的操作就是宽依赖。如果子RDD中的Patition对父RDD的Partiton依赖的数量
不会随着RDD数据规模的改变而改变的话,就是窄依赖,否则的话就是宽宽依赖
因为是确定的partition数量的依赖关系,所有就是窄依赖,得出一个推论,窄依赖不仅包含一对一的窄依赖,还包含一对固定个数的窄依赖(也就是说对父RDD的依赖的Partition的数量不会随着RDD数据规模的改变而改变)
将所有RDD都看做在一个Stage,每个箭头都生成一个task,每一步都产生新的RDD,但有一个弊端就是需要挨个执行,这样会产生大量的中间数据(中间数据会存储起来,下一步才会执行,而内存不会释放),RDD内部的Partition数据不会受干扰,若将G看做最后一个RDD,final级别的RDD,有三个Partition,为每个Partition分配一个Task,此时会产生,第一个数据分片,数据来自B的一个分片,B的分片来自A的三个分片,同时来自F的四个分片,会导致Task太大,此时遇到shuffle级别的依赖关系,必须计算依赖的RDD的所有的Partition,并且都发生在一个Task中计算。
回溯和血统 pipeline。
上面两种假设的核心问题都是在遇到shuffle依赖(宽依赖)的时候无法进行pipeline,退而求其次,在有shuffle依赖的时候断开,此时窄依赖的计算链条,
1、每个stage里边的task的数量是由该stage中最后一个RDD的partition的数量所决定的。
2、从后往前推理,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入该Stage中
遇到shuffle的时候断开因为数据量太大
3、上图中C、D、E、F构成了pipeline,最后一个Stage里边的任务的类型是ResultTask,前面其他Stage里边的任务的类型多是shuffleMapTask。
4、代表当前Stage的算子一定是该Stage的最后一个计算步骤!!!
启动 集群进程和historyServer进程,查看历史运行任务:
DAG逻辑图:
表面上看,数据在流动,实质是算子在流动,背后核心是函数式编程,包含两层意思:
1、数据不动代码动(从集群计算的角度);
2、在一个Stage内部,算子为何会流动进行pipeline?,首先是算子合并,也就是函数式编程执行的时候最终进行函数的展开,从而把一个stage内部的多个算子合并成为一个大算子(其内部包含了当前stage所有数据的计算逻辑)。其次是由于transformation的lazy特性,所以才能最后进行合并(不加lazy会产生中间结果);在具体算子交给Executor计算之前,首先会通过SparkFrameWork(DAGScheduler)进行算子的优化(基于数据本地性的pipeline),产生pipeline不会产生中间结果,框架帮我们管理了算子,引擎将所以算子产生一个大算子(若是进行cache或者checkpoint可以添加标记进行cache和checkpoint)。
(coGroupRDD类似于二维表)
/** * Generic function to combine the elements for each key using a custom set of aggregation聚合 * functions. This method is here for backward compatibility. It does not provide combiner * classtag information to the shuffle. * * @see [[combineByKeyWithClassTag]] */ def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)] = self.withScope { combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine, serializer)(null) } /** * Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD. * This method is here for backward compatibility. It does not provide combiner * classtag information to the shuffle. * * @see [[combineByKeyWithClassTag]] */ def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] = self.withScope { combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null) }
产生shuffle依赖,key不可以是数组,ReduceByKey边抓数据边计算提高效率,其参数createCombiner初始化抓进来的数据,mergeValue,V变成C的算子,mergeCombiner将相同的C加起来。
depandecy中的窄依赖:
@DeveloperApi abstract class Dependency[T] extends Serializable { def rdd: RDD[T] } /** * :: DeveloperApi :: * Base class for dependencies where each partition of the child RDD depends on a small number * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution. */ @DeveloperApi abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] { /** * Get the parent partitions for a child partition. * @param partitionId a partition of the child RDD * @return the partitions of the parent RDD that the child partition depends upon */ def getParents(partitionId: Int): Seq[Int] override def rdd: RDD[T] = _rdd }两种RDD的窄依赖:
/** * :: DeveloperApi :: * Represents a one-to-one dependency between partitions of the parent and child RDDs. */ @DeveloperApi class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) { override def getParents(partitionId: Int): List[Int] = List(partitionId) } /** * :: DeveloperApi :: * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs. * @param rdd the parent RDD * @param inStart the start of the range in the parent RDD * @param outStart the start of the range in the child RDD * @param length the length of the range */ @DeveloperApi class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int) extends NarrowDependency[T](rdd) { override def getParents(partitionId: Int): List[Int] = { if (partitionId >= outStart && partitionId < outStart + length) { List(partitionId - outStart + inStart) } else { Nil } } }
oneToone一对一传递Partition
RangeDependency,进行join操作
shuffleDependency:
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( @transient private val _rdd: RDD[_ <: Product2[K, V]], val partitioner: Partitioner, val serializer: Option[Serializer] = None, val keyOrdering: Option[Ordering[K]] = None, val aggregator: Option[Aggregator[K, V, C]] = None, val mapSideCombine: Boolean = false) extends Dependency[Product2[K, V]] {注:Hadoop中的MapReduce中的mapper和reducer相当于Spark中map和ReduceByKey算子。
王家林老师是大数据技术集大成者,中国Spark第一人:
DT大数据梦工厂
新浪微博:www.weibo.com/ilovepains/
微信公众号:DT_Spark
博客:http://.blog.sina.com.cn/ilovepains
TEL:18610086859
Email:18610086859@vip.126.com
相关文章推荐
- jpa2
- BZOJ-2324 营救皮卡丘 最小费用可行流+拆下界+Floyd预处理
- BZOJ-2324 营救皮卡丘 最小费用可行流+拆下界+Floyd预处理
- Mac下安装Scrapy
- Insert a node in a sorted linked list.
- A. Slime Combining
- Android 中Webview 自适应屏幕
- 初识RecyclerView
- linux下的I/O复用函数
- 初识RecyclerView
- Angular.js(出库增加)
- 微信支付
- windows7系统的安装
- Apache DbUtils 教程
- poj2492 A Bug's Life(并查集)
- UVA1625Color Lenth(DP+LCS变形 未AC)
- 用友NC V6.3打造集团企业高效信息平台
- 1036. 跟奥巴马一起编程(15)
- Format可能存在的坑
- 反馈通道改善系统性能_20160130