storm的ack和fail为了保证数据能正确的被处理, 对于spout产生的每一个tuple, storm都会进行跟踪, 这里面涉及到ack/fail的处理, 如果一个tuple处理成功, 会调用s
2016-11-25 20:13
579 查看
原文链接:http://macrochen.iteye.com/blog/1414568
为了保证数据能正确的被处理, 对于spout产生的每一个tuple, storm都会进行跟踪, 这里面涉及到ack/fail的处理, 如果一个tuple处理成功, 会调用spout的ack方法, 如果失败, 会调用fail方法. 而在处理tuple的每一个bolt都会通过OutputCollector来告知storm,
当前bolt处理是否成功. 为了了解OutputCollector的ack/fail与Spout的ack/fail之间的关系, 我调试跟踪了一下storm代码.
IBasicBolt 实现类不关心ack/fail, spout的ack/fail完全由后面的bolt的ack/fail来决定. 其execute方法的BasicOutputCollector参数也没有提供ack/fail方法给你调用. 相当于忽略了该bolt的ack/fail行为. 所以IBasicBolt用来做filter或者简单的计算比较合适.
可以参考BasicBoltExecutor代码里面的实现就可以明白了:
Java代码
public void execute(Tuple input) {
_collector.setContext(input);
try {
_bolt.execute(input, _collector);
_collector.getOutputter().ack(input);
} catch(FailedException e) {
LOG.warn("Failed to process tuple", e);
_collector.getOutputter().fail(input);
}
}
在IRichBolt实现类中, 如果OutputCollector.emit(oldTuple, newTuple)这样调用来发射tuple(在storm中称之为anchoring), 那么后面的bolt的ack/fail会影响spout的ack/fail, 如果collector.emit(newTuple)这样来发射tuple(在storm称之为unanchoring),
则相当于断开了后面bolt的ack/fail对spout的影响.spout将立即根据当前bolt前面的ack/fail的情况来决定调用spout的ack/fail. 所以某个bolt后面的bolt的成功失败对你来说不关心, 你可以直接通过这种方式来忽略
中间的某个bolt fail了, 不会影响后面的bolt执行, 但是会立即触发spout的fail. 相当于短路了, 后面bolt虽然也执行了, 但是ack/fail对spout已经无意义了. 也就是说, 只要bolt集合中的任何一个fail了, 会立即触发spout的fail方法. 而ack方法需要所有的bolt调用为ack才能触发.
另外一点, storm只是通过ack/fail机制来告诉应用方bolt中间的处理情况, 对于成功/失败该如何处理, 必须由应用自己来决定, 因为storm内部也没有保存失败的具体数据, 但是也有办法知道失败记录, 因为spout的ack/fail方法会附带一个msgId对象, 我们可以在最初发射tuple的时候将将msgId设置为tuple,
然后在ack/fail中对该tuple进行处理.
这里有个问题, 就是每个bolt执行完之后要显式的调用ack/fail, 否则会出现tuple不释放导致oom. 不知道storm在最初设计的时候, 为什么不将bolt的ack设置为默认调用
参考文档:https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing
为了保证数据能正确的被处理, 对于spout产生的每一个tuple, storm都会进行跟踪, 这里面涉及到ack/fail的处理, 如果一个tuple处理成功, 会调用spout的ack方法, 如果失败, 会调用fail方法. 而在处理tuple的每一个bolt都会通过OutputCollector来告知storm,
当前bolt处理是否成功. 为了了解OutputCollector的ack/fail与Spout的ack/fail之间的关系, 我调试跟踪了一下storm代码.
IBasicBolt 实现类不关心ack/fail, spout的ack/fail完全由后面的bolt的ack/fail来决定. 其execute方法的BasicOutputCollector参数也没有提供ack/fail方法给你调用. 相当于忽略了该bolt的ack/fail行为. 所以IBasicBolt用来做filter或者简单的计算比较合适.
可以参考BasicBoltExecutor代码里面的实现就可以明白了:
Java代码
public void execute(Tuple input) {
_collector.setContext(input);
try {
_bolt.execute(input, _collector);
_collector.getOutputter().ack(input);
} catch(FailedException e) {
LOG.warn("Failed to process tuple", e);
_collector.getOutputter().fail(input);
}
}
在IRichBolt实现类中, 如果OutputCollector.emit(oldTuple, newTuple)这样调用来发射tuple(在storm中称之为anchoring), 那么后面的bolt的ack/fail会影响spout的ack/fail, 如果collector.emit(newTuple)这样来发射tuple(在storm称之为unanchoring),
则相当于断开了后面bolt的ack/fail对spout的影响.spout将立即根据当前bolt前面的ack/fail的情况来决定调用spout的ack/fail. 所以某个bolt后面的bolt的成功失败对你来说不关心, 你可以直接通过这种方式来忽略
中间的某个bolt fail了, 不会影响后面的bolt执行, 但是会立即触发spout的fail. 相当于短路了, 后面bolt虽然也执行了, 但是ack/fail对spout已经无意义了. 也就是说, 只要bolt集合中的任何一个fail了, 会立即触发spout的fail方法. 而ack方法需要所有的bolt调用为ack才能触发.
另外一点, storm只是通过ack/fail机制来告诉应用方bolt中间的处理情况, 对于成功/失败该如何处理, 必须由应用自己来决定, 因为storm内部也没有保存失败的具体数据, 但是也有办法知道失败记录, 因为spout的ack/fail方法会附带一个msgId对象, 我们可以在最初发射tuple的时候将将msgId设置为tuple,
然后在ack/fail中对该tuple进行处理.
这里有个问题, 就是每个bolt执行完之后要显式的调用ack/fail, 否则会出现tuple不释放导致oom. 不知道storm在最初设计的时候, 为什么不将bolt的ack设置为默认调用
参考文档:https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing
相关文章推荐
- 第六题:航天飞行器是一项复杂而又精密的仪器,飞行器的损耗主要集中在发射和降落的过程,科学家根据实验数据估计,如果在发射过程中,产生了 x 程度的损耗,那么在降落的过程中就会产生 x2 程度的损耗,如果飞船的总损耗超过了它的耐久度,飞行器就会爆炸坠毁。问一艘耐久度为 h 的飞行器,假设在飞行过程中不产生损耗,那么为了保证其可以安全的到达目的地,只考虑整数解,至多发射过程中可以承受多少程度的损耗?
- 对于web api 从页面post数据到web api,如果用ajax,则可以成功上传到服务器,需要一个类来存储那些变量
- 黑马程序员之C#编程基础学习笔记:将一个整数数组的每一个元素进行如下的处理:如果元素是正数则将这个位置的元素的值加1,如果元素是负数则将这个位置的元素减1。
- load data 方式导入的数据不可以用binlog日志进行恢复,因为binlog里面不产生insert sql语句。
- 在非英文字符集的页面上,如果使用Ajax方式进行数据交互的话,就必须要注意保证前后端数据的统一编码,否则,很容易就出现乱码!
- 堆的数据结构能够使得堆顶总是维持最大(对于大根堆)或最小(对于小根堆),给定一个数组,对这个数组进行建堆,则平均复杂度是多少?如果只是用堆的 push 操作,则一个大根堆依次输入 3,7,2,4,1,5,8 后,得到的堆的结构示意图是下述图表中的哪个?
- 1.给出一个Person类里面包含姓名、年龄、成绩,声明5个Person对象数组,要求对数组中的内容进行排序,排序规则如下:按成绩由高到低排序,如果成绩一样,按年龄由高到低排序。
- Tuxedo是BEA公司(现已被Oracle公司收购)的一个客户机/服务器的“中间件”产品,它在客户机和服务器之间进行调节,以保证正确地处理事务。它用C语言技术开发的并且有很高性能。
- 个有10个元素的整型一维数组,用户输入9个数据,调用函数,对数组元素进行从小到大排序后,在函数中输入一个数,插入到数组中正确的位置,并输出
- 将一个list进行分页处理数据
- 编写一个与dup2功能相同的函数,要求不调用fcntl函数,并且要有正确的出错处理
- Android Handler机制 (一个Thead中可以建立多个Hander,通过msg.target保证MessageQueue中的每个msg交由发送message的handler进行处理 ,但是
- (STL中自带的排序功能7.3.3)POJ 1318 Word Amalgamation(求解一个单词是否在字典里面。解法:将单词按字典序处理后的结果与字典中的单词安字典序处理后的结果进行比较)
- EasyUI-datagrid 对于展示数据进行处理(formatter)
- 在使用Linq的过程中,如果要进行数据的比较和处理,请记住使用ToList()方法。
- Problem Description 输入n(n<100)个数,找出其中最小的数,将它与最前面的数交换后输出这些数。 Input 输入数据有多组,每组占一行,每行的开始是一个整数n,表示这个测试实例的数值的个数,跟着就是n个整数。n=0表示输入的结束,不做处理。 Output 对于每组
- JavaScript判断一个数组里面是否有重复数据(对jqgrid数据进行操作)
- //写一个生成10个100以内随机数数据,再进行冒泡排序,顺序,//二分查找法找到一个值得位置,如果没有则为-1,有则返回数组位置
- 对于top.ascx里面可以不可以放置css的文件进行一个讲解
- 从零开始搭建一个完善的MVP开发框架(三),对列表型数据请求进行抽象,优化列表型数据的处理