storm相关文章 1
2014-11-19 15:58
197 查看
文章来自http://blog.csdn.net/weeniebear/article/details/22586831
http://blog.csdn.net/weeniebear/article/details/18147333 http://segmentfault.com/blog/budong/1190000000647149
1、Storm中Topology任务调度策略
Storm中负责Topo分配的工作由nimbus负责,具体代码在nimbus.clj中。
对于一个新Topo的分配来说,主要经历两个阶段:
1. 逻辑分配阶段
这里又会涉及到两个概念executor和task,简单讲对于一个具体的component来说,task就是component在运行时的实例个数,即component使静态的class代码,task是运行时的具体object对象,task的个数即是component在runtime是被实例化的对象个数,
而executor可以理解为线程的概念,一个component对应的executor个数就是component运行时所独占的线程数,举例来讲,某个component的task个数是6,executor个数是2,则运行时component就有6个实例运行在2个线程中,一个线程负责执行3其中3个
task,默认情况下一般会将task个数配置为executor的个数,即每一个线程只负责执行一个component的实例化对象。
具体可以看官方的解释:https://github.com/nathanmarz/storm/wiki/Understanding-the-parallelism-of-a-Storm-topology
逻辑阶段所作的工作就是计算Topology中所有的component的executor个数,task个数,然后将所有的task分配到executor中。
2. 物理分配阶段
executor代表的是线程,具体要落地执行还需要依附于进程,因此物理分配阶段做的工作就是将所有的executor分配到worker slot进程中(一个slot代表一个jvm虚拟机)。
由于在逻辑分配阶段,task就是按照topo进行了排序,即相同component所属的task排列在一起,而在物理分配阶段slot资源也是按照端口进行了排序,即相同端口的slot排在了一起,
而具体分配算法是将排好序的task一次轮序分配到排好序的slot中,因此同一个component所属的不同task会尽可能的分到不同机器的相同端口上的slot中,实现了整个Topo的负载均衡,
这样分配的好处是防止同一个component的所有task都分配到同一台机器上,造成整个集群负载不均。
具体负责Topo分配的函数为
(declare mk-assignments)
具体代码就不展开了,其中核心计算逻辑对应的几个方法调用链为
1. compute-new-topology->executor->node+port
计算topology的每一个component对应的slots资源
2. compute-topology->executors
计算topology中每一个component分配到的executor
3. compute-executors
将task分配到executor中
4. storm-task-info
计算taskId->componentId的对应关系
5. DefaultScheduler.-schedule
将executor分配到slot中
最后要说明的一点是一个Component对应的Task个数是如何获取的。
在调用mk-assignments之前,还调用了一个叫做normalize-topology的函数,称为topo的规范化。规范化的主要工作就是设置component的Task个数,即component运行时的instance个数。
[plain] view
plaincopy
(defn normalize-topology [storm-conf ^StormTopology topology]
(let [ret (.deepCopy topology)]
(doseq [[_ component] (all-components ret)]
(.set_json_conf
(.get_common component)
(->> {TOPOLOGY-TASKS (component-parallelism storm-conf component)}
(merge (component-conf component))
to-json )))
ret ))
计算component对应task个数,这里起作用的参数有三个
1. TOPOLOGY-MAX-TASK-PARALLELIS
2. TOPOLOGY-TASKS
3. parallelism-hint (该component对应的executor个数)
具体算法为:
1. 如果设置了TOPOLOGY-MAX-TASK-PARALLELISM和TOPOLOGY-TASKS,则获取两者中较小者
2. 如果没有设置TOPOLOGY-TASKS,但是设置了TOPOLOGY-MAX-TASK-PARALLELISM和parallelism-hint,则获取两者中较小者
3. 如果没有设置TOPOLOGY-MAX-TASK-PARALLELISM,则取TOPOLOGY-TASKS,若两者都没有设置,则取parallelism-hint
简单可以总结为:可以为Task单独设值的参数为TOPOLOGY-MAX-TASK-PARALLELIS和TOPOLOGY-TASKS,并且以其中较小者为准,如果这两个参数均没有设置,则将task个数设置为parallelism-hint即该component的运行时并发executor的个数。
[plain] view
plaincopy
(defn- component-parallelism [storm-conf component]
(let [storm-conf (merge storm-conf (component-conf component))
num-tasks (or (storm-conf TOPOLOGY-TASKS) (num-start-executors component))
max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM)
]
(if max-parallelism
(min max-parallelism num-tasks)
num-tasks)))
2、Storm Transactional Batch Process原理
Storm中实现实时批处理的模块由IBatchBolt, BatchBoltExecutor, CoordinatedBolt等数据结构实现,它们之间的关系如图所示:
Storm实现事物批处理是通过在普通的IBatchBolt外面包装一层CoordinatedBolt,通过CoordinatedBolt实现批次和批次之间的事务性协调,其中记录批次元数据的神秘数据结构就是TrackingInfo, 具体定义如下:
[java] view
plaincopy
public static class TrackingInfo {
// 记录该bolt针对该批次已经收到了多少个上游节点发送的coord tuple(即有多少个上游节点报告已经发完了本批数据).
int reportCount = 0;
// 记录该bolt针对该批次预期应该接收到的regular tuple(非控制tuple)总数. 该值是该bolt的所有上游节点通过coord stream发送的tuple中获得.
int expectedTupleCount = 0;
// 记录该批次已经接收到的regular tuple(非控制tuple)总量.
int receivedTuples = 0;
// 记录该bolt向下游task已发送过的tuple数量. key为taskId, value为已发送tuple总数.
Map<Integer, Integer> taskEmittedTuples = new HashMap<Integer, Integer>();
// 对于非committer bolt, 标识该bolt是否已经接收到了tuple;
// 对于committer bolt, 标识该committer bolt是否已经接收到了master bolt针对该batch发来的commit tuple, 将向下游发送coord tuple的时间推迟到接收到commit tuple后,而不是完成对于本批次的处理.
boolean receivedId = false;
// 在CoordinatedOutputCollector的ack中被置为true, 标识该批次中是否有tuple处理失败.
boolean failed = false;
// 标识该批次包含的所有tuple都已在该bolt中处理完成,该bolt对于该批次的元数据可以被删除.
boolean finished = false;
// 记录上游或者master bolt发送过来的coord tuple与commit tuple. 在checkFinishId调用时逐一进行ack.
List<Tuple> ackTuples = new ArrayList<Tuple>();
}
对于事务型批处理而言,无非要解决的核心问题是知道什么时候一个批次处理完成,成功完成后续该如何处理,失败了又该如何处理。
在CoordinatedBolt的execute会在接收到tuple时被调用。
[java] view
plaincopy
public void execute(Tuple tuple) {
Object id = tuple.getValue(0);
TrackingInfo track;
// 首先判断接收到的tuple类型
TupleType type = getTupleType(tuple);
synchronized(_tracked) {
// 根据batchId获得对应的Batch元数据BatchInfo.
track = _tracked.get(id);
if(track==null) {
track = new TrackingInfo();
// _idStreamSpec非空, 表示该bolt是一个committer bolt.
// 如果不是committer bolt, 在第一次接收到tuple后,将batch的receivedId标志位置为true, 在后面执行checkFinishId时, 会将receivedId是否为true来作为bolt对一个批次是否处理完成条件之一.
if(_idStreamSpec==null) track.receivedId = true;
_tracked.put(id, track);
}
}
// 如果接收到控制流tuple, commit tuple(committer bolt)或者coord tuple(非committer bolt),则调用checkFinishId检查bolt是否完成了本批次处理.
// 对于committer bolt, 只有接收到了commit tuple后,才会将receiveId置为true.
if(type==TupleType.ID) {
synchronized(_tracked) {
track.receivedId = true;
}
checkFinishId(tuple, type);
} else if(type==TupleType.COORD) {
// 每次接收到上游发来的coord tuple, 说明上游已经完成了本批次tuple处理, 从tuple中获取上游告知的本批次已发送tuple总数, 并进行相应元数据更新(该上游已经报备过).
int count = (Integer) tuple.getValue(1);
synchronized(_tracked) {
track.reportCount++;
track.expectedTupleCount+=count;
}
checkFinishId(tuple, type);
} else {
synchronized(_tracked) {
_delegate.execute(tuple);
}
}
}
[java] view
plaincopy
// 判断收到的tuple类型
private TupleType getTupleType(Tuple tuple) {
// committer tuple
if(_idStreamSpec!=null
&& tuple.getSourceGlobalStreamid().equals(_idStreamSpec._id)) {
return TupleType.ID;
// coord tuple
} else if(!_sourceArgs.isEmpty()
&& tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) {
return TupleType.COORD;
// data tuple
} else {
return TupleType.REGULAR;
}
}
上述逻辑解决了预期的问题,即我这个bolt对于该批次预期应该收到多少条数据。另一个就是实际收到了多少数据。
[java] view
plaincopy
public class CoordinatedOutputCollector implements IOutputCollector {
public void ack(Tuple tuple) {
Object id = tuple.getValue(0);
synchronized(_tracked) {
TrackingInfo track = _tracked.get(id);
// 每一条tuple消息被ack的时候,会更新实际收到并处理的tuple总数.
if (track != null)
track.receivedTuples++;
}
boolean failed = checkFinishId(tuple, TupleType.REGULAR);
if(failed) {
_delegate.fail(tuple);
} else {
_delegate.ack(tuple);
}
}
}
最后就是将实际收到的数据量和预期进行对比,看看本批次是否已经完成处理。
CoordinatedBolt的函数checkFinishId完成了这个工作。
[java] view
plaincopy
private boolean checkFinishId(Tuple tup, TupleType type) {
Object id = tup.getValue(0);
boolean failed = false;
synchronized(_tracked) {
TrackingInfo track = _tracked.get(id);
try {
if(track!=null) {
boolean delayed = false;
if(_idStreamSpec==null && type == TupleType.COORD || _idStreamSpec!=null && type==TupleType.ID) {
track.ackTuples.add(tup);
delayed = true;
}
if(track.failed) {
failed = true;
for(Tuple t: track.ackTuples) {
_collector.fail(t);
}
_tracked.remove(id);
// 成功完成一个批次的条件:
// 1. 至少接收过一次tuple(对于非committer bolt可以是任意tuple, 对于committer bolt需要收到commit tuple后才向下游发送coord tuple)
// 2. 接收到了所有上游节点发来的coord tuple(所有上游节点都已通知自己已经处理完了本批次)
// 3. 已经接收到的tuple总数和所有上游宣称发送的数据总数一致
} else if(track.receivedId
&& (_sourceArgs.isEmpty() ||
track.reportCount==_numSourceReports &&
track.expectedTupleCount == track.receivedTuples)){
if(_delegate instanceof FinishedCallback) {
((FinishedCallback)_delegate).finishedId(id);
}
if(!(_sourceArgs.isEmpty() || type!=TupleType.REGULAR)) {
throw new IllegalStateException("Coordination condition met on a non-coordinating tuple. Should be impossible");
}
// 向下游节点发送coord tuple消息,通知自己已经处理完了本批次以及发送给下游的数据量
Iterator<Integer> outTasks = _countOutTasks.iterator();
while(outTasks.hasNext()) {
int task = outTasks.next();
int numTuples = get(track.taskEmittedTuples, task, 0);
_collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples));
}
// 向上游发送ack tuple, 告知上游自己已经处理完成本批次
for(Tuple t: track.ackTuples) {
_collector.ack(t);
}
track.finished = true;
_tracked.remove(id);
}
if(!delayed && type!=TupleType.REGULAR) {
if(track.failed) {
_collector.fail(tup);
} else {
_collector.ack(tup);
}
}
} else {
if(type!=TupleType.REGULAR) _collector.fail(tup);
}
} catch(FailedException e) {
LOG.error("Failed to finish batch", e);
for(Tuple t: track.ackTuples) {
_collector.fail(t);
}
_tracked.remove(id);
failed = true;
}
}
return failed;
}
3、storm定时器timer源码分析-timer.clj
storm定时器与java.util.Timer定时器比较相似。java.util.Timer定时器实际上是个线程,定时调度所拥有的TimerTasks;storm定时器也有一个线程负责调度所拥有的"定时任务"。storm定时器的"定时任务"是一个vector类型的数据[time, callback, uuid],内有会有三个值,分别是时间、函数、和uuid,很好理解,时间表示该定时任务什么时候执行,函数表示要执行的函数,uuid用于标识该"定时任务"。"定时任务"被存放到定时器的PriorityQueue队列中(和PriorityBlockingQueue区别,在于没有阻塞机制,不是线程安全的)。优先级队列是堆数据结构的典型应用,如果不提供Comparator的话,优先队列中元素默认按自然顺序排列,也就是数字默认是小的在队列头,字符串则按字典序排列(参阅
Comparable),也可以根据 Comparator 来指定,这取决于使用哪种构造方法。优先级队列不允许null元素。依靠自然排序的优先级队列还不允许插入不可比较的对象(这样做可能导致 ClassCastException)。当然也可以自己重新实现Comparator接口, 比如storm定时器就用reify重新实现了Comparator接口。storm定时器的执行过程比较简单,通过timer-thread,不断检查PriorityQueue里面时间最小的"定时任务"是否已经可以触发了, 如果可以(当前时间>=执行时间),就poll出来,调用callback,并sleep。storm定时器相关的函数均定义在timer.clj文件中,storm定时器是由mk-timer函数创建的,mk-timer函数定义如下:
mk-timer函数
我们可以通过调用cancel-timer函数中断一个timer-thread线程,cancel-timer函数定义如下:
cancel-timer函数
check-active!函数定义如下:
check-active!函数
通过调用schedule函数和schedule-recurring函数我们可以向storm定时器中添加"定时任务"。schedule函数定义如下:
schedule函数
schedule-recurring函数定义如下:
chedule-recurring函数也很简单,与schedule函数的区别就是在"定时任务"的callback函数中又添加了一个相同的"定时任务"。schedule函数的语义可以理解成向定时器添加
一个"一次性任务",schedule-recurring函数的语义可以理解成向定时器添加"一个周期执行的定时任务"(开始执行时间=当前时间+延迟时间,然后每隔recur-secs执行一次)
schedule-recurring函数
nimbus检查心跳和重分配任务的实现就是通过schedule-recurring函数向storm定时器添加了一个"周期任务"实现的。
http://blog.csdn.net/weeniebear/article/details/18147333 http://segmentfault.com/blog/budong/1190000000647149
1、Storm中Topology任务调度策略
Storm中负责Topo分配的工作由nimbus负责,具体代码在nimbus.clj中。
对于一个新Topo的分配来说,主要经历两个阶段:
1. 逻辑分配阶段
这里又会涉及到两个概念executor和task,简单讲对于一个具体的component来说,task就是component在运行时的实例个数,即component使静态的class代码,task是运行时的具体object对象,task的个数即是component在runtime是被实例化的对象个数,
而executor可以理解为线程的概念,一个component对应的executor个数就是component运行时所独占的线程数,举例来讲,某个component的task个数是6,executor个数是2,则运行时component就有6个实例运行在2个线程中,一个线程负责执行3其中3个
task,默认情况下一般会将task个数配置为executor的个数,即每一个线程只负责执行一个component的实例化对象。
具体可以看官方的解释:https://github.com/nathanmarz/storm/wiki/Understanding-the-parallelism-of-a-Storm-topology
逻辑阶段所作的工作就是计算Topology中所有的component的executor个数,task个数,然后将所有的task分配到executor中。
2. 物理分配阶段
executor代表的是线程,具体要落地执行还需要依附于进程,因此物理分配阶段做的工作就是将所有的executor分配到worker slot进程中(一个slot代表一个jvm虚拟机)。
由于在逻辑分配阶段,task就是按照topo进行了排序,即相同component所属的task排列在一起,而在物理分配阶段slot资源也是按照端口进行了排序,即相同端口的slot排在了一起,
而具体分配算法是将排好序的task一次轮序分配到排好序的slot中,因此同一个component所属的不同task会尽可能的分到不同机器的相同端口上的slot中,实现了整个Topo的负载均衡,
这样分配的好处是防止同一个component的所有task都分配到同一台机器上,造成整个集群负载不均。
具体负责Topo分配的函数为
(declare mk-assignments)
具体代码就不展开了,其中核心计算逻辑对应的几个方法调用链为
1. compute-new-topology->executor->node+port
计算topology的每一个component对应的slots资源
2. compute-topology->executors
计算topology中每一个component分配到的executor
3. compute-executors
将task分配到executor中
4. storm-task-info
计算taskId->componentId的对应关系
5. DefaultScheduler.-schedule
将executor分配到slot中
最后要说明的一点是一个Component对应的Task个数是如何获取的。
在调用mk-assignments之前,还调用了一个叫做normalize-topology的函数,称为topo的规范化。规范化的主要工作就是设置component的Task个数,即component运行时的instance个数。
[plain] view
plaincopy
(defn normalize-topology [storm-conf ^StormTopology topology]
(let [ret (.deepCopy topology)]
(doseq [[_ component] (all-components ret)]
(.set_json_conf
(.get_common component)
(->> {TOPOLOGY-TASKS (component-parallelism storm-conf component)}
(merge (component-conf component))
to-json )))
ret ))
计算component对应task个数,这里起作用的参数有三个
1. TOPOLOGY-MAX-TASK-PARALLELIS
2. TOPOLOGY-TASKS
3. parallelism-hint (该component对应的executor个数)
具体算法为:
1. 如果设置了TOPOLOGY-MAX-TASK-PARALLELISM和TOPOLOGY-TASKS,则获取两者中较小者
2. 如果没有设置TOPOLOGY-TASKS,但是设置了TOPOLOGY-MAX-TASK-PARALLELISM和parallelism-hint,则获取两者中较小者
3. 如果没有设置TOPOLOGY-MAX-TASK-PARALLELISM,则取TOPOLOGY-TASKS,若两者都没有设置,则取parallelism-hint
简单可以总结为:可以为Task单独设值的参数为TOPOLOGY-MAX-TASK-PARALLELIS和TOPOLOGY-TASKS,并且以其中较小者为准,如果这两个参数均没有设置,则将task个数设置为parallelism-hint即该component的运行时并发executor的个数。
[plain] view
plaincopy
(defn- component-parallelism [storm-conf component]
(let [storm-conf (merge storm-conf (component-conf component))
num-tasks (or (storm-conf TOPOLOGY-TASKS) (num-start-executors component))
max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM)
]
(if max-parallelism
(min max-parallelism num-tasks)
num-tasks)))
2、Storm Transactional Batch Process原理
Storm中实现实时批处理的模块由IBatchBolt, BatchBoltExecutor, CoordinatedBolt等数据结构实现,它们之间的关系如图所示:
Storm实现事物批处理是通过在普通的IBatchBolt外面包装一层CoordinatedBolt,通过CoordinatedBolt实现批次和批次之间的事务性协调,其中记录批次元数据的神秘数据结构就是TrackingInfo, 具体定义如下:
[java] view
plaincopy
public static class TrackingInfo {
// 记录该bolt针对该批次已经收到了多少个上游节点发送的coord tuple(即有多少个上游节点报告已经发完了本批数据).
int reportCount = 0;
// 记录该bolt针对该批次预期应该接收到的regular tuple(非控制tuple)总数. 该值是该bolt的所有上游节点通过coord stream发送的tuple中获得.
int expectedTupleCount = 0;
// 记录该批次已经接收到的regular tuple(非控制tuple)总量.
int receivedTuples = 0;
// 记录该bolt向下游task已发送过的tuple数量. key为taskId, value为已发送tuple总数.
Map<Integer, Integer> taskEmittedTuples = new HashMap<Integer, Integer>();
// 对于非committer bolt, 标识该bolt是否已经接收到了tuple;
// 对于committer bolt, 标识该committer bolt是否已经接收到了master bolt针对该batch发来的commit tuple, 将向下游发送coord tuple的时间推迟到接收到commit tuple后,而不是完成对于本批次的处理.
boolean receivedId = false;
// 在CoordinatedOutputCollector的ack中被置为true, 标识该批次中是否有tuple处理失败.
boolean failed = false;
// 标识该批次包含的所有tuple都已在该bolt中处理完成,该bolt对于该批次的元数据可以被删除.
boolean finished = false;
// 记录上游或者master bolt发送过来的coord tuple与commit tuple. 在checkFinishId调用时逐一进行ack.
List<Tuple> ackTuples = new ArrayList<Tuple>();
}
对于事务型批处理而言,无非要解决的核心问题是知道什么时候一个批次处理完成,成功完成后续该如何处理,失败了又该如何处理。
在CoordinatedBolt的execute会在接收到tuple时被调用。
[java] view
plaincopy
public void execute(Tuple tuple) {
Object id = tuple.getValue(0);
TrackingInfo track;
// 首先判断接收到的tuple类型
TupleType type = getTupleType(tuple);
synchronized(_tracked) {
// 根据batchId获得对应的Batch元数据BatchInfo.
track = _tracked.get(id);
if(track==null) {
track = new TrackingInfo();
// _idStreamSpec非空, 表示该bolt是一个committer bolt.
// 如果不是committer bolt, 在第一次接收到tuple后,将batch的receivedId标志位置为true, 在后面执行checkFinishId时, 会将receivedId是否为true来作为bolt对一个批次是否处理完成条件之一.
if(_idStreamSpec==null) track.receivedId = true;
_tracked.put(id, track);
}
}
// 如果接收到控制流tuple, commit tuple(committer bolt)或者coord tuple(非committer bolt),则调用checkFinishId检查bolt是否完成了本批次处理.
// 对于committer bolt, 只有接收到了commit tuple后,才会将receiveId置为true.
if(type==TupleType.ID) {
synchronized(_tracked) {
track.receivedId = true;
}
checkFinishId(tuple, type);
} else if(type==TupleType.COORD) {
// 每次接收到上游发来的coord tuple, 说明上游已经完成了本批次tuple处理, 从tuple中获取上游告知的本批次已发送tuple总数, 并进行相应元数据更新(该上游已经报备过).
int count = (Integer) tuple.getValue(1);
synchronized(_tracked) {
track.reportCount++;
track.expectedTupleCount+=count;
}
checkFinishId(tuple, type);
} else {
synchronized(_tracked) {
_delegate.execute(tuple);
}
}
}
[java] view
plaincopy
// 判断收到的tuple类型
private TupleType getTupleType(Tuple tuple) {
// committer tuple
if(_idStreamSpec!=null
&& tuple.getSourceGlobalStreamid().equals(_idStreamSpec._id)) {
return TupleType.ID;
// coord tuple
} else if(!_sourceArgs.isEmpty()
&& tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) {
return TupleType.COORD;
// data tuple
} else {
return TupleType.REGULAR;
}
}
上述逻辑解决了预期的问题,即我这个bolt对于该批次预期应该收到多少条数据。另一个就是实际收到了多少数据。
[java] view
plaincopy
public class CoordinatedOutputCollector implements IOutputCollector {
public void ack(Tuple tuple) {
Object id = tuple.getValue(0);
synchronized(_tracked) {
TrackingInfo track = _tracked.get(id);
// 每一条tuple消息被ack的时候,会更新实际收到并处理的tuple总数.
if (track != null)
track.receivedTuples++;
}
boolean failed = checkFinishId(tuple, TupleType.REGULAR);
if(failed) {
_delegate.fail(tuple);
} else {
_delegate.ack(tuple);
}
}
}
最后就是将实际收到的数据量和预期进行对比,看看本批次是否已经完成处理。
CoordinatedBolt的函数checkFinishId完成了这个工作。
[java] view
plaincopy
private boolean checkFinishId(Tuple tup, TupleType type) {
Object id = tup.getValue(0);
boolean failed = false;
synchronized(_tracked) {
TrackingInfo track = _tracked.get(id);
try {
if(track!=null) {
boolean delayed = false;
if(_idStreamSpec==null && type == TupleType.COORD || _idStreamSpec!=null && type==TupleType.ID) {
track.ackTuples.add(tup);
delayed = true;
}
if(track.failed) {
failed = true;
for(Tuple t: track.ackTuples) {
_collector.fail(t);
}
_tracked.remove(id);
// 成功完成一个批次的条件:
// 1. 至少接收过一次tuple(对于非committer bolt可以是任意tuple, 对于committer bolt需要收到commit tuple后才向下游发送coord tuple)
// 2. 接收到了所有上游节点发来的coord tuple(所有上游节点都已通知自己已经处理完了本批次)
// 3. 已经接收到的tuple总数和所有上游宣称发送的数据总数一致
} else if(track.receivedId
&& (_sourceArgs.isEmpty() ||
track.reportCount==_numSourceReports &&
track.expectedTupleCount == track.receivedTuples)){
if(_delegate instanceof FinishedCallback) {
((FinishedCallback)_delegate).finishedId(id);
}
if(!(_sourceArgs.isEmpty() || type!=TupleType.REGULAR)) {
throw new IllegalStateException("Coordination condition met on a non-coordinating tuple. Should be impossible");
}
// 向下游节点发送coord tuple消息,通知自己已经处理完了本批次以及发送给下游的数据量
Iterator<Integer> outTasks = _countOutTasks.iterator();
while(outTasks.hasNext()) {
int task = outTasks.next();
int numTuples = get(track.taskEmittedTuples, task, 0);
_collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples));
}
// 向上游发送ack tuple, 告知上游自己已经处理完成本批次
for(Tuple t: track.ackTuples) {
_collector.ack(t);
}
track.finished = true;
_tracked.remove(id);
}
if(!delayed && type!=TupleType.REGULAR) {
if(track.failed) {
_collector.fail(tup);
} else {
_collector.ack(tup);
}
}
} else {
if(type!=TupleType.REGULAR) _collector.fail(tup);
}
} catch(FailedException e) {
LOG.error("Failed to finish batch", e);
for(Tuple t: track.ackTuples) {
_collector.fail(t);
}
_tracked.remove(id);
failed = true;
}
}
return failed;
}
3、storm定时器timer源码分析-timer.clj
storm定时器与java.util.Timer定时器比较相似。java.util.Timer定时器实际上是个线程,定时调度所拥有的TimerTasks;storm定时器也有一个线程负责调度所拥有的"定时任务"。storm定时器的"定时任务"是一个vector类型的数据[time, callback, uuid],内有会有三个值,分别是时间、函数、和uuid,很好理解,时间表示该定时任务什么时候执行,函数表示要执行的函数,uuid用于标识该"定时任务"。"定时任务"被存放到定时器的PriorityQueue队列中(和PriorityBlockingQueue区别,在于没有阻塞机制,不是线程安全的)。优先级队列是堆数据结构的典型应用,如果不提供Comparator的话,优先队列中元素默认按自然顺序排列,也就是数字默认是小的在队列头,字符串则按字典序排列(参阅
Comparable),也可以根据 Comparator 来指定,这取决于使用哪种构造方法。优先级队列不允许null元素。依靠自然排序的优先级队列还不允许插入不可比较的对象(这样做可能导致 ClassCastException)。当然也可以自己重新实现Comparator接口, 比如storm定时器就用reify重新实现了Comparator接口。storm定时器的执行过程比较简单,通过timer-thread,不断检查PriorityQueue里面时间最小的"定时任务"是否已经可以触发了, 如果可以(当前时间>=执行时间),就poll出来,调用callback,并sleep。storm定时器相关的函数均定义在timer.clj文件中,storm定时器是由mk-timer函数创建的,mk-timer函数定义如下:
mk-timer函数
;; kill-fn函数会在timer-thread发生exception的时候被调用,timer-name标识定时器的名称 (defnk mk-timer [:kill-fn (fn [& _] ) :timer-name nil] ;; queue绑定PriorityQueue队列,创建PriorityQueue队列时指定队列初始容量为10,并指定一个Comparator比 ;;较器,Comparator比较器比较"定时任务"执行时间的大小,这样每次poll出执行时间最小的"定时任务", ;; PriorityQueue队列是一个依赖执行时间的小顶堆 (let [queue (PriorityQueue. 10 (reify Comparator (compare [this o1 o2] (- (first o1) (first o2))) (equals [this obj] true))) ;; active标识timer-thread是"active"的 active (atom true) ;; 创建一个锁,因为PriorityQueue并不是线程安全的,所以通过这个锁,可以使多线程互斥访问PriorityQueue lock (Object.) ;; notifier是一个java信号量,初始值为0,notifier信号量的主要功能就是当我们调用cancel-timer函数中断 ;; 一个timer-thread时,等待timer-thread结束,当timer-thread结束前会release notifier信号量 notifier (Semaphore. 0) ;; thread-name绑定timer-thread线程名,没有指定时默认为"timer" thread-name (if timer-name timer-name "timer") ;; timer-thread线程 timer-thread (Thread. (fn [] ;; 当timer-thread为"active"即active=true时,进入while循环 (while @active (try ;; peek函数从PriorityQueue获取执行时间最小的"定时任务",但并不出队列。time-mil ;; lis绑定执行时间,elem绑定"定时任务"数据 (let [[time-millis _ _ :as elem] (locking lock (.peek queue))] ;; 如果elem不为nil且当前时间>=执行时间,那么先加锁,然后poll出该"定时任务", ;; 并将"定时任务"的callback函数绑定到afn,最后调用该函数;否则判断time-millis ;; 是否为nil。 ;; 我们可以发现该定时器是软时间执行"定时任务"的,也就是说"定时任务"有可能被延 ;; 迟执行,同时如果afn函数执行时间比较长,那么会影响下一个"定时任务"的执行 (if (and elem (>= (current-time-millis) time-millis)) ;; It is imperative to not run the function ;; inside the timer lock. Otherwise, it is ;; possible to deadlock if the fn deals with ;; other locks, like the submit lock. (let [afn (locking lock (second (.poll queue)))] ;; 执行"定时任务"的callback函数 (afn)) ;; 该if语句是上面if语句的else分支,判断time-millis是否为nil,如果time-mill ;; is不为nil,则timer-thread线程sleep(执行时间-当前时间);否则sleep(1000) ;; 表明PriorityQueue中没有"定时任务" (if time-millis ;; If any events are scheduled, sleep until ;; event generation. If any recurring events ;; are scheduled then we will always go ;; through this branch, sleeping only the ;; exact necessary amount of time. (Time/sleep (- time-millis (current-time-millis))) ;; Otherwise poll to see if any new event ;; was scheduled. This is, in essence, the ;; response time for detecting any new event ;; schedulings when there are no scheduled ;; events. (Time/sleep 1000)))) (catch Throwable t ;; Because the interrupted exception can be ;; wrapped in a RuntimeException. ;; 检查是否是InterruptedException,如果是InterruptedException,说明线程是由 ;; 于接收interrupt信号而中断的,不做异常处理,否则调用kill-fn函数、修改线程 ;; 状态并抛出该异常 (when-not (exception-cause? InterruptedException t) (kill-fn t) (reset! active false) (throw t))))) ;; release notifier信号量,标识timer—thread运行结束 (.release notifier)) thread-name)] ;; 设置timer-thread为守护线程 (.setDaemon timer-thread true) ;; 设置timer-thread为最高优先级 (.setPriority timer-thread Thread/MAX_PRIORITY) ;; 启动timer-thread线程 (.start timer-thread) ;; 返回该定时器的"属性" {:timer-thread timer-thread :queue queue :active active :lock lock :cancel-notifier notifier}))
我们可以通过调用cancel-timer函数中断一个timer-thread线程,cancel-timer函数定义如下:
cancel-timer函数
(defn cancel-timer [timer] ;; 检查timer状态是否是"active",如果不是则抛出异常 (check-active! timer) ;; 加锁 (locking (:lock timer) ;; 将timer的状态active设置成false,即"dead" (reset! (:active timer) false) ;; 调用interrupt方法,中断线程,通过mk-timer函数我们可以知道在线程的run方法内调用了sleep方法, ;; 当接收到中断新号后会抛出InterruptedException异常使线程退出 (.interrupt (:timer-thread timer))) ;; acquire timer中的notifier信号量,因为只有当线程结束前才会release notifier信号量,所以此处是等待线程;;; 结束 (.acquire (:cancel-notifier timer)))
check-active!函数定义如下:
check-active!函数
(defn- check-active! [timer] (when-not @(:active timer) (throw (IllegalStateException. "Timer is not active"))))
通过调用schedule函数和schedule-recurring函数我们可以向storm定时器中添加"定时任务"。schedule函数定义如下:
schedule函数
(defnk schedule ;; timer绑定定时器,delay-secs绑定"定时任务"相对当前时间的延迟时间,afn绑定callback函数,check-active是;; 否需要检查定时器 [timer delay-secs afn :check-active true] ;; 检查定时器状态 (when check-active (check-active! timer)) (let [id (uuid) ^PriorityQueue queue (:queue timer)] ;; 加锁,执行时间=当前时间+延迟时间,将"定时任务"的vector类型数据添加到PriorityQueue队列中 (locking (:lock timer) (.add queue [(+ (current-time-millis) (secs-to-millis-long delay-secs)) afn id]))))
schedule-recurring函数定义如下:
chedule-recurring函数也很简单,与schedule函数的区别就是在"定时任务"的callback函数中又添加了一个相同的"定时任务"。schedule函数的语义可以理解成向定时器添加
一个"一次性任务",schedule-recurring函数的语义可以理解成向定时器添加"一个周期执行的定时任务"(开始执行时间=当前时间+延迟时间,然后每隔recur-secs执行一次)
schedule-recurring函数
(defn schedule-recurring [timer delay-secs recur-secs afn] (schedule timer delay-secs (fn this [] (afn) ; This avoids a race condition with cancel-timer. (schedule timer recur-secs this :check-active false))))
nimbus检查心跳和重分配任务的实现就是通过schedule-recurring函数向storm定时器添加了一个"周期任务"实现的。
(schedule-recurring (:timer nimbus) 0 (conf NIMBUS-MONITOR-FREQ-SECS) (fn [] (when (conf NIMBUS-REASSIGN) (locking (:submit-lock nimbus) (mk-assignments nimbus))) (do-cleanup nimbus) ))
相关文章推荐
- storm的一些相关文章
- Storm相关文章索引(1)
- 劳动法相关文章
- phpcms v9后台添加文章时选择相关文章可调用其它模型信息的方法
- 关于通过标签取得相关文章的算法
- Windows下安装Grunt的指南和相关说明(文章长,慢慢看,别着急,收获大大的)
- NodeJs的相关文章
- elance相关文章
- UiTableView相关文章
- 关于Latent Dirichlet Allocation及Hierarchical LDA模型的必读文章和相关代码
- 前端测试相关文章
- MySql相关文章
- 读“卓越科技”相关文章的一点心得
- UML建模相关文章收集
- big data相关的技术文章
- Android相关问题的好文章整理
- JAVA相关文章索引(6)
- NuGet相关的文章
- ES6相关文章
- Hive相关文章