Spark源码分析之Executor分析
2017-11-10 21:02
281 查看
Executor是在worker启动的一个进程,用于执行task任务。我们知道CoarseGrainedSchedulerBackend在启动之后,会创建Driver终端,然后会立即向Driver发送RegisterExecutor消息,注册成功之后,会向CoarseGrainedSchedulerBackend返回一个RegisteredExecutor消息
然后会创建一个Executor对象;然后接下来Driver内的DAGScheduler
向CoarseGrainedSchedulerBackend发送LaunchTask消息,然后这个SchedulerBackend实际上调用executor来发起任务,创建TaskRunner来执行task
一 核心属性
String executorHostname: executor对应的hostname
Boolean isLocal:是否是本地的
ThreadPoolExecutor threadPool:线程池
boolean userClassPathFirst: 是否首先加载用户jar中的class
MutableURLClassLoader urlClassLoader: URL类加载器
Long maxDirectResultSize: 直接结果最大为多大,首先从spark.task.maxDirectResultSize获取,如果没有设置则,默认是1M,还需要和rpc的message所允许的最大字节数比较,看谁比较小
Long maxResultSize:最大结果限制,默认是1GB
ConcurrentHashMap runningTasks: 正在运行的task列表
ScheduledExecutorService heartbeater: 心跳线程
HEARTBEAT_MAX_FAILURES :心跳检测失败默认值60
二 重要方法
2.1 launchTask
deflaunchTask(context:ExecutorBackend,
taskId: Long,
attemptNumber: Int,
taskName: String,serializedTask:
ByteBuffer): Unit = {
// 创建TaskRunner对象
val tr=
new TaskRunner(context,taskId
= taskId,attemptNumber
= attemptNumber,
taskName,
serializedTask)
// <task id, task runner> 放入内存缓存中
runningTasks.put(taskId,tr)
// 调用TaskRunner的run方法
threadPool.execute(tr)
}
2.2 killTask
2.3 TaskRunner的run方法
然后会创建一个Executor对象;然后接下来Driver内的DAGScheduler
向CoarseGrainedSchedulerBackend发送LaunchTask消息,然后这个SchedulerBackend实际上调用executor来发起任务,创建TaskRunner来执行task
一 核心属性
String executorHostname: executor对应的hostname
Boolean isLocal:是否是本地的
ThreadPoolExecutor threadPool:线程池
boolean userClassPathFirst: 是否首先加载用户jar中的class
MutableURLClassLoader urlClassLoader: URL类加载器
Long maxDirectResultSize: 直接结果最大为多大,首先从spark.task.maxDirectResultSize获取,如果没有设置则,默认是1M,还需要和rpc的message所允许的最大字节数比较,看谁比较小
Long maxResultSize:最大结果限制,默认是1GB
ConcurrentHashMap runningTasks: 正在运行的task列表
ScheduledExecutorService heartbeater: 心跳线程
HEARTBEAT_MAX_FAILURES :心跳检测失败默认值60
二 重要方法
2.1 launchTask
deflaunchTask(context:ExecutorBackend,
taskId: Long,
attemptNumber: Int,
taskName: String,serializedTask:
ByteBuffer): Unit = {
// 创建TaskRunner对象
val tr=
new TaskRunner(context,taskId
= taskId,attemptNumber
= attemptNumber,
taskName,
serializedTask)
// <task id, task runner> 放入内存缓存中
runningTasks.put(taskId,tr)
// 调用TaskRunner的run方法
threadPool.execute(tr)
}
2.2 killTask
def killTask(taskId: Long, interruptThread: Boolean): Unit = { // 获取task 对应的 TaskRunner val tr = runningTasks.get(taskId) // 调用kill方法kill task if (tr != null) { tr.kill(interruptThread) } }
2.3 TaskRunner的run方法
相关文章推荐
- Spark源码分析之worker节点启动driver和executor
- spark源码学习(二)---Master源码分析(3)-master对driver、executor的调度
- Spark1.3从创建到提交:5)Executor启动源码分析
- spark 1.6.0 core源码分析7 Spark executor的运行
- spark源码学习(八)--- executor启动task分析
- spark源码分析之Executor启动与任务提交篇
- Spark1.3从创建到提交:6)Executor和Driver互动源码分析
- Spark2.2 Executor原理剖析及源码分析
- spark core源码分析7 Executor的运行
- spark源码学习(三)---worker源码分析-worker启动driver、executor分析
- spark内核揭秘-13-Worker中Executor启动过程源码分析
- spark内核揭秘-13-Worker中Executor启动过程源码分析
- Spark2.2 Driver和Executor状态改变处理机制源码分析
- Spark源码分析 – Executor
- spark-core_13:Worker源码分析1-Worker初始化过程
- Apache Spark源码分析-- Job的提交与运行
- Spark集群启动之Master、Worker启动流程源码分析
- 第12课:Spark Streaming源码解读之Executor容错安全性
- Spark 2.1.0 大数据平台源码分析:章节序列
- Spark源码分析之Scheduler模块(TaskScheduler)