您的位置:首页 > 其它

Storm Transactional Batch Process原理

2014-01-11 20:17 295 查看
Storm中实现实时批处理的模块由IBatchBolt, BatchBoltExecutor, CoordinatedBolt等数据结构实现,它们之间的关系如图所示:



Storm实现事物批处理是通过在普通的IBatchBolt外面包装一层CoordinatedBolt,通过CoordinatedBolt实现批次和批次之间的事务性协调,其中记录批次元数据的神秘数据结构就是TrackingInfo, 具体定义如下:

public static class TrackingInfo {
// 记录该bolt针对该批次已经收到了多少个上游节点发送的coord tuple(即有多少个上游节点报告已经发完了本批数据).
int reportCount = 0;

// 记录该bolt针对该批次预期应该接收到的regular tuple(非控制tuple)总数. 该值是该bolt的所有上游节点通过coord stream发送的tuple中获得.
int expectedTupleCount = 0;

// 记录该批次已经接收到的regular tuple(非控制tuple)总量.
int receivedTuples = 0;

// 记录该bolt向下游task已发送过的tuple数量. key为taskId, value为已发送tuple总数.
Map<Integer, Integer> taskEmittedTuples = new HashMap<Integer, Integer>();

// 对于非committer bolt, 标识该bolt是否已经接收到了tuple;
// 对于committer bolt, 标识该committer bolt是否已经接收到了master bolt针对该batch发来的commit tuple, 将向下游发送coord tuple的时间推迟到接收到commit tuple后,而不是完成对于本批次的处理.
boolean receivedId = false;

// 在CoordinatedOutputCollector的ack中被置为true, 标识该批次中是否有tuple处理失败.
boolean failed = false;

// 标识该批次包含的所有tuple都已在该bolt中处理完成,该bolt对于该批次的元数据可以被删除.
boolean finished = false;

// 记录上游或者master bolt发送过来的coord tuple与commit tuple. 在checkFinishId调用时逐一进行ack.
List<Tuple> ackTuples = new ArrayList<Tuple>();
}


对于事务型批处理而言,无非要解决的核心问题是知道什么时候一个批次处理完成,成功完成后续该如何处理,失败了又该如何处理。

在CoordinatedBolt的execute会在接收到tuple时被调用。

public void execute(Tuple tuple) {
Object id = tuple.getValue(0);
TrackingInfo track;

// 首先判断接收到的tuple类型
TupleType type = getTupleType(tuple);
synchronized(_tracked) {

// 根据batchId获得对应的Batch元数据BatchInfo.
track = _tracked.get(id);
if(track==null) {
track = new TrackingInfo();

// _idStreamSpec非空, 表示该bolt是一个committer bolt.
// 如果不是committer bolt, 在第一次接收到tuple后,将batch的receivedId标志位置为true, 在后面执行checkFinishId时, 会将receivedId是否为true来作为bolt对一个批次是否处理完成条件之一.
if(_idStreamSpec==null) track.receivedId = true;
_tracked.put(id, track);
}
}

// 如果接收到控制流tuple, commit tuple(committer bolt)或者coord tuple(非committer bolt),则调用checkFinishId检查bolt是否完成了本批次处理.
// 对于committer bolt, 只有接收到了commit tuple后,才会将receiveId置为true.
if(type==TupleType.ID) {
synchronized(_tracked) {
track.receivedId = true;
}
checkFinishId(tuple, type);
} else if(type==TupleType.COORD) {
// 每次接收到上游发来的coord tuple, 说明上游已经完成了本批次tuple处理, 从tuple中获取上游告知的本批次已发送tuple总数, 并进行相应元数据更新(该上游已经报备过).
int count = (Integer) tuple.getValue(1);
synchronized(_tracked) {
track.reportCount++;
track.expectedTupleCount+=count;
}
checkFinishId(tuple, type);
} else {
synchronized(_tracked) {
_delegate.execute(tuple);
}
}
}


// 判断收到的tuple类型
private TupleType getTupleType(Tuple tuple) {
// committer tuple
if(_idStreamSpec!=null
&& tuple.getSourceGlobalStreamid().equals(_idStreamSpec._id)) {
return TupleType.ID;
// coord tuple
} else if(!_sourceArgs.isEmpty()
&& tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) {
return TupleType.COORD;
// data tuple
} else {
return TupleType.REGULAR;
}
}


上述逻辑解决了预期的问题,即我这个bolt对于该批次预期应该收到多少条数据。另一个就是实际收到了多少数据。

public class CoordinatedOutputCollector implements IOutputCollector {
public void ack(Tuple tuple) {
Object id = tuple.getValue(0);
synchronized(_tracked) {
TrackingInfo track = _tracked.get(id);

// 每一条tuple消息被ack的时候,会更新实际收到并处理的tuple总数.
if (track != null)
track.receivedTuples++;
}
boolean failed = checkFinishId(tuple, TupleType.REGULAR);
if(failed) {
_delegate.fail(tuple);
} else {
_delegate.ack(tuple);
}
}
}


最后就是将实际收到的数据量和预期进行对比,看看本批次是否已经完成处理。

CoordinatedBolt的函数checkFinishId完成了这个工作。

private boolean checkFinishId(Tuple tup, TupleType type) {
Object id = tup.getValue(0);
boolean failed = false;

synchronized(_tracked) {
TrackingInfo track = _tracked.get(id);
try {
if(track!=null) {
boolean delayed = false;
if(_idStreamSpec==null && type == TupleType.COORD || _idStreamSpec!=null && type==TupleType.ID) {
track.ackTuples.add(tup);
delayed = true;
}
if(track.failed) {
failed = true;
for(Tuple t: track.ackTuples) {
_collector.fail(t);
}
_tracked.remove(id);

// 成功完成一个批次的条件:
// 1. 至少接收过一次tuple(对于非committer bolt可以是任意tuple, 对于committer bolt需要收到commit tuple后才向下游发送coord tuple)
// 2. 接收到了所有上游节点发来的coord tuple(所有上游节点都已通知自己已经处理完了本批次)
// 3. 已经接收到的tuple总数和所有上游宣称发送的数据总数一致
} else if(track.receivedId
&& (_sourceArgs.isEmpty() ||
track.reportCount==_numSourceReports &&
track.expectedTupleCount == track.receivedTuples)){
if(_delegate instanceof FinishedCallback) {
((FinishedCallback)_delegate).finishedId(id);
}
if(!(_sourceArgs.isEmpty() || type!=TupleType.REGULAR)) {
throw new IllegalStateException("Coordination condition met on a non-coordinating tuple. Should be impossible");
}

// 向下游节点发送coord tuple消息,通知自己已经处理完了本批次以及发送给下游的数据量
Iterator<Integer> outTasks = _countOutTasks.iterator();
while(outTasks.hasNext()) {
int task = outTasks.next();
int numTuples = get(track.taskEmittedTuples, task, 0);
_collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples));
}

// 向上游发送ack tuple, 告知上游自己已经处理完成本批次
for(Tuple t: track.ackTuples) {
_collector.ack(t);
}
track.finished = true;
_tracked.remove(id);
}
if(!delayed && type!=TupleType.REGULAR) {
if(track.failed) {
_collector.fail(tup);
} else {
_collector.ack(tup);
}
}
} else {
if(type!=TupleType.REGULAR) _collector.fail(tup);
}
} catch(FailedException e) {
LOG.error("Failed to finish batch", e);
for(Tuple t: track.ackTuples) {
_collector.fail(t);
}
_tracked.remove(id);
failed = true;
}
}
return failed;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: