您的位置:首页 > 其它

Storm学习之Trident:笔记(一)聚合操作

2017-08-04 19:52 267 查看

trident

什么是trident,又跟storm有什么联系?trident是在storm上提供的了高层抽象,其抽象掉了事务处理和状态管理的细节。特别的,他可以让一批tuple进行 离散的事务处理,此外,trident还提供了抽象操作,允许topolopy在数据上执行函数功能、过滤、聚合操作。
要学习trident除了storm基础原理(其实没有也能理解)外,还主要学习trident topololy、trident spout、trident操作(filter和function)、trident 聚合(combiner和reducer)、trident state。虽然storm都能独立完成这些,但是trident能加速开发并且提供优化方案。

trident 聚合

这也是今天学习的,trident提供了五种不同的聚合方案:1、Apply Locally:作用在本地节点数据的本地聚合,此过程不会产生网络传输;2、Repartitioning:对数据流的重分布,改变流的流向,不改变流的内容,但是产生网络传输;3、Aggragation:聚合操作,产生网络传输;4、Group
stream:作用在分组流上的操作;5、merge和join

      这里主要说一下主要的聚合接口:Aggregator<T>、CombinerAggregator<T>、ReducerAggregator<T>,这里是扩展了泛型,也就是最后聚合后emit的数据结构,例如可以简单的是map,也可以是更复杂的数据结构。例如我的自定义Aggregator:
Aggregator<HashMap<String, LinkedList<HashMap<String, Double>>>>
需要特别注意的是:当使用没有group by 的Aggregator或者ReducerAggregation计算global aggretation时,每个batch的数据流只能在1个partition(相当于storm的task)
中执行,即使设置了parallelismHint的并发数n>1,实际上也只能轮循的叫不同批次aggregation执行,也就相当于串行执行,所以反而浪费了资源。

      还有值得注意的就是在自定义分组的grouping接口后使用global aggregator时,自定义的分组是不起作用的。还有就是当设置parallelismHint时,要参考group  by  key中key的规模,否则也会造成资源浪费,很多partition空跑。
       说一下partitionAggregate,经常用于global aggregate时的本地化聚合,就像mapreduce中map阶段会先进行本地group一样。在每个partition中独立的进行聚合操作,互不干涉,最后emit出去以后,通过网络传输提供给后面的partition进行全局聚合操做,这样就实现了global aggregate的并发。但是partitionAggregate前卖弄不能跟groupBy,因为groupBy方法返回的GroupedStream对象没有partitionAggregate方法。
在实现时候还要特别说明一下Aggregator<T>接口,要实现五个方法:
1、prepare:只在启动topolopy时调用1次,如果设置了并发度,则在每一个partition中调用一次;
2、cleanup:只在正常关闭topolopy时调用1次,如果设置了并发度,则在每一个partition中调用1次;
3、init:对于global aggregation来说,每个批次调用1次。如果使用的时partitionAggregate则每个批次的每一个partition调用一次。对于Group Streams来说,每个相同的key组成的数据流调用一次。需要注意的是,如果使用的是事务型的spout,同时某个批次处理失败导致该批次消息重新发送,则在接下来处理时,initu有可能调用多次,所以init里面代码逻辑要支持同一批的重复调用。
4、aggregate:每个tuple调用1次;
5、complete:对于global aggregation来说,每个批次调用一次。如果使用的是partitionAggregate,则每一个批次的每一个partition调用1次。对于Grouped Streams来说,每个相同的key组成的数据流调用1次。

还有就是CombinerAggregator<T>,开始时说使用aggregation做global aggregation无法启动并发,但是当配合CombinerAggregator<T>时候可以:
trident.newStream(“TRIDENT_SPOUT”, new MySpout())
.parallelismHint(5)
.aggregate(new MyCombinerAggregator(), new Fields(“testoutput”));


      Trident会把拓扑自动拆分成2个bolt,第一个bolt做局部聚合,类似于Hadoop中的map;第二个bolt通过接收

网络传输过来的局部聚合值最后做一个全局聚合。或者还可以使用partitionAggregate<T>也可以实现同样功能:
trident.newStream(“trident_spout”, new MySpout())
.partitionAggregate(new MyAggregator(), new Fields(“testoutput1”))
.parallelismHint(5)
.aggregate(new Fields(“out1”), new MyAggregator(), new Fields(“testoutput2”));


以下内容部分参考: 点击打开链接

有三点需要注意:

1、自动优化后的第一个bolt是本地化操作,因此它可以和它前面或者后面挨着的所有each合并在同一个bolt里面。

2、parallelismHint(n)要写在aggregate的前面,如果写在aggregate后面,将导致本地化操作的第一个bolt的

并发度为1,而全局聚合的第二个bolt的并发度为n,而实际上第二个bolt并不能真正开启并发,只是前面提到的轮

循而已。

3、综合1和2,把parallelismHint(n)写在aggregate的前面会导致spout同时开启n的并发度,因此要注意自己实

现的spout类是否支持并发发送。

CombinerAggregator<T>需要实现3个方法:

init:每条tuple调用1次,对tuple做预处理。

combine:每条tuple调用1次,和之前的聚合值做combine。如果是第一条tuple则和zero返回的值做combine。

zero:当没有数据流时的处理逻辑。

整个CombinerAggregator<T>会在每批次结束时将combine的结果做一次emit。

要正确理解persisitentAggregate就要先理解Trident中的state,state也是使用storm Trident进行迭代计算的

基础,就相当于我们迭代时的全局变量,而且trident state支持查询,

        persistentAggregate是实现聚合的另外一种方式。前面介绍的聚合可以看成是对每个批次的数据做本批次内的聚

合计算,至于批次之间如何merge需要自己处理。而persistentAggregate可以看成是对源源不断发送过来数据流做

一个总的聚合,每个批次的聚合值只是一个中间状态,通过与trident新提出的state概念结合,实现中间状态的持

久化,同时支持事务性。persistentAggregate不能使用Aggregator<T>,只能使用CombinerAggregator<T>或者

ReducerAggregator<T>。

如有错误,欢迎指正。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息