第25课:Spark Streaming的StreamingContext启动及JobScheduler启动源码图解
2016-06-14 18:43
393 查看
本期内容:
1. StreamingContext启动源码图解
2. JobScheduler启动源码图解
StreamingContext的start方法对INITIALIZED、ACTIVE、STOPPED等状态分别做不同处理。
JobScheduler中的eventLoop是带有一个消息队列的线程循环器。接收的消息有JobStarted、JobCompleted、ErrorReported等三种。eventLoop用start方法启动。
JobScheduler还把所有InputDStream的rateController放入监听器中。然后启动了listenerBus。
然后启动了receiverTracker、jobGenerator。这两个在JobScheduler中至关重要。
ReceiverTracker中有个适用于RPC的ReceiverTrackerEndpoint类型的消息循环体,用receive方法接收StartAllReceivers、RestartReceiver、CleanupOldBlocks、UpdateReceiverRteLimit等4种消息,用receiveAndREply方法接收RegisterReceiver、AddBlock、DeregisterReceiver、AllReceiverIds、StopAllReceivers等五种消息。消息有自己给自己发的消息,也有远程发来的消息。
shutdownHookManager用于关闭时。env.metricsSystem用于计量,注册的计量信息在StreamingSource中有定义。uiTab用于web界面。针对UI界面,定义有StreamingPage、BatchPage等WebUIPage的子类 ,做定制时也可定义自己的Page。
源码需要在细节上下工夫。深度决定广度。
JobGenerator中也有一个消息循环体。
所以,JobScheduler有3个消息循环体。如下图所示
1. StreamingContext启动源码图解
2. JobScheduler启动源码图解
StreamingContext的start方法对INITIALIZED、ACTIVE、STOPPED等状态分别做不同处理。
/** * Start the execution of the streams. * * @throws IllegalStateException if the StreamingContext is already stopped. */ def start(): Unit = synchronized { state match { case INITIALIZED => startSite.set(DStream.getCreationSite()) StreamingContext.ACTIVATION_LOCK.synchronized { StreamingContext.assertNoOtherContextIsActive() try { validate() // Start the streaming scheduler in a new thread, so that thread local properties // like call sites and job groups can be reset without affecting those of the // current thread. ThreadUtils.runInNewThread("streaming-start") { sparkContext.setCallSite(startSite.get) sparkContext.clearJobGroup() sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false") scheduler.start() } state = StreamingContextState.ACTIVE } catch { case NonFatal(e) => logError("Error starting the context, marking it as stopped", e) scheduler.stop(false) state = StreamingContextState.STOPPED throw e } StreamingContext.setActiveContext(this) } shutdownHookRef = ShutdownHookManager.addShutdownHook( StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown) // Registering Streaming Metrics at the start of the StreamingContext assert(env.metricsSystem != null) env.metricsSystem.registerSource(streamingSource) uiTab.foreach(_.attach()) logInfo("StreamingContext started") case ACTIVE => logWarning("StreamingContext has already been started") case STOPPED => throw new IllegalStateException("StreamingContext has already been stopped") } }
JobScheduler中的eventLoop是带有一个消息队列的线程循环器。接收的消息有JobStarted、JobCompleted、ErrorReported等三种。eventLoop用start方法启动。
JobScheduler还把所有InputDStream的rateController放入监听器中。然后启动了listenerBus。
然后启动了receiverTracker、jobGenerator。这两个在JobScheduler中至关重要。
ReceiverTracker中有个适用于RPC的ReceiverTrackerEndpoint类型的消息循环体,用receive方法接收StartAllReceivers、RestartReceiver、CleanupOldBlocks、UpdateReceiverRteLimit等4种消息,用receiveAndREply方法接收RegisterReceiver、AddBlock、DeregisterReceiver、AllReceiverIds、StopAllReceivers等五种消息。消息有自己给自己发的消息,也有远程发来的消息。
shutdownHookManager用于关闭时。env.metricsSystem用于计量,注册的计量信息在StreamingSource中有定义。uiTab用于web界面。针对UI界面,定义有StreamingPage、BatchPage等WebUIPage的子类 ,做定制时也可定义自己的Page。
源码需要在细节上下工夫。深度决定广度。
JobGenerator中也有一个消息循环体。
所以,JobScheduler有3个消息循环体。如下图所示
相关文章推荐
- Android样式的开发:View Animation篇
- Andrew Ng机器学习之一 导论
- leetcode 226. Invert Binary Tree
- 监测树莓派GPU使用情况
- Python简介与入门(基于Python2.7)
- ftp文件服务器与web项目结合
- MyBatis基础(二)—持久层开发的两种方法
- Android--Demo_PullToRefresh(进阶篇)
- volley的用法(get请求)
- iOS开发拓展篇——如何把项目托管到GitHub
- OC中关键字的意思
- 矩阵相关运算代码实现
- iOS objc_msgSend 野指针Crash 从 Log 提取 Crash 时 selector 的地址和名字并打印
- LNK4098 defaultlib 'library' conflicts with use of other libs; use /NODEFAULTLIB:library
- SpringMVC XML配置
- iOS objc_msgSend 野指针Crash 从 Log 提取 Crash 时 selector 的地址和名字并打印
- Log4j具体输出信息级别配置方法
- PowerDesinger生成mysql语句
- Startssl 现在就启用 HTTPS,免费的!
- Spring的PropertyPlaceholderConfigurer应用