您的位置:首页 > 其它

Trident exactly once实现原理

2017-07-23 09:48 441 查看
为了实现exactly-once,storm0.7.0开始支持transactional toplogy(事务topology),也是微批处理架构,但目前已经不再维护(基本没有人用),功能完全被trident所替代。准确的说,trident topology是从transactional topology的基本上发展而来,包括spout和state都延用的transactional topology的思路,最大的改变就是抽象出了stream的概念。

数据流对比



原理简介

trident中将tuple封装成batch,每一个batch提供一个唯一的txid,数据的发送、提交、重发都是基于txid,以batch为单位进行。

实现exactly-once的关键在于状态的保存,就trident而言,包括:

元数据的保存,数据处理失败时,能知道要重发是哪些数据

中间状态的保存,重发数据时,能敏锐发现数据的状态是否已经更新过

trident只在用state来做中间状态保存的地方确保exactly once,而数据流并不一定要在所有的地方都需要用state,以wordcount为例,最终我们关心的只是单词的统计结果,中间的read和split并不需要关心,也就不需要保存state,因此,选择state的保存时机是重点。

三类spout

Transactional spouts,也叫事务spout,提供了强一致性,有如下三点保障:

一个txid对应一个batch,如果一个batch被重发,txid不变

任意两个batch中不会有tuple相同;

每个tuple都会被放到一个batch中,不会有tuple被漏掉

事务spout的实现是假设消息中间件是可靠的,但如果在重发一个batch时,正好batch中tuple所以某个分区失效,则会导致spout一直卡住,因为为了保证batch完全一致,会一直去尝试读取去tuple。为了解决这个缺陷,所以有了Opaque Transactional spouts。

Opaque Transactional spouts,也叫不透明事务spout,提供了弱一致性,即每个tuple只在一个batch中被成功处理,但不保证同一个txid对应的batch完全一致。当重新去读取一个batch的tuple时,不会因为读取不到某个tuple而卡住。

No-Transactional spouts ,非事务性spout,不保证对一致性

三类state

就以上三种spout,分别有三种state来做状态保存

Transactional state ,保存txid和value

Opaque transactional state ,保存txid、value、preValue

Non-transactional state ,不保证exactly once

spout和state的组合

yes表示可以实现exactly-once的组合



Trident spout

trident spout实际是一个简单的topology结构,spout包含两个内部接口:Coordinator(协调者)和Emitter(消息发送者),这两组接口将由三个执行器来执行

MasterBatchCoordinator(MBC):主协调者,Coordinator的执行器之一,也是实际的spout,负责的流的管理,以及控制新事务的产生

TridentSpoutCoordinator(TSC):另一个协调者,也是Coordinator的执行器之一,实际为bolt,主要负责元数据的封装

TridentSpoutExecutor(TSE):消息的发送者,Emitter的执行器,从数据源里根据TSC发来的元数据读出实际数据,发送出去

通过下图可以更清楚看到三个执行器的关系:



batch大小不能精确控制

一个batch中tuple的数量并不能直接做到精确控制,主要受数据量的影响,也可以通过配置topology.max.spout.pending的值(默认是1),来增加并发。

可以增加并发,当然也可以限制batch产生的速度。在batch数小于topology.max.spout.pending的情况下,MBC至少会等待trident batch interval的时间(默认是500ms)才会产生一个新的batch,关于这个参数,官方建议设置为正常的端到端处理时间的一半左右 —— 也就是说如果需要花费 600 ms 的时间处理一个batch,那么就可以每将此参数设置为300ms

state状态存储与更新

Trident 会按照txid的大小来顺序更新 batch 的状态,也就是说txid=3的batch必然在txid=2的batch之后进行更新。

主要分析一下Transactional stateOpaque transactional state 的实现原理,都是通过txid来判断数据是否已经处理过,不同之处在于,当txid为重发数据时,Transactional state直接忽略此次value更新,而Opaque transactional state是将上次处理的值与重发后的值进行combina后更新value,以wordcount为例,分别说明两者的存储过程。

假如正在处理的batch的txid=3,包含tuple为:

["man"]["man"]["dog"]


库中已存入如下结果:

man => [value=3, txid=1]
dog => [value=4, txid=3]
apple => [value=10, txid=2]


库中单词“man”的txid是1,但当前的txid是3,所以可以确定当前batch中的“man”还没有更新过,可以放心的给count加2并更新txid为3.

与此同时,库中单词“dog”的txid和当前的txid是相同的,表明当前batch中的”dog”已经更新过,因此要可以跳过这次更新。此次更新后,数据库中的数据如下:

man => [value=5, txid=3]
dog => [value=4, txid=3]
apple => [value=10, txid=2]


以上是Transactional state状态更新过程,前提是每个batch重发时,所包含的tuple都是一致的,但如果这个batch在重发的过程中,读取消息中间件时,某些区分也失效,则batch可能并不完整Transactional state不能保证exactly once,但opaque transactional spout可以。

使用opaque transactional state存储时,库中除了存储value和txid以外,还会存preValue(上次处理后的值),以更新单词man为例,如设man在库中已经存储如下:

man => [value=5,preValue=3, txid=3]


若新的batch的txid为4,单词man出现3次,则正常更新,将value的值覆盖到preValue,同时value加上新增的值,更新后如下:

man => [value=8,preValue=5, txid=4]


若新的batch的txid为3,单词man出现3次,则表示此txid之前已经处理过,但不能确认单词man出现的次数和上次是否一致,更新时,preValue的值不变,value的值更新成preValue + 3,更新后的结果为:

man => [value=6,preValue=3, txid=3]
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  storm 事务