Twitter Storm源代码分析之acker工作流程
2013-12-20 10:25
309 查看
Twitter Storm源代码分析之acker工作流程
发表于 2011年 12 月 30 日 由 xumingming
作者: xumingming | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明
网址: http://xumingming.sinaapp.com/410/twitter-storm-code-analysis-acker-merchanism/
概述
我们知道storm一个很重要的特性是它能够保证你发出的每条消息都会被完整处理, 完整处理的意思是指:一个tuple被完全处理的意思是: 这个tuple以及由这个tuple所导致的所有的tuple都被成功处理。而一个tuple会被认为处理失败了如果这个消息在timeout所指定的时间内没有成功处理。
也就是说对于任何一个spout-tuple以及它的所有子孙到底处理成功失败与否我们都会得到通知。关于如果做到这一点的原理,可以看看Twitter
Storm如何保证消息不丢失这篇文章。从那篇文章里面我们可以知道,storm里面有个专门的acker来跟踪所有tuple的完成情况。这篇文章就来讨论acker的详细工作流程。
源代码列表
这篇文章涉及到的源代码主要包括:backtype.storm.daemon.acker
backtype.storm.daemon.task
backtype.storm.task.OutputCollectorImpl
算法简介
acker对于tuple的跟踪算法是storm的主要突破之一, 这个算法使得对于任意大的一个tuple树, 它只需要恒定的20字节就可以进行跟踪了。原理很简单:acker对于每个spout-tuple保存一个ack-val的校验值,它的初始值是0, 然后每发射一个tuple/ack一个tuple,那么tuple的id都要跟这个校验值异或一下,并且把得到的值更新为ack-val的新值。那么假设每个发射出去的tuple都被ack了,那么最后ack-val一定是0(因为一个数字跟自己异或得到的值是0)。
进入正题
那么下面我们从源代码层面来看看哪些组件在哪些时候会给acker发送什么样的消息来共同完成这个算法的。acker对消息进行处理的主要是下面这块代码:帮助
Spout创建一个新的tuple的时候给acker发送消息
消息格式(看上面代码的第1行和第7行对于tuple.getValue()的调用)
帮助
__ack_init(ACKER-INIT-STREAM-ID)
这是告诉acker, 一个新的spout-tuple出来了, 你跟踪一下,它是由id为task-id的task创建的(这个task-id在后面会用来通知这个task:你的tuple处理成功了/失败了)。处理完这个消息之后, acker会在它的pending这个map(类型为TimeCacheMap)里面添加这样一条记录:
帮助
Bolt发射一个新tuple的时候会给acker发送消息么?
任何一个bolt在发射一个新的tuple的时候,是不会直接通知acker的,如果这样做的话那么每发射一个消息会有三条消息了:Bolt创建这个tuple的时候,把它发给下一个bolt的消息
Bolt创建这个tuple的时候,发送给acker的消息
ack tuple的时候发送的ack消息
事实上storm里面只有第一条和第三条消息,它把第二条消息省掉了, 怎么做到的呢?storm这点做得挺巧妙的,bolt在发射一个新的bolt的时候会把这个新tuple跟它的父tuple的关系保存起来。然后在ack每个tuple的时候,storm会把要ack的tuple的id, 以及这个tuple新创建的所有的tuple的id的异或值发送给acker。这样就给每个tuple省掉了一个消息(具体看下一节)。
Tuple被ack的时候给acker发送消息
每个tuple在被ack的时候,会给acker发送一个消息,消息格式是:帮助
__ack_ack(ACKER-ACK-STREAM-ID)
注意,这里的tmp-ack-val是要ack的tuple的id与由它新创建的所有的tuple的id异或的结果:
帮助
帮助
generated-ids参数就是这个input-tuple的所有子tuple的id, 从代码可以看出storm会给这个tuple的每一个spout-tuple发送一个ack消息。
为什么说这里的
generated-ids是input-tuple的子tuple呢? 这个send-ack是被OutputCollectorImpl里面的ack方法调用的:
帮助
getExistingOutput(input)方法计算出来的, 我们再来看看这个方法的定义:
帮助
_pendingAcks里面存的是什么东西呢?
帮助
_pendingAcks里面维护的其实就是tuple到自己儿子的对应关系。
Tuple处理失败的时候会给acker发送失败消息
acker会忽略这种消息的消息内容(消息的streamId为ACKER-FAIL-STREAM-ID), 直接将对应的spout-tuple标记为失败(最上面代码第9行)
最后Acker发消息通知spout-tuple对应的Worker
最后, acker会根据上面这些消息的处理结果来通知这个spout-tuple对应的task:帮助
相关文章推荐
- Storm配置项详解
- Twitter Storm 安装篇
- storm 删数据后上传topology无法启动?
- TowerMadness之Brewing Storm攻略 Blizzardgale
- Storm框架使用详解 搭建篇
- Twitter Storm源代码分析之TimeCacheMap
- Twitter Storm: storm的一些常见模式
- Twitter Storm: 在生产集群上运行topology
- Twitter Storm的一些关键概念
- Twitter Storm如何保证消息不丢失
- Twitter Storm入门
- Twitter Storm: 配置开发环境
- Twitter Storm: 创建一个新的storm项目
- Twitter Storm: 本地模式简介
- Twitter Storm: Maven配置
- storm 配置项
- storm常见问题及解决方法收集【持续更新中】
- Storm集群安装部署步骤【详细版】
- Twitter Storm 安装实战
- Storm安装配置(单机版)笔记