Twitter Storm源代码分析之Topology的执行过程
2015-09-25 13:45
369 查看
我们通过前面的文章(Twitter Storm源代码分析之ZooKeeper中的目录结构)知道了storm集群里面nimbus是通过zookeeper来给supervisor发送指令的,并且知道了通过zookeeper到底交换了哪些信息。
那么一个topology从提交到执行到底是个什么样的过程?nimbus和supervisor到底做了什么样的事情呢?本文将带你去探寻这些答案。
nimbus.clj
supervisor.clj
worker.clj
task.clj
要提交一个topology给storm的话, 我们在命令行里面是这么做的:
帮助
那么在这个命令的背后,storm集群里面发生了什么呢?
看似简单的topology提交, 其实背后充满着血雨腥风(好吧,我夸张了), 我们来看看我们的幕后英雄nimbus, supervisor都做了什么。
首先由
帮助
topology的代码上传之后
它首先要对storm本身,以及topology进行一些校验:
它要检查storm的状态是否是active的
它要检查是否已经有同名的topology已经在storm里面运行了
因为我们会在代码里面给spout, bolt指定id, storm会检查是否有两个spout和bolt使用了相同的id。
任何一个id都不能以”__”开头, 这种命名方式是系统保留的。
帮助
如果以上检查都通过了,那么就进入下一步了。
然后为这个topology建立它的本地目录:
帮助
对应的代码:
帮助
nimbus老兄是个有责任心的人, 它虽然最终会把任务分成一个个task让supervisor去做, 但是他时刻都在关注着大家的情况, 所以它要求每个task每隔一定时间就要给它打个招呼(心跳信息), 以让它知道事情还在正常发展, 如果有task超时不打招呼, nimbus会认为这个task不行了, 然后进行重新分配。zookeeper上面的心跳目录:
帮助
nimbus是个精明人, 它对每个topology都会做出详细的预算:需要多少工作量(多少个task)。它是根据topology定义中给的parallelism hint参数, 来给spout/bolt来设定task数目了,并且分配对应的task-id。并且把分配好task的信息写入zookeeper上的/task目录下:
帮助
从上图中注释中看到{task-id}这个文件里面存储的是它所代表的spout/bolt的id, 这其实就是一个细化工作量的过程。
打比方说我们的topology里面一共有一个spout, 一个bolt。 其中spout的parallelism是2, bolt的parallelism是4, 那么我们可以把这个topology的总工作量看成是6, 那么一共有6个task,那么/tasks/{topology-id}下面一共会有6个以task-id命名的文件,其中两个文件的内容是spout的id, 其它四个文件的内容是bolt的id。
看代码:
帮助
然后nimbus就要给supervisor分配工作了。工作分配的单位是task(上面已经计算好了的,并且已经给每个task编号了), 那么分配工作意思就是把上面定义好的一堆task分配给supervisor来做, 在nimbus里面,Assignment表示一个topology的任务分配信息:
帮助
其中核心数据就是task->node+port, 它其实就是从task-id到supervisor-id+port的映射, 也就是把这个task分配给某台机器的某个端口来做。 工作分配信息会被写入zookeeper的如下目录:
帮助
TODO: 补充工作分配的细节
到现在为止,任务都分配好了,那么我们可以正式启动这个topology了,在源代码里面,启动topology其实就是向zookeeper上面该topology所对应的目录写入这个topology的信息:
帮助
看代码:
帮助
好!nimbus干的不错,到这里为止nimbus的工作算是差不多完成了,它对topology进行了一些检查,发现没什么问题, 然后又评估了一下工作量, 然后再看看它的小弟们(supervisor)哪些有空,它进行了合理的分配,所有的事情都安排妥当了,nimbus终于可以松一口气了。下面就看supervisor的了。
我们的supervisor同志无时无刻不想着为大哥nimbus分忧, 它每隔几秒钟就去看看大哥有没有给它分配新的任务,这些逻辑主要在supervisor.clj里面的
首先它看看storm里面有没有新提交的它没有下载的topology的代码, 如果有的话, 它就把这个新topology的代码下载下来。它可不管这个topology由不由它负责哦(这一点是可以优化的)
帮助
然后它会删除那些已经不再运行的topology的代码
帮助
然后他根据老大哥nimbus给它指派的任务信息(task-id对应到的topology的spout或者bolt), 来让它自己的小弟:worker来做这个事情
帮助
worker是个苦命的人, 上面的nimbus, supervisor只会指手画脚, 它要来做所有的脏活累活。
1. 它首先去zookeeper上去看看老大哥们都给他分配了哪些task(task-ids)
帮助
2. 然后根据这些task-id来找出所对应的topology的spout/bolt
帮助
3. 计算出它所代表的这些spout/bolt会给哪些task发送消息
帮助
4. 建立到3里面所提到的那些task的连接(socket), 然后在需要发送消息的时候就通过这些socket来发送
帮助
到这里为止,topology里面的组件(spout/bolt)都根据parallelism被分成多个task, 而这些task被分配给supervisor的多个worker来执行。大家各司其职,整个topology已经运行起来了。
除非你显式地终止一个topology, 否则它会一直运行的,可以用下面的命令去终止一个topology:
帮助
在这个命令的背后,
帮助
上面的代码会把zookeeper上面
这块数据会在supervisor下次从zookeeper上同步数据的时候删除的(supervisor会删除那些已经不存在的topology相关的数据)。这样这个topology的数据就从storm集群上彻底删除了。
那么一个topology从提交到执行到底是个什么样的过程?nimbus和supervisor到底做了什么样的事情呢?本文将带你去探寻这些答案。
代码列表
nimbus.cljsupervisor.clj
worker.clj
task.clj
如何提交一个topology?
要提交一个topology给storm的话, 我们在命令行里面是这么做的:帮助
storm里的幕后英雄:nimbus,supervisor
看似简单的topology提交, 其实背后充满着血雨腥风(好吧,我夸张了), 我们来看看我们的幕后英雄nimbus, supervisor都做了什么。
上传topology的代码
首先由Nimbus$Iface的
beginFileUpload,
uploadChunk以及
finishFileUpload方法来把jar包上传到nimbus服务器上的/inbox目录
帮助
运行topology之前的一些校验
topology的代码上传之后Nimbus$Iface的
submitTopology方法会负责对这个topology进行处理,
它首先要对storm本身,以及topology进行一些校验:
它要检查storm的状态是否是active的
它要检查是否已经有同名的topology已经在storm里面运行了
因为我们会在代码里面给spout, bolt指定id, storm会检查是否有两个spout和bolt使用了相同的id。
任何一个id都不能以”__”开头, 这种命名方式是系统保留的。
帮助
建立topology的本地目录
然后为这个topology建立它的本地目录:帮助
帮助
建立topology在zookeeper上的心跳目录
nimbus老兄是个有责任心的人, 它虽然最终会把任务分成一个个task让supervisor去做, 但是他时刻都在关注着大家的情况, 所以它要求每个task每隔一定时间就要给它打个招呼(心跳信息), 以让它知道事情还在正常发展, 如果有task超时不打招呼, nimbus会认为这个task不行了, 然后进行重新分配。zookeeper上面的心跳目录:帮助
计算topology的工作量
nimbus是个精明人, 它对每个topology都会做出详细的预算:需要多少工作量(多少个task)。它是根据topology定义中给的parallelism hint参数, 来给spout/bolt来设定task数目了,并且分配对应的task-id。并且把分配好task的信息写入zookeeper上的/task目录下:帮助
打比方说我们的topology里面一共有一个spout, 一个bolt。 其中spout的parallelism是2, bolt的parallelism是4, 那么我们可以把这个topology的总工作量看成是6, 那么一共有6个task,那么/tasks/{topology-id}下面一共会有6个以task-id命名的文件,其中两个文件的内容是spout的id, 其它四个文件的内容是bolt的id。
看代码:
帮助
把计算好的工作分配给supervisor去做
然后nimbus就要给supervisor分配工作了。工作分配的单位是task(上面已经计算好了的,并且已经给每个task编号了), 那么分配工作意思就是把上面定义好的一堆task分配给supervisor来做, 在nimbus里面,Assignment表示一个topology的任务分配信息:帮助
帮助
正式运行topology
到现在为止,任务都分配好了,那么我们可以正式启动这个topology了,在源代码里面,启动topology其实就是向zookeeper上面该topology所对应的目录写入这个topology的信息:帮助
帮助
Supervisor领任务
我们的supervisor同志无时无刻不想着为大哥nimbus分忧, 它每隔几秒钟就去看看大哥有没有给它分配新的任务,这些逻辑主要在supervisor.clj里面的synchronize-supervisor和
sync-processes两个方法里面它:
首先它看看storm里面有没有新提交的它没有下载的topology的代码, 如果有的话, 它就把这个新topology的代码下载下来。它可不管这个topology由不由它负责哦(这一点是可以优化的)
帮助
帮助
帮助
Worker执行
worker是个苦命的人, 上面的nimbus, supervisor只会指手画脚, 它要来做所有的脏活累活。1. 它首先去zookeeper上去看看老大哥们都给他分配了哪些task(task-ids)
帮助
帮助
帮助
帮助
Topology的终止
除非你显式地终止一个topology, 否则它会一直运行的,可以用下面的命令去终止一个topology:帮助
storm-cluster-state的
remove-storm!命令会被调用:
帮助
/tasks,
/assignments,
/storms下面有关这个topology的数据都删除了。这些数据(或者目录)之前都是nimbus创建的。还剩下
/taskbeats以及
/taskerrors下的数据没有清除,
这块数据会在supervisor下次从zookeeper上同步数据的时候删除的(supervisor会删除那些已经不存在的topology相关的数据)。这样这个topology的数据就从storm集群上彻底删除了。
相关文章推荐
- thinkphp根据时间戳查询时间范围内的记录
- test7.3 &7.4
- 中国省份代码表
- 一道关于C++继承类的面试题
- 浅谈C++类(4)--隐式类类型转换
- java项目临结局
- Lua中__index和__newindex之间的沉默与合作
- elasticsearch java api 使用ik 分词器
- 使eclipse XML文件内容变整齐
- C# 输出pdf文件流在页面上显示
- eclipse运行项目时怎么设置虚拟机内存大小
- ASP.NET 将数据生成PDF (二)
- asp.net生成PDF文件 (1)
- php中包含js产生的中文乱码问题
- github使用
- 谈C/C++指针精髓
- php学习基础篇之环境搭建
- Java异常处理机制【转载】
- 判断是否是闰年
- 小题精炼-----初试C语言