Storm Transactional Batch Process原理
2014-01-11 20:17
295 查看
Storm中实现实时批处理的模块由IBatchBolt, BatchBoltExecutor, CoordinatedBolt等数据结构实现,它们之间的关系如图所示:
Storm实现事物批处理是通过在普通的IBatchBolt外面包装一层CoordinatedBolt,通过CoordinatedBolt实现批次和批次之间的事务性协调,其中记录批次元数据的神秘数据结构就是TrackingInfo, 具体定义如下:
对于事务型批处理而言,无非要解决的核心问题是知道什么时候一个批次处理完成,成功完成后续该如何处理,失败了又该如何处理。
在CoordinatedBolt的execute会在接收到tuple时被调用。
上述逻辑解决了预期的问题,即我这个bolt对于该批次预期应该收到多少条数据。另一个就是实际收到了多少数据。
最后就是将实际收到的数据量和预期进行对比,看看本批次是否已经完成处理。
CoordinatedBolt的函数checkFinishId完成了这个工作。
Storm实现事物批处理是通过在普通的IBatchBolt外面包装一层CoordinatedBolt,通过CoordinatedBolt实现批次和批次之间的事务性协调,其中记录批次元数据的神秘数据结构就是TrackingInfo, 具体定义如下:
public static class TrackingInfo { // 记录该bolt针对该批次已经收到了多少个上游节点发送的coord tuple(即有多少个上游节点报告已经发完了本批数据). int reportCount = 0; // 记录该bolt针对该批次预期应该接收到的regular tuple(非控制tuple)总数. 该值是该bolt的所有上游节点通过coord stream发送的tuple中获得. int expectedTupleCount = 0; // 记录该批次已经接收到的regular tuple(非控制tuple)总量. int receivedTuples = 0; // 记录该bolt向下游task已发送过的tuple数量. key为taskId, value为已发送tuple总数. Map<Integer, Integer> taskEmittedTuples = new HashMap<Integer, Integer>(); // 对于非committer bolt, 标识该bolt是否已经接收到了tuple; // 对于committer bolt, 标识该committer bolt是否已经接收到了master bolt针对该batch发来的commit tuple, 将向下游发送coord tuple的时间推迟到接收到commit tuple后,而不是完成对于本批次的处理. boolean receivedId = false; // 在CoordinatedOutputCollector的ack中被置为true, 标识该批次中是否有tuple处理失败. boolean failed = false; // 标识该批次包含的所有tuple都已在该bolt中处理完成,该bolt对于该批次的元数据可以被删除. boolean finished = false; // 记录上游或者master bolt发送过来的coord tuple与commit tuple. 在checkFinishId调用时逐一进行ack. List<Tuple> ackTuples = new ArrayList<Tuple>(); }
对于事务型批处理而言,无非要解决的核心问题是知道什么时候一个批次处理完成,成功完成后续该如何处理,失败了又该如何处理。
在CoordinatedBolt的execute会在接收到tuple时被调用。
public void execute(Tuple tuple) { Object id = tuple.getValue(0); TrackingInfo track; // 首先判断接收到的tuple类型 TupleType type = getTupleType(tuple); synchronized(_tracked) { // 根据batchId获得对应的Batch元数据BatchInfo. track = _tracked.get(id); if(track==null) { track = new TrackingInfo(); // _idStreamSpec非空, 表示该bolt是一个committer bolt. // 如果不是committer bolt, 在第一次接收到tuple后,将batch的receivedId标志位置为true, 在后面执行checkFinishId时, 会将receivedId是否为true来作为bolt对一个批次是否处理完成条件之一. if(_idStreamSpec==null) track.receivedId = true; _tracked.put(id, track); } } // 如果接收到控制流tuple, commit tuple(committer bolt)或者coord tuple(非committer bolt),则调用checkFinishId检查bolt是否完成了本批次处理. // 对于committer bolt, 只有接收到了commit tuple后,才会将receiveId置为true. if(type==TupleType.ID) { synchronized(_tracked) { track.receivedId = true; } checkFinishId(tuple, type); } else if(type==TupleType.COORD) { // 每次接收到上游发来的coord tuple, 说明上游已经完成了本批次tuple处理, 从tuple中获取上游告知的本批次已发送tuple总数, 并进行相应元数据更新(该上游已经报备过). int count = (Integer) tuple.getValue(1); synchronized(_tracked) { track.reportCount++; track.expectedTupleCount+=count; } checkFinishId(tuple, type); } else { synchronized(_tracked) { _delegate.execute(tuple); } } }
// 判断收到的tuple类型 private TupleType getTupleType(Tuple tuple) { // committer tuple if(_idStreamSpec!=null && tuple.getSourceGlobalStreamid().equals(_idStreamSpec._id)) { return TupleType.ID; // coord tuple } else if(!_sourceArgs.isEmpty() && tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) { return TupleType.COORD; // data tuple } else { return TupleType.REGULAR; } }
上述逻辑解决了预期的问题,即我这个bolt对于该批次预期应该收到多少条数据。另一个就是实际收到了多少数据。
public class CoordinatedOutputCollector implements IOutputCollector { public void ack(Tuple tuple) { Object id = tuple.getValue(0); synchronized(_tracked) { TrackingInfo track = _tracked.get(id); // 每一条tuple消息被ack的时候,会更新实际收到并处理的tuple总数. if (track != null) track.receivedTuples++; } boolean failed = checkFinishId(tuple, TupleType.REGULAR); if(failed) { _delegate.fail(tuple); } else { _delegate.ack(tuple); } } }
最后就是将实际收到的数据量和预期进行对比,看看本批次是否已经完成处理。
CoordinatedBolt的函数checkFinishId完成了这个工作。
private boolean checkFinishId(Tuple tup, TupleType type) { Object id = tup.getValue(0); boolean failed = false; synchronized(_tracked) { TrackingInfo track = _tracked.get(id); try { if(track!=null) { boolean delayed = false; if(_idStreamSpec==null && type == TupleType.COORD || _idStreamSpec!=null && type==TupleType.ID) { track.ackTuples.add(tup); delayed = true; } if(track.failed) { failed = true; for(Tuple t: track.ackTuples) { _collector.fail(t); } _tracked.remove(id); // 成功完成一个批次的条件: // 1. 至少接收过一次tuple(对于非committer bolt可以是任意tuple, 对于committer bolt需要收到commit tuple后才向下游发送coord tuple) // 2. 接收到了所有上游节点发来的coord tuple(所有上游节点都已通知自己已经处理完了本批次) // 3. 已经接收到的tuple总数和所有上游宣称发送的数据总数一致 } else if(track.receivedId && (_sourceArgs.isEmpty() || track.reportCount==_numSourceReports && track.expectedTupleCount == track.receivedTuples)){ if(_delegate instanceof FinishedCallback) { ((FinishedCallback)_delegate).finishedId(id); } if(!(_sourceArgs.isEmpty() || type!=TupleType.REGULAR)) { throw new IllegalStateException("Coordination condition met on a non-coordinating tuple. Should be impossible"); } // 向下游节点发送coord tuple消息,通知自己已经处理完了本批次以及发送给下游的数据量 Iterator<Integer> outTasks = _countOutTasks.iterator(); while(outTasks.hasNext()) { int task = outTasks.next(); int numTuples = get(track.taskEmittedTuples, task, 0); _collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples)); } // 向上游发送ack tuple, 告知上游自己已经处理完成本批次 for(Tuple t: track.ackTuples) { _collector.ack(t); } track.finished = true; _tracked.remove(id); } if(!delayed && type!=TupleType.REGULAR) { if(track.failed) { _collector.fail(tup); } else { _collector.ack(tup); } } } else { if(type!=TupleType.REGULAR) _collector.fail(tup); } } catch(FailedException e) { LOG.error("Failed to finish batch", e); for(Tuple t: track.ackTuples) { _collector.fail(t); } _tracked.remove(id); failed = true; } } return failed; }
相关文章推荐
- Storm 多语言支持之ShellBolt原理及改进
- [置顶] Storm运行原理探索
- SPWeb.ProcessBatchData Method 的应用
- FW:分布式实时计算storm&nbsp;原理…
- 操作系统原理学习笔记(2)之进程概念(关键词:操作系统原理学习笔记、进程、process)
- Storm原理
- 二、storm任务提交原理
- Storm运行原理探索
- Storm概念、原理详解及其应用(一)BaseStorm
- FW:分布式实时计算storm&nbsp;原理…
- Storm原理及单机安装指南
- 五分钟学会写storm代码: jstorm/storm编码原理与普通java程序的区别
- storm 入门原理介绍_AboutYUN
- storm 1.0版本滑动窗口的实现及原理
- 理解storm的ACKER机制原理
- Storm——可靠性(ACK原理)
- storm 1.0版本滑动窗口的实现及原理
- SPWeb.ProcessBatchData Method
- apache storm简介与原理
- Storm简介、原理架构+集群搭建+实例设计分析+性能优化