stage划分和task最佳位置算法
2017-01-19 14:00
423 查看
一:Stage划分算法解密
1, Spark Application中可以因为不同的Action触发众多的Job,也就是说一个Application中可以有很多的Job,每个Job是由一个或者多个Stage构成的,后面的Stage依赖于前面的Stage,也就是说只有前面依赖的Stage计算完毕后,后面的Stage才会运行;
2, Stage划分的已经就是宽依赖,什么产生宽依赖呢?例如reducByKey、groupByKey等等;
3, 由Action(例如collect)导致了SparkContext.runJob的执行,最终导致了DAGScheduler中的submitJob的执行,其核心是通过发送一个case
class JobSubmitted对象给eventProcessLoop,其中JobSubmitted源码如下:
private[scheduler]
case class JobSubmitted(
jobId: Int,
finalRDD: RDD[_],
func: (TaskContext,
Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties =
null)
extends
DAGSchedulerEvent
eventProcessLoop是DAGSchedulerEventProcessLoop的具体实例,而DAGSchedulerEventProcessLoop是EventLoop的子类,具体实现EventLoop的onReceive方法,onReceive方法转过来回调doOnReceive
4, 在doOnReceive中通过模式匹配的方式把执行路由到
case
JobSubmitted(jobId,
rdd,
func,
partitions,
callSite,
listener,
properties) =>
dagScheduler.handleJobSubmitted(jobId,
rdd,
func,
partitions,
callSite,
listener,
properties)
5, 在handleJobSubmitted中首先创建finalStage,创建finalStage时候会建立父Stage的依赖链条;
补充说明:所谓的missing就是说要进行当前的计算了。
二:Task任务本地性算法实现;
1, 在submitMissingTasks中会通过调用以下代码来获得任务的本地性:
val
taskIdToLocations: Map[Int,
Seq[TaskLocation]] =
try {
stage match
{
case
s: ShuffleMapStage =>
partitionsToCompute.map { id => (id,
getPreferredLocs(stage.rdd,
id))}.toMap
case
s: ResultStage =>
val
job = s.activeJob.get
partitionsToCompute.map { id =>
val
p = s.partitions(id)
(id,
getPreferredLocs(stage.rdd,
p))
}.toMap
}
}
2, 具体一个Partition中的数据本地性的算法实现为下述代码中:
private[spark]
def getPreferredLocs(rdd: RDD[_],
partition: Int):
Seq[TaskLocation] = {
getPreferredLocsInternal(rdd,
partition,
new HashSet)
}
在具体算法实现的时候首先查询DAGScheduler的内存数据结构中是否存在当前Partition的数据本地性的信息,如果有的话这直接返回;如果没有首先会调用rdd.getPreferedLocations
例如想让Spark运行在HBase上或者一种现在还没有直接直接的数据库上面,此时开发者需要自定义RDD,为了保证Task计算的数据本地性,最为关键的方式就是必须实现RDD的getPreferedLocations
3,DAGScheduler计算数据本地性的时候巧妙的借助了RDD自身的getPreferedLocations中的数据,最大化的优化的效率,因为getPreferedLocations中表明了每个Partition的数据本地性,虽然当前Partition可能被persist或者checkpoint,但是persist或者checkpoint默认情况下肯定是和getPreferedLocations中的Partition的数据本地性是一致的,所以这就极大的简化Task数据本地性算法的实现和效率的优化;
1, Spark Application中可以因为不同的Action触发众多的Job,也就是说一个Application中可以有很多的Job,每个Job是由一个或者多个Stage构成的,后面的Stage依赖于前面的Stage,也就是说只有前面依赖的Stage计算完毕后,后面的Stage才会运行;
2, Stage划分的已经就是宽依赖,什么产生宽依赖呢?例如reducByKey、groupByKey等等;
3, 由Action(例如collect)导致了SparkContext.runJob的执行,最终导致了DAGScheduler中的submitJob的执行,其核心是通过发送一个case
class JobSubmitted对象给eventProcessLoop,其中JobSubmitted源码如下:
private[scheduler]
case class JobSubmitted(
jobId: Int,
finalRDD: RDD[_],
func: (TaskContext,
Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties =
null)
extends
DAGSchedulerEvent
eventProcessLoop是DAGSchedulerEventProcessLoop的具体实例,而DAGSchedulerEventProcessLoop是EventLoop的子类,具体实现EventLoop的onReceive方法,onReceive方法转过来回调doOnReceive
4, 在doOnReceive中通过模式匹配的方式把执行路由到
case
JobSubmitted(jobId,
rdd,
func,
partitions,
callSite,
listener,
properties) =>
dagScheduler.handleJobSubmitted(jobId,
rdd,
func,
partitions,
callSite,
listener,
properties)
5, 在handleJobSubmitted中首先创建finalStage,创建finalStage时候会建立父Stage的依赖链条;
补充说明:所谓的missing就是说要进行当前的计算了。
二:Task任务本地性算法实现;
1, 在submitMissingTasks中会通过调用以下代码来获得任务的本地性:
val
taskIdToLocations: Map[Int,
Seq[TaskLocation]] =
try {
stage match
{
case
s: ShuffleMapStage =>
partitionsToCompute.map { id => (id,
getPreferredLocs(stage.rdd,
id))}.toMap
case
s: ResultStage =>
val
job = s.activeJob.get
partitionsToCompute.map { id =>
val
p = s.partitions(id)
(id,
getPreferredLocs(stage.rdd,
p))
}.toMap
}
}
2, 具体一个Partition中的数据本地性的算法实现为下述代码中:
private[spark]
def getPreferredLocs(rdd: RDD[_],
partition: Int):
Seq[TaskLocation] = {
getPreferredLocsInternal(rdd,
partition,
new HashSet)
}
在具体算法实现的时候首先查询DAGScheduler的内存数据结构中是否存在当前Partition的数据本地性的信息,如果有的话这直接返回;如果没有首先会调用rdd.getPreferedLocations
例如想让Spark运行在HBase上或者一种现在还没有直接直接的数据库上面,此时开发者需要自定义RDD,为了保证Task计算的数据本地性,最为关键的方式就是必须实现RDD的getPreferedLocations
3,DAGScheduler计算数据本地性的时候巧妙的借助了RDD自身的getPreferedLocations中的数据,最大化的优化的效率,因为getPreferedLocations中表明了每个Partition的数据本地性,虽然当前Partition可能被persist或者checkpoint,但是persist或者checkpoint默认情况下肯定是和getPreferedLocations中的Partition的数据本地性是一致的,所以这就极大的简化Task数据本地性算法的实现和效率的优化;
相关文章推荐
- Spark的Stage划分和task最佳位置算法
- [Spark内核] 第34课:Stage划分和Task最佳位置算法源码彻底解密
- 第34课:Stage划分和Task最佳位置算法源码彻底解密
- 第34课:Stage划分和Task最佳位置算法源码彻底解密
- 第34课: Stage划分和Task最佳位置算法源码彻底解密
- Stage划分和Task最佳位置算法解密
- 大数据IMF传奇行动绝密课程第34课:Stage划分和Task最佳位置算法解密
- 7.DAGScheduler的stage算法划分和TaskScheduler的task算法划分
- Spark2.2 DAGScheduler源码分析[stage划分算法源码剖析]
- stage划分算法
- spark stage的划分和task分配
- Spark学习笔记--stage和task的划分
- spark学习-DAGScheduler的stage划分算法
- Spark作业的Stage划分,Task创建分发一直到提交给Spark的Executor的线程池执行全过程
- Spark的job触发流程原理与stage划分算法分析
- spark源码学习(五):stage的划分和task的创建
- Spark stage阶段划分算法
- Spark中job、stage、task的划分+源码执行过程分析
- Spark的stage & job & task 到底是什么 ,以及划分原理
- Spark的stage & job & task 到底是什么 ,以及划分原理