您的位置:首页 > 其它

stage划分和task最佳位置算法

2017-01-19 14:00 423 查看
一:Stage划分算法解密

1, Spark Application中可以因为不同的Action触发众多的Job,也就是说一个Application中可以有很多的Job,每个Job是由一个或者多个Stage构成的,后面的Stage依赖于前面的Stage,也就是说只有前面依赖的Stage计算完毕后,后面的Stage才会运行;

2, Stage划分的已经就是宽依赖,什么产生宽依赖呢?例如reducByKey、groupByKey等等;

3, 由Action(例如collect)导致了SparkContext.runJob的执行,最终导致了DAGScheduler中的submitJob的执行,其核心是通过发送一个case
class JobSubmitted对象给eventProcessLoop,其中JobSubmitted源码如下:

private[scheduler]
case class JobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext,
Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties =
null)
  extends
DAGSchedulerEvent
 

eventProcessLoop是DAGSchedulerEventProcessLoop的具体实例,而DAGSchedulerEventProcessLoop是EventLoop的子类,具体实现EventLoop的onReceive方法,onReceive方法转过来回调doOnReceive
 

4, 在doOnReceive中通过模式匹配的方式把执行路由到

 

case
JobSubmitted(jobId,
rdd,
func,
partitions,
callSite,
listener,
properties) =>
  dagScheduler.handleJobSubmitted(jobId,
rdd,
func,
partitions,
callSite,
listener,
properties)
 

5, 在handleJobSubmitted中首先创建finalStage,创建finalStage时候会建立父Stage的依赖链条;

 

补充说明:所谓的missing就是说要进行当前的计算了。

二:Task任务本地性算法实现;

1, 在submitMissingTasks中会通过调用以下代码来获得任务的本地性:

val
taskIdToLocations: Map[Int,
Seq[TaskLocation]] =
try {
  stage match
{
    case
s: ShuffleMapStage =>
      partitionsToCompute.map { id => (id,
getPreferredLocs(stage.rdd,
id))}.toMap
    case
s: ResultStage =>
      val
job = s.activeJob.get
      partitionsToCompute.map { id =>
        val
p = s.partitions(id)
        (id,
getPreferredLocs(stage.rdd,
p))
      }.toMap
  }
}
 

2, 具体一个Partition中的数据本地性的算法实现为下述代码中:

private[spark]
def getPreferredLocs(rdd: RDD[_],
partition: Int):
Seq[TaskLocation] = {
  getPreferredLocsInternal(rdd,
partition,
new HashSet)
}
 

在具体算法实现的时候首先查询DAGScheduler的内存数据结构中是否存在当前Partition的数据本地性的信息,如果有的话这直接返回;如果没有首先会调用rdd.getPreferedLocations

例如想让Spark运行在HBase上或者一种现在还没有直接直接的数据库上面,此时开发者需要自定义RDD,为了保证Task计算的数据本地性,最为关键的方式就是必须实现RDD的getPreferedLocations

3,DAGScheduler计算数据本地性的时候巧妙的借助了RDD自身的getPreferedLocations中的数据,最大化的优化的效率,因为getPreferedLocations中表明了每个Partition的数据本地性,虽然当前Partition可能被persist或者checkpoint,但是persist或者checkpoint默认情况下肯定是和getPreferedLocations中的Partition的数据本地性是一致的,所以这就极大的简化Task数据本地性算法的实现和效率的优化;

 

 

 

 

 

 

 

 

 

 

 

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark