您的位置:首页 > 其它

Spark2.2-Task序列化源码解析

2017-10-24 15:26 573 查看



源码版本:2.2

如有错误请指正


一、前言

    Spark在运行应用程序时,会根据RDD的操作,将数据处理流程划分为多个stage进行计算,其中划分stage的依据是数据是否产生shuffle。在同一个stage中,RDD之间的依赖都是窄依赖。一个stage下会有多个task,每个task针对不同的分区数据执行同样的运算逻辑。

    在Spark集群模式下,多个不同的task可能运行在不同的节点上。那么,是什么机制来确保Spark集群下不同节点都能运行同样的计算逻辑呢?

    在Spark中,针对RDD的操作都是一个一个闭包,Spark在进行Job提交时会将操作作为闭包进行序列化发送到执行具体task的节点上,这样就可以达到移动计算逻辑而不是数据的目的,提高计算效率。


二、具体流程

SparkContext初始化时对DAGScheduler和TaskScheduler进行初始化,同时根据部署模式初始化对应的SchedulerBackend
任务提交时触发SparkContext中的runJob
SparkContex向DAGScheduler提交runJob
DAGScheduler向它自己的事件处理器DAGSchedulerEventProcessLoop发送JobSubmitted事件
DAGScheduler创建ResultStage,并往前回溯,遇到shuffle则划分出新的stage
DAGScheduler划分完成后提交stage,遍历回溯提交父stage,如果父stage都完成了,则提交task
DAGScheduler序列化task,并进行广播
DAGScheduler构建task集合,并向TaskScheduler提交任务集,任务集中包含了已经序列化的任务信息
TaskScheduler创建任务集管理器TaskSetManager,向SchedulerBackend申请资源
SchedulerBackend在申请到资源后,执行launchTasks方法,将TaskSet中的Task一个一个地发送到Executor去执行。


三、源码分析


3.1 SparkContext提交任务

    Spark应用程序中对Dataset执行一系列操作



    点击Dataset中的一个具体操作,例如collect函数中,可以发现Dataset在执行具体的transformation或是action操作时,实际上已经生成了一个执行计划。

    具体执行计划是如何生成的,涉及到Spark SQL中的实现细节,这里不做详细描述,之后单独对Spark SQL原理进行解读。



    在执行执行计划中的具体操作时,都会掉用SparkContext中的runJob方法,具体是如何调用的,在之后的Spark SQL原理解读中再进行详细描述。

    在调用runJob方法时,可以看到,当前的执行函数作为一个参数传给了SparkContext的runJob方法。



    SparkContext在获取到func时,首先会对它做一个闭包清理。在ClosureCleaner类中的clean方法验证闭包是否可以被序列化,是否可以直接对它进行转换清理。
private def clean(
func: AnyRef,
checkSerializable: Boolean,
cleanTransitively: Boolean,
accessedFields: Map[Class[_], Set[String]]): Unit = {

if (!isClosure(func.getClass)) {
logWarning("Expected a closure; got " + func.getClass.getName)
return
}

// TODO: clean all inner closures first. This requires us to find the inner objects.
// TODO: cache outerClasses / innerClasses / accessedFields

if (func == null) {
return
}

logDebug(s"+++ Cleaning closure $func (${func.getClass.getName}) +++")

// A list of classes that represents closures enclosed in the given one
val innerClasses = getInnerClosureClasses(func)

// A list of enclosing objects and their respective classes, from innermost to outermost
// An outer object at a given index is of type outer class at the same index
val (outerClasses, outerObjects) = getOuterClassesAndObjects(func)

// For logging purposes only
val declaredFields = func.getClass.getDeclaredFields
val declaredMethods = func.getClass.getDeclaredMethods

logDebug(" + declared fields: " + declaredFields.size)
declaredFields.foreach { f => logDebug("     " + f) }
logDebug(" + declared methods: " + declaredMethods.size)
declaredMethods.foreach { m => logDebug("     " + m) }
logDebug(" + inner classes: " + innerClasses.size)
innerClasses.foreach { c => logDebug("     " + c.getName) }
logDebug(" + outer classes: " + outerClasses.size)
outerClasses.foreach { c => logDebug("     " + c.getName) }
logDebug(" + outer objects: " + outerObjects.size)
outerObjects.foreach { o => logDebug("     " + o) }

// Fail fast if we detect return statements in closures
getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0)

// If accessed fields is not populated yet, we assume that
// the closure we are trying to clean is the starting one
if (accessedFields.isEmpty) {
logDebug(s" + populating accessed fields because this is the starting closure")
// Initialize accessed fields with the outer classes first
// This step is needed to associate the fields to the correct classes later
for (cls <- outerClasses) {
accessedFields(cls) = Set[String]()
}
// Populate accessed fields by visiting all fields and methods accessed by this and
// all of its inner closures. If transitive cleaning is enabled, this may recursively
// visits methods that
f7d5
belong to other classes in search of transitively referenced fields.
for (cls <- func.getClass :: innerClasses) {
getClassReader(cls).accept(new FieldAccessFinder(accessedFields, cleanTransitively), 0)
}
}

logDebug(s" + fields accessed by starting closure: " + accessedFields.size)
accessedFields.foreach { f => logDebug("     " + f) }

// List of outer (class, object) pairs, ordered from outermost to innermost
// Note that all outer objects but the outermost one (first one in this list) must be closures
var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip outerObjects).reverse
var parent: AnyRef = null
if (outerPairs.size > 0) {
val (outermostClass, outermostObject) = outerPairs.head
if (isClosure(outermostClass)) {
logDebug(s" + outermost object is a closure, so we clone it: ${outerPairs.head}")
} else if (outermostClass.getName.startsWith("$line")) {
// SPARK-14558: if the outermost object is a REPL line object, we should clone and clean it
// as it may carray a lot of unnecessary information, e.g. hadoop conf, spark conf, etc.
logDebug(s" + outermost object is a REPL line object, so we clone it: ${outerPairs.head}")
} else {
// The closure is ultimately nested inside a class; keep the object of that
// class without cloning it since we don't want to clone the user's objects.
// Note that we still need to keep around the outermost object itself because
// we need it to clone its child closure later (see below).
logDebug(" + outermost object is not a closure or REPL line object, so do not clone it: " +
outerPairs.head)
parent = outermostObject // e.g. SparkContext
outerPairs = outerPairs.tail
}
} else {
logDebug(" + there are no enclosing objects!")
}

// Clone the closure objects themselves, nulling out any fields that are not
// used in the closure we're working on or any of its inner closures.
for ((cls, obj) <- outerPairs) {
logDebug(s" + cloning the object $obj of class ${cls.getName}")
// We null out these unused references by cloning each object and then filling in all
// required fields from the original object. We need the parent here because the Java
// language specification requires the first constructor parameter of any closure to be
// its enclosing object.
val clone = instantiateClass(cls, parent)
for (fieldName <- accessedFields(cls)) {
val field = cls.getDeclaredField(fieldName)
field.setAccessible(true)
val value = field.get(obj)
field.set(clone, value)
}
// If transitive cleaning is enabled, we recursively clean any enclosing closure using
// the already populated accessed fields map of the starting closure
if (cleanTransitively && isClosure(clone.getClass)) {
logDebug(s" + cleaning cloned closure $clone recursively (${cls.getName})")
// No need to check serializable here for the outer closures because we're
// only interested in the serializability of the starting closure
clean(clone, checkSerializable = false, cleanTransitively, accessedFields)
}
parent = clone
}

// Update the parent pointer ($outer) of this closure
if (parent != null) {
val field = func.getClass.getDeclaredField("$outer")
field.setAccessible(true)
// If the starting closure doesn't actually need our enclosing object, then just null it out
if (accessedFields.contains(func.getClass) &&
!accessedFields(func.getClass).contains("$outer")) {
logDebug(s" + the starting closure doesn't actually need $parent, so we null it out")
field.set(func, null)
} else {
// Update this closure's parent pointer to point to our enclosing object,
// which could either be a cloned closure or the original user object
field.set(func, parent)
}
}

logDebug(s" +++ closure $func (${func.getClass.getName}) is now cleaned +++")

if (checkSerializable) {
ensureSerializable(func)
}
}


3.2 DAGScheduler序列化并提交任务

    SparkContext在runJob方法中调用了它所拥有的DAGScheduler的runJob方法来运行Job。





    DAGScheduler在它的runJob方法中,通过submitJob方法提交任务,获取一个一直阻塞等待Job执行完毕的对象JobWaiter。



    在submitJob方法中,DAGScheduler首先会对func的类型进行处理,(为什么会做这样的类型转换???)向它的事件处理器发送JobSubmitted



    在DAGScheduler实际对JobSubmitted事件进行处理时,func函数的类型已经从(TaskContext, Iterator[_]) => U 转换成了(TaskContext, Iterator[_]) => _

    在经过一系列的stage划分等操作,最终提交任务是在DAGScheduler中的submitMissingTasks方法进行处理,我们可以看到抽象类Stage有两个具体实现类ShuffleMapStage和ResultStage,其中只有ResultStage中才有具体的执行操作func。 
ShuffleMapStage是在DAG中为shuffle生成数据的中间stage。它们在每一个shffle操作前发生,并且可能包含多个pipelined操作(eg map和filter)。在执行时,ShuffleMapStage会将后面reduce task将会使用到的数据保存为输出文件。'ShfuuleDep'字段描述了每个阶段的shuffle,'outputLocs'和'numAvailableOutputs'变量跟踪了map输出的就绪情况。
ResultStage将一个函数运行在RDD的某些分区,来计算一个action操作的结果。ResultStage对象捕获要执行的函数,'func',它将会在每一个分区上运行,'partitions'变量存放的是分区的ID集合。一些stage可能不会在RDD的所有分区上运行,例如first和lookup action





    由于只有ResultStage中才会包含func信息,DAGScheduler在submitMissingTasks方法中提交task时会区分stage类型类进行序列化。提交task之前首先会进行一些状态更新和获取数据Loc的操作。

    更新状态后,DAGScheduler会根据stage的类型来讲stage序列化为二进制的task。针对ShuffleMapStage,会将它的rdd和shuffleDep进行序列化。针对ResultStage,如前面所说,ResultStage对象捕获要执行的函数,'func',它将会在每一个分区上运行,序列化时会将rdd和func进行序列化。注意,虽然Spark有多种序列化的实现,但在序列化任务信息时,只会采用JavaSerializer。即DAGScheduler中的closureSerializer的固定默认实现是JavaSerializerInstance。



    我们再跳转到JavaSerializerInstance的serialize和deserialize方法中可以发现,Spark的JavaSerializerInstance在对闭包进行序列化时,并没有涉及到闭包的独有信息。闭包在进入序列化之前需要先做自己的校验和清理工作,这部分代码是在前面讲的SparkContext中调用ClosureCleaner的clean方法实现的,对闭包的innerClass、declaredField和declaredMethod等方法进行了校验(具体的校验逻辑和原理暂时还未理清)。

    Spark的JavaSerializer中的序列化和反序列化方法中基于java.io的ByteArrayOutputStream和InputStream封装了自己的字节输入输出流ByteBufferOutputStream和ByteBufferInputStream。序列化完成后的结果是一个Byte数组Array[Byte],DAGScheduler会委托SparkContext将这些字节码广播给每一个工作节点。当遇到闭包中存在无法序列化的对象和引用时,会直接触发stage的失败。

    完成RDD和func/ shuffleDep的序列化之后,DAGScheduler会根据分区的id、数据本地性结合上一步序列化后的二进制码结果,构造出一系列的Task,这里的Task的运行逻辑一致,但分区id和数据本地性信息是根据要计算的数据的分区信息来进行包装的。



    TaskSet构造完成后,DAGScheduler向TaskScheduler提交任务



 


3.3 TaskScheduler提交具体任务到Executor 

    TaskScheduler的submitTasks后会构造任务集管理器TaskSetManager来跟踪任务的运行状况,并向SchedulerBackend申请资源。



    SchedulerBackend在申请到资源后,会调用Executor中的launchTask来执行具体任务,在执行具体任务是,构造了一个TaskRunner的Runnable对象,TaskRunner对象中存放了具体的Task信息:
TaskDescription,里面包括了Task的描述信息、依赖的jar文件和序列化的task信息。





3.4 Exectutor反序列化任务并执行

  Task的具体反序列化过程在Executor中的TaskRunner中的run方法中,即任务实际执行时进行反序列化,我们可以重点看一下TaskRunner中的run方法:

    TaskRunner中的反序列化方法是通过获取env中的闭包反序列化实例获得的,这里默认只能使用JavaSerializerInstance进行反序列化,但注意,这里的反序列化之后真正执行的函数还是binary数据。

    第一步updateDependencies下载SparkContext广播的JAR和文件,添加jar到classpath中

    第二步 操作中对task进行真正的反序列化,使用更新后的当前线程类加载器
task = ser.deserialize[Task[Any]](
taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)




    反序列化后真正执行任务是通过调用抽象类Task的run方法来完成的



    抽象类Task的run方法是一个final方法,抽象类有两种不同的实现ShuffleMapTask和ResultTask。抽象类Task中的run方法再调用实际Task实现类的runTask方法进行。在这里才会将真正的函数binary数据反序列化为真正的RDD和func。

    反序列化出func后,实际运行func,完成数据计算工作。




四、问题

    通过上面的源码分析,可以理解Task序列化的整体流程和大概的序列化细节。但具体针对闭包的序列化还是有一些需要仔细研究的点
闭包的序列化清理原理
闭包的JAR依赖
ClassLoader隔离机制

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark Serializer scala