Trident exactly once实现原理
2017-07-23 09:48
441 查看
为了实现exactly-once,storm0.7.0开始支持transactional toplogy(事务topology),也是微批处理架构,但目前已经不再维护(基本没有人用),功能完全被trident所替代。准确的说,trident topology是从transactional topology的基本上发展而来,包括spout和state都延用的transactional topology的思路,最大的改变就是抽象出了stream的概念。
实现exactly-once的关键在于状态的保存,就trident而言,包括:
元数据的保存,数据处理失败时,能知道要重发是哪些数据
中间状态的保存,重发数据时,能敏锐发现数据的状态是否已经更新过
trident只在用state来做中间状态保存的地方确保exactly once,而数据流并不一定要在所有的地方都需要用state,以wordcount为例,最终我们关心的只是单词的统计结果,中间的read和split并不需要关心,也就不需要保存state,因此,选择state的保存时机是重点。
一个txid对应一个batch,如果一个batch被重发,txid不变
任意两个batch中不会有tuple相同;
每个tuple都会被放到一个batch中,不会有tuple被漏掉
事务spout的实现是假设消息中间件是可靠的,但如果在重发一个batch时,正好batch中tuple所以某个分区失效,则会导致spout一直卡住,因为为了保证batch完全一致,会一直去尝试读取去tuple。为了解决这个缺陷,所以有了Opaque Transactional spouts。
Opaque Transactional spouts,也叫不透明事务spout,提供了弱一致性,即每个tuple只在一个batch中被成功处理,但不保证同一个txid对应的batch完全一致。当重新去读取一个batch的tuple时,不会因为读取不到某个tuple而卡住。
No-Transactional spouts ,非事务性spout,不保证对一致性
Transactional state ,保存txid和value
Opaque transactional state ,保存txid、value、preValue
Non-transactional state ,不保证exactly once
MasterBatchCoordinator(MBC):主协调者,Coordinator的执行器之一,也是实际的spout,负责的流的管理,以及控制新事务的产生
TridentSpoutCoordinator(TSC):另一个协调者,也是Coordinator的执行器之一,实际为bolt,主要负责元数据的封装
TridentSpoutExecutor(TSE):消息的发送者,Emitter的执行器,从数据源里根据TSC发来的元数据读出实际数据,发送出去
通过下图可以更清楚看到三个执行器的关系:
可以增加并发,当然也可以限制batch产生的速度。在batch数小于topology.max.spout.pending的情况下,MBC至少会等待trident batch interval的时间(默认是500ms)才会产生一个新的batch,关于这个参数,官方建议设置为正常的端到端处理时间的一半左右 —— 也就是说如果需要花费 600 ms 的时间处理一个batch,那么就可以每将此参数设置为300ms
主要分析一下Transactional state和Opaque transactional state 的实现原理,都是通过txid来判断数据是否已经处理过,不同之处在于,当txid为重发数据时,Transactional state直接忽略此次value更新,而Opaque transactional state是将上次处理的值与重发后的值进行combina后更新value,以wordcount为例,分别说明两者的存储过程。
假如正在处理的batch的txid=3,包含tuple为:
库中已存入如下结果:
库中单词“man”的txid是1,但当前的txid是3,所以可以确定当前batch中的“man”还没有更新过,可以放心的给count加2并更新txid为3.
与此同时,库中单词“dog”的txid和当前的txid是相同的,表明当前batch中的”dog”已经更新过,因此要可以跳过这次更新。此次更新后,数据库中的数据如下:
以上是Transactional state状态更新过程,前提是每个batch重发时,所包含的tuple都是一致的,但如果这个batch在重发的过程中,读取消息中间件时,某些区分也失效,则batch可能并不完整Transactional state不能保证exactly once,但opaque transactional spout可以。
使用opaque transactional state存储时,库中除了存储value和txid以外,还会存preValue(上次处理后的值),以更新单词man为例,如设man在库中已经存储如下:
若新的batch的txid为4,单词man出现3次,则正常更新,将value的值覆盖到preValue,同时value加上新增的值,更新后如下:
若新的batch的txid为3,单词man出现3次,则表示此txid之前已经处理过,但不能确认单词man出现的次数和上次是否一致,更新时,preValue的值不变,value的值更新成preValue + 3,更新后的结果为:
数据流对比
原理简介
trident中将tuple封装成batch,每一个batch提供一个唯一的txid,数据的发送、提交、重发都是基于txid,以batch为单位进行。实现exactly-once的关键在于状态的保存,就trident而言,包括:
元数据的保存,数据处理失败时,能知道要重发是哪些数据
中间状态的保存,重发数据时,能敏锐发现数据的状态是否已经更新过
trident只在用state来做中间状态保存的地方确保exactly once,而数据流并不一定要在所有的地方都需要用state,以wordcount为例,最终我们关心的只是单词的统计结果,中间的read和split并不需要关心,也就不需要保存state,因此,选择state的保存时机是重点。
三类spout
Transactional spouts,也叫事务spout,提供了强一致性,有如下三点保障:一个txid对应一个batch,如果一个batch被重发,txid不变
任意两个batch中不会有tuple相同;
每个tuple都会被放到一个batch中,不会有tuple被漏掉
事务spout的实现是假设消息中间件是可靠的,但如果在重发一个batch时,正好batch中tuple所以某个分区失效,则会导致spout一直卡住,因为为了保证batch完全一致,会一直去尝试读取去tuple。为了解决这个缺陷,所以有了Opaque Transactional spouts。
Opaque Transactional spouts,也叫不透明事务spout,提供了弱一致性,即每个tuple只在一个batch中被成功处理,但不保证同一个txid对应的batch完全一致。当重新去读取一个batch的tuple时,不会因为读取不到某个tuple而卡住。
No-Transactional spouts ,非事务性spout,不保证对一致性
三类state
就以上三种spout,分别有三种state来做状态保存Transactional state ,保存txid和value
Opaque transactional state ,保存txid、value、preValue
Non-transactional state ,不保证exactly once
spout和state的组合
yes表示可以实现exactly-once的组合Trident spout
trident spout实际是一个简单的topology结构,spout包含两个内部接口:Coordinator(协调者)和Emitter(消息发送者),这两组接口将由三个执行器来执行MasterBatchCoordinator(MBC):主协调者,Coordinator的执行器之一,也是实际的spout,负责的流的管理,以及控制新事务的产生
TridentSpoutCoordinator(TSC):另一个协调者,也是Coordinator的执行器之一,实际为bolt,主要负责元数据的封装
TridentSpoutExecutor(TSE):消息的发送者,Emitter的执行器,从数据源里根据TSC发来的元数据读出实际数据,发送出去
通过下图可以更清楚看到三个执行器的关系:
batch大小不能精确控制
一个batch中tuple的数量并不能直接做到精确控制,主要受数据量的影响,也可以通过配置topology.max.spout.pending的值(默认是1),来增加并发。可以增加并发,当然也可以限制batch产生的速度。在batch数小于topology.max.spout.pending的情况下,MBC至少会等待trident batch interval的时间(默认是500ms)才会产生一个新的batch,关于这个参数,官方建议设置为正常的端到端处理时间的一半左右 —— 也就是说如果需要花费 600 ms 的时间处理一个batch,那么就可以每将此参数设置为300ms
state状态存储与更新
Trident 会按照txid的大小来顺序更新 batch 的状态,也就是说txid=3的batch必然在txid=2的batch之后进行更新。主要分析一下Transactional state和Opaque transactional state 的实现原理,都是通过txid来判断数据是否已经处理过,不同之处在于,当txid为重发数据时,Transactional state直接忽略此次value更新,而Opaque transactional state是将上次处理的值与重发后的值进行combina后更新value,以wordcount为例,分别说明两者的存储过程。
假如正在处理的batch的txid=3,包含tuple为:
["man"]["man"]["dog"]
库中已存入如下结果:
man => [value=3, txid=1] dog => [value=4, txid=3] apple => [value=10, txid=2]
库中单词“man”的txid是1,但当前的txid是3,所以可以确定当前batch中的“man”还没有更新过,可以放心的给count加2并更新txid为3.
与此同时,库中单词“dog”的txid和当前的txid是相同的,表明当前batch中的”dog”已经更新过,因此要可以跳过这次更新。此次更新后,数据库中的数据如下:
man => [value=5, txid=3] dog => [value=4, txid=3] apple => [value=10, txid=2]
以上是Transactional state状态更新过程,前提是每个batch重发时,所包含的tuple都是一致的,但如果这个batch在重发的过程中,读取消息中间件时,某些区分也失效,则batch可能并不完整Transactional state不能保证exactly once,但opaque transactional spout可以。
使用opaque transactional state存储时,库中除了存储value和txid以外,还会存preValue(上次处理后的值),以更新单词man为例,如设man在库中已经存储如下:
man => [value=5,preValue=3, txid=3]
若新的batch的txid为4,单词man出现3次,则正常更新,将value的值覆盖到preValue,同时value加上新增的值,更新后如下:
man => [value=8,preValue=5, txid=4]
若新的batch的txid为3,单词man出现3次,则表示此txid之前已经处理过,但不能确认单词man出现的次数和上次是否一致,更新时,preValue的值不变,value的值更新成preValue + 3,更新后的结果为:
man => [value=6,preValue=3, txid=3]
相关文章推荐
- kafka exactly once 的实现原理解析
- SparkStreaming实现Exactly-Once语义
- Kafka 0.11.0.0 实现 producer的Exactly-once 语义(英文)
- SparkStreaming实现Exactly-Once语义
- Kafka 0.11.0.0 实现 producer的Exactly-once 语义(中文)
- Kafka 0.11.0.0 是如何实现 Exactly-once 语义的
- Kafka 0.11.0.0 实现 producer的Exactly-once 语义(官方DEMO)
- Spark Streaming 中如何实现 Exactly-Once 语义
- Spark Streaming中如何实现Exactly-Once
- Spark Streaming exactly once原理及编程示例
- 单点登录原理与简单实现
- 微信小程序weapp的底层实现原理
- 网站统计中的数据收集原理及实现
- Java 基于红黑树的TreeMap,TreeSet实现原理
- jquery 实现原理五:ajax
- [p2p]UDP用打洞技术穿透NAT的原理与实现
- 详细解释如何通过Android自带的方式来实现图片的裁剪——原理分析+解决方案
- 堆排序算法原理及实现
- 构造HTTP请求Header实现“伪造来源IP”(重在原理)
- 自解压的jar实现原理