twitter storm源码走读之8 -- TridentTopology创建过程详解
2014-02-09 14:03
501 查看
欢迎转载,转载请注明出处,徽沪一郎。
从用户层面来看TridentTopology,有两个重要的概念一是Stream,另一个是作用于Stream上的各种Operation。在实现层面来看,无论是stream,还是后续的operation都会转变成为各个Node,这些Node之间的关系通过重要的数据结构图来维护。具体到TridentTopology,实现图的各种操作的组件是jgrapht。
说到图,两个基本的概念会闪现出来,一是结点,二是描述结点之间关系的边。要想很好的理解TridentTopology就需要紧盯图中结点和边的变化。
TridentTopology在转换成为普通的StormTopology时,需要将原始的图分成各个group,每个group将运行于一个独立的bolt中。TridentTopology又是如何知道哪些node应该在同一个group,哪些应该处在另一个group中的呢;如何来确定每个group的并发度(parallismHint)的呢。这些问题的解决都与jgrapht分不开。
关于jgrapht的更多信息,请参考其官方网站 http://jgrapht.org
addNode
addSourcedNode
addSourcedStateNode
其中addNode在创建stream是使用,addSourcedStateNode在partitionPersist时使用到,其它的operation使用到的是addSourcedNode.
addNode与其它两个方法的一个重要区别还在于,addNode是不需要添加边(Edge),而其它两个API需要往图中添加edge,以确定该node的源是哪个。
在TridentTopology的构造函数中,创建了DAG(有向无环图)。利用这个_graph来作为容器以存储后续过程中创建的各个node及它们之间的关系。
newStream
addNode
registerNode
调用关系描述如下
Stream::each
TridentTopology::addSourcedNode
TridentTopology::registerSourcedNode
registerSourcedNode的实现如下
注意此处添加edge是,是有索引的,这样可以区别处理的先后顺序。
在Stream中含有成员变量_node,表示stream最近停泊的node,有了该变量添加edge才成为了可能。
调用关系
Stream::partitionPersist
TridentTopology::addSourcedStateNode
TridentTopology::registerSourcedNode
与addNode及addSourcedNode不同的是,addSourcedStateNode返回的是TridentState而非Stream。
既然谈到了TridentState就不得不谈到其另一面Stream::stateQuery,
从此处可以看出stateQueryNode最起码有两个inputStream,一是从TridentState而来表示状态已经改变,另一个是处于drpcStream这个方面的上一跳结点。
将boltNodes中的每个boltNode作为一个group加入全部加入initialGroups
以graph和initialGroups作为入参创建GraphGrouper
分组的过程其实就是进行合并的过程,详见GraphGrouper::mergeFully()
如果从当前group1的输出目的地都是属于group2,则将group1,group2合并
如果当前group1的所有输入源都是来自于group2,则将group1,group2合并
将需要合并的group1,group2作为入参创建新的group,同时将group1,group2从已有的集合出移除
GraphGrouper::merge()
在group之间添加partitionNode
_graph中所有的node在变换过后,变成两组元素,一是spoutNodes,另一个是合并后的mergedGroup.
spoutNodes中的每个元素作为spout添加到TridentTopologyBuilder的_spouts数组中,mergedGroup中的每个group添加到TridentTopologyBuilder的_bolt数组中。在TridentTopologyBuilder::build()中最主要的事情是为每个_spouts和_bolts数组中的成员添加grouping关系。
从用户层面来看TridentTopology,有两个重要的概念一是Stream,另一个是作用于Stream上的各种Operation。在实现层面来看,无论是stream,还是后续的operation都会转变成为各个Node,这些Node之间的关系通过重要的数据结构图来维护。具体到TridentTopology,实现图的各种操作的组件是jgrapht。
说到图,两个基本的概念会闪现出来,一是结点,二是描述结点之间关系的边。要想很好的理解TridentTopology就需要紧盯图中结点和边的变化。
TridentTopology在转换成为普通的StormTopology时,需要将原始的图分成各个group,每个group将运行于一个独立的bolt中。TridentTopology又是如何知道哪些node应该在同一个group,哪些应该处在另一个group中的呢;如何来确定每个group的并发度(parallismHint)的呢。这些问题的解决都与jgrapht分不开。
关于jgrapht的更多信息,请参考其官方网站 http://jgrapht.org
概要
在TridentTopology中向图中添加结点的api有三种:addNode
addSourcedNode
addSourcedStateNode
其中addNode在创建stream是使用,addSourcedStateNode在partitionPersist时使用到,其它的operation使用到的是addSourcedNode.
addNode与其它两个方法的一个重要区别还在于,addNode是不需要添加边(Edge),而其它两个API需要往图中添加edge,以确定该node的源是哪个。
TridentTopology
public TridentTopology() { _graph = new DefaultDirectedGraph(new ErrorEdgeFactory()); _gen = new UniqueIdGen(); }
在TridentTopology的构造函数中,创建了DAG(有向无环图)。利用这个_graph来作为容器以存储后续过程中创建的各个node及它们之间的关系。
newStream
newStream会为DAG(有向无环图)中创建源结点,其调用关系如下所示。newStream
addNode
registerNode
protected void registerNode(Node n) { _graph.addVertex(n); if(n.stateInfo!=null) { String id = n.stateInfo.id; if(!_colocate.containsKey(id)) { _colocate.put(id, new ArrayList()); } _colocate.get(id).add(n); } }
each
作用于stream上的Operation有很多,以each为例来看新的operation是如何转换成为node添加到_graph中的。//Stream.java public Stream each(Fields inputFields, Function function, Fields functionFields) { projectionValidation(inputFields); return _topology.addSourcedNode(this, new ProcessorNode(_topology.getUniqueStreamId(), _name, TridentUtils.fieldsConcat(getOutputFields(), functionFields), functionFields, new EachProcessor(inputFields, function))); }
调用关系描述如下
Stream::each
TridentTopology::addSourcedNode
TridentTopology::registerSourcedNode
registerSourcedNode的实现如下
protected void registerSourcedNode(List<Stream> sources, Node newNode) { registerNode(newNode); int streamIndex = 0; for(Stream s: sources) { _graph.addEdge(s._node, newNode, new IndexedEdge(s._node, newNode, streamIndex)); streamIndex++; } }
注意此处添加edge是,是有索引的,这样可以区别处理的先后顺序。
在Stream中含有成员变量_node,表示stream最近停泊的node,有了该变量添加edge才成为了可能。
partitionPersist
public TridentState partitionPersist(StateSpec stateSpec, Fields inputFields, StateUpdater updater, Fields functionFields) { projectionValidation(inputFields); String id = _topology.getUniqueStateId(); ProcessorNode n = new ProcessorNode(_topology.getUniqueStreamId(), _name, functionFields, functionFields, new PartitionPersistProcessor(id, inputFields, updater)); n.committer = true; n.stateInfo = new NodeStateInfo(id, stateSpec); return _topology.addSourcedStateNode(this, n); }
调用关系
Stream::partitionPersist
TridentTopology::addSourcedStateNode
TridentTopology::registerSourcedNode
与addNode及addSourcedNode不同的是,addSourcedStateNode返回的是TridentState而非Stream。
既然谈到了TridentState就不得不谈到其另一面Stream::stateQuery,
public Stream stateQuery(TridentState state, Fields inputFields, QueryFunction function, Fields functionFields) { projectionValidation(inputFields); String stateId = state._node.stateInfo.id; Node n = new ProcessorNode(_topology.getUniqueStreamId(), _name, TridentUtils.fieldsConcat(getOutputFields(), functionFields), functionFields, new StateQueryProcessor(stateId, inputFields, function)); _topology._colocate.get(stateId).add(n); return _topology.addSourcedNode(this, n); }
从此处可以看出stateQueryNode最起码有两个inputStream,一是从TridentState而来表示状态已经改变,另一个是处于drpcStream这个方面的上一跳结点。
build
TridentTopology::build是将TridentTopology转变为StormTopology的过程,这一过程中最重要的一环就是将_graph中含有的node进行分组。grouping
算法逻辑概述将boltNodes中的每个boltNode作为一个group加入全部加入initialGroups
以graph和initialGroups作为入参创建GraphGrouper
分组的过程其实就是进行合并的过程,详见GraphGrouper::mergeFully()
如果从当前group1的输出目的地都是属于group2,则将group1,group2合并
如果当前group1的所有输入源都是来自于group2,则将group1,group2合并
将需要合并的group1,group2作为入参创建新的group,同时将group1,group2从已有的集合出移除
public void mergeFully() { boolean somethingHappened = true; while(somethingHappened) { somethingHappened = false; for(Group g: currGroups) { Collection<Group> outgoingGroups = outgoingGroups(g); if(outgoingGroups.size()==1) { Group out = outgoingGroups.iterator().next(); if(out!=null) { merge(g, out); somethingHappened = true; break; } } Collection<Group> incomingGroups = incomingGroups(g); if(incomingGroups.size()==1) { Group in = incomingGroups.iterator().next(); if(in!=null) { merge(g, in); somethingHappened = true; break; } } } } }
GraphGrouper::merge()
private void merge(Group g1, Group g2) { Group newGroup = new Group(g1, g2); currGroups.remove(g1); currGroups.remove(g2); currGroups.add(newGroup); for(Node n: newGroup.nodes) { groupIndex.put(n, newGroup); } }
在group之间添加partitionNode
// add identity partitions between groups for(IndexedEdge<Node> e: new HashSet<IndexedEdge>(graph.edgeSet())) { if(!(e.source instanceof PartitionNode) && !(e.target instanceof PartitionNode)) { Group g1 = grouper.nodeGroup(e.source); Group g2 = grouper.nodeGroup(e.target); // g1 being null means the source is a spout node if(g1==null && !(e.source instanceof SpoutNode)) throw new RuntimeException("Planner exception: Null source group must indicate a spout node at this phase of planning"); if(g1==null || !g1.equals(g2)) { graph.removeEdge(e); PartitionNode pNode = makeIdentityPartition(e.source); graph.addVertex(pNode); graph.addEdge(e.source, pNode, new IndexedEdge(e.source, pNode, 0)); graph.addEdge(pNode, e.target, new IndexedEdge(pNode, e.target, e.index)); } } }
_graph中所有的node在变换过后,变成两组元素,一是spoutNodes,另一个是合并后的mergedGroup.
spoutNodes中的每个元素作为spout添加到TridentTopologyBuilder的_spouts数组中,mergedGroup中的每个group添加到TridentTopologyBuilder的_bolt数组中。在TridentTopologyBuilder::build()中最主要的事情是为每个_spouts和_bolts数组中的成员添加grouping关系。
小结
到目前为止,通过两篇文章分析了TridentTopology的创建过程及其运行时在每个TridentBoltExecutor中的消息传递情况。接下来会分析TridentTopology提供的API实现及其作用场景。相关文章推荐
- CentOS将用户添加到sudoer列表
- bash 快捷键
- getopt函数的使用
- windows通过ssh连接linux机器
- centos修改IP地址
- RedHat-Linux密码破解
- linux路由添加
- Linux Apache php MySQL GD PHPWind 集成环境配置
- linux mount (挂载命令)详解
- Linux /dev目录详解和Linux系统各个目录的作用
- linux 进程状态
- Linux开机时停在 Starting sendmail 不动了的解决方案
- 解决CentOS语言更改成简体中文出现的乱码问题
- HDU 1195 Open the Lock
- linux 多线程-条件变量
- 虚拟机Ubuntu 10.04安装arm-linux-gcc.4.3.2 过程
- Linux 查看系统硬件信息(实例详解)
- linux 的dd命令
- linux rm -rf * 文件恢复记
- Cannot open keyboard?