您的位置:首页 > 运维架构 > Apache

Apache Beam的分窗与触发器

2017-09-14 20:07 302 查看
本文参考Apache Beam官方编程手册  

可以结合官方的Mobile Game 代码阅读本文。

在默认情况下,Apache Beam是不分窗的,也就是采用GlobalWindow,而如果同时也不设置自定义的触发器,那么Beam会在所有数据都收集到之后才开始对数据进行处理。这通常只能适用于有限数据且对实时性要求不高的情况。当输入为无限流数据,我们可以

1)设置合适的窗口大小(根据时间戳),在窗口末端进行数据处理;

2)设置触发器,当条件满足时触发,进行数据处理;

3)同时设置窗口和触发器。

时间戳说明:Beam的数据都是保存在PCollection中。当读入数据时,PCollection为每个元素都自动生成一个内置的时间戳,对于无限输入,数据的时间戳不同。而对于有限输入,由于是同时读入,所有的元素的时间戳都是一样的,这时候分窗是没有意义的(都在一个窗口)。而我们可以手动为每个元素设置时间戳,通常采用数据中已有的时间属性(比如日志中一般都会带有事件时间)。可以在DoFn中为数据带上时间戳,如:

@ProcessElement
public void processElement(ProcessContext c) {
c.outputWithTimestamp(c.element(), new Instant(XXX));
}

窗口类型:

1)全局窗口

就是默认不分窗的情况。apply(Windows.<TYPE>into(new GlobalWindows()));

2)固定时间大小窗口

最常见的分窗方式,按照时间戳把数据处理窗口分为固定长度。apply(Windows.<TYPE>into(FixedWindows.of(Duration.standardMinutes(XX))))



3)滑动窗口

需要设置2个参数,窗口大小和窗口产生周期。窗口之间有重叠,通常用于计算平均数的情况(暂没用过)



4)会话窗口

一般用于相同key数据聚合,同一个key的数据之间时间间隔较大的会被分到不同的窗口。



水位线和超时数据:

当使用用户自定义的时间戳时,先处理的数据并不总是时间戳较小的,有可能出现时间戳小的数据在后面才产生的情况。Beam通常会给窗口设定一个处理期限时间(图中纵轴),当超过这个时间的数据被视为超时数据,而这些期限时间的连线即水位线。



系统会根据实际情况进行预测生成水位线,在默认情况下不对超时数据进行处理,而我们可以通过设置触发器对超时数据进行额外处理。

触发器种类

1)时间时间触发器

根据时间戳进行触发。

.triggering(AfterWatermark.pastEndOfWindow()//水位线到达时触发一次

.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(FIVE_MINUTES))//水位线之前,每次触发后第一个数据来到之后的5分钟时再触发

.withLateFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(TEN_MINUTES)))//水位线之后,每次触发后第一个数据来到之后的10分钟时再触发


以上分别对水位线上中下的3种数据进行不同的处理。需要注意的是withEarlyFirings和withLateFirings方法生成的触发器是连续的而不是一次性的。

2)处理时间触发器

AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES)

如方法的字面意思,仅在第一个数据到达后的5分钟时触发一次。

3)数据驱动型触发器

AfterPane.elementCountAtleast(XX)

当处理到XX个时触发一次。需要注意的是当数据个数小于XX时永远不会触发数据处理。

4)混合触发器

将多个触发器混合起来,比如1)中的代码就是3个触发器混合。其他的还有

①Repeatedly.forever(一次性触发器)

将一次性触发器变为连续型触发器,触发后再次等待触发。例如与AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES)一起用可以实现每个数据到达后的5分钟进行处理,经常用于全局窗口,可以用orFinally(触发器)来设置停止条件。

②AfterEach.inOrder(触发器1,触发器2...)

当触发器1满足后等待触发器2...知道所有触发器满足后开始数据处理。

③AfterFirst(触发器1,触发器2..)和AfterAll(触发器1,触发器2..)

这2个分别为或,与的逻辑。

④orFinally

见①

处理方式(官方文档解释的很清楚了)


Accumulating Mode


If our trigger is set to 
.accumulatingFiredPanes
, the trigger
emits the following values each time it fires. Keep in mind that the trigger fires every time three elements arrive:

First trigger firing:  [5, 8, 3]
Second trigger firing: [5, 8, 3, 15, 19, 23]
Third trigger firing:  [5, 8, 3, 15, 19, 23, 9, 13, 10]



Discarding Mode


If our trigger is set to 
.discardingFiredPanes
, the trigger
emits the following values on each firing:

First trigger firing:  [5, 8, 3]
Second trigger firing:           [15, 19, 23]
Third trigger firing:                         [9, 13, 10]


超时数据处理

.withAllowedLateness(Duration.XXXX(XXX))

可设置允许超时多长时间的数据。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Apache Beam