Apache Beam的分窗与触发器
2017-09-14 20:07
302 查看
本文参考Apache Beam官方编程手册
可以结合官方的Mobile Game 代码阅读本文。
在默认情况下,Apache Beam是不分窗的,也就是采用GlobalWindow,而如果同时也不设置自定义的触发器,那么Beam会在所有数据都收集到之后才开始对数据进行处理。这通常只能适用于有限数据且对实时性要求不高的情况。当输入为无限流数据,我们可以
1)设置合适的窗口大小(根据时间戳),在窗口末端进行数据处理;
2)设置触发器,当条件满足时触发,进行数据处理;
3)同时设置窗口和触发器。
时间戳说明:Beam的数据都是保存在PCollection中。当读入数据时,PCollection为每个元素都自动生成一个内置的时间戳,对于无限输入,数据的时间戳不同。而对于有限输入,由于是同时读入,所有的元素的时间戳都是一样的,这时候分窗是没有意义的(都在一个窗口)。而我们可以手动为每个元素设置时间戳,通常采用数据中已有的时间属性(比如日志中一般都会带有事件时间)。可以在DoFn中为数据带上时间戳,如:
窗口类型:
1)全局窗口
就是默认不分窗的情况。apply(Windows.<TYPE>into(new GlobalWindows()));
2)固定时间大小窗口
最常见的分窗方式,按照时间戳把数据处理窗口分为固定长度。apply(Windows.<TYPE>into(FixedWindows.of(Duration.standardMinutes(XX))))
![](https://beam.apache.org/images/fixed-time-windows.png)
3)滑动窗口
需要设置2个参数,窗口大小和窗口产生周期。窗口之间有重叠,通常用于计算平均数的情况(暂没用过)
![](https://beam.apache.org/images/sliding-time-windows.png)
4)会话窗口
一般用于相同key数据聚合,同一个key的数据之间时间间隔较大的会被分到不同的窗口。
![](https://beam.apache.org/images/session-windows.png)
水位线和超时数据:
当使用用户自定义的时间戳时,先处理的数据并不总是时间戳较小的,有可能出现时间戳小的数据在后面才产生的情况。Beam通常会给窗口设定一个处理期限时间(图中纵轴),当超过这个时间的数据被视为超时数据,而这些期限时间的连线即水位线。
![](https://beam.apache.org/images/gaming-example-basic.png)
系统会根据实际情况进行预测生成水位线,在默认情况下不对超时数据进行处理,而我们可以通过设置触发器对超时数据进行额外处理。
触发器种类
1)时间时间触发器
根据时间戳进行触发。
以上分别对水位线上中下的3种数据进行不同的处理。需要注意的是withEarlyFirings和withLateFirings方法生成的触发器是连续的而不是一次性的。
2)处理时间触发器
如方法的字面意思,仅在第一个数据到达后的5分钟时触发一次。
3)数据驱动型触发器
当处理到XX个时触发一次。需要注意的是当数据个数小于XX时永远不会触发数据处理。
4)混合触发器
将多个触发器混合起来,比如1)中的代码就是3个触发器混合。其他的还有
①Repeatedly.forever(一次性触发器)
将一次性触发器变为连续型触发器,触发后再次等待触发。例如与AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES)一起用可以实现每个数据到达后的5分钟进行处理,经常用于全局窗口,可以用orFinally(触发器)来设置停止条件。
②AfterEach.inOrder(触发器1,触发器2...)
当触发器1满足后等待触发器2...知道所有触发器满足后开始数据处理。
③AfterFirst(触发器1,触发器2..)和AfterAll(触发器1,触发器2..)
这2个分别为或,与的逻辑。
④orFinally
见①
处理方式(官方文档解释的很清楚了)
Accumulating Mode
If our trigger is set to
emits the following values each time it fires. Keep in mind that the trigger fires every time three elements arrive:
Discarding Mode
If our trigger is set to
emits the following values on each firing:
超时数据处理
.withAllowedLateness(Duration.XXXX(XXX))
可设置允许超时多长时间的数据。
可以结合官方的Mobile Game 代码阅读本文。
在默认情况下,Apache Beam是不分窗的,也就是采用GlobalWindow,而如果同时也不设置自定义的触发器,那么Beam会在所有数据都收集到之后才开始对数据进行处理。这通常只能适用于有限数据且对实时性要求不高的情况。当输入为无限流数据,我们可以
1)设置合适的窗口大小(根据时间戳),在窗口末端进行数据处理;
2)设置触发器,当条件满足时触发,进行数据处理;
3)同时设置窗口和触发器。
时间戳说明:Beam的数据都是保存在PCollection中。当读入数据时,PCollection为每个元素都自动生成一个内置的时间戳,对于无限输入,数据的时间戳不同。而对于有限输入,由于是同时读入,所有的元素的时间戳都是一样的,这时候分窗是没有意义的(都在一个窗口)。而我们可以手动为每个元素设置时间戳,通常采用数据中已有的时间属性(比如日志中一般都会带有事件时间)。可以在DoFn中为数据带上时间戳,如:
@ProcessElement public void processElement(ProcessContext c) { c.outputWithTimestamp(c.element(), new Instant(XXX)); }
窗口类型:
1)全局窗口
就是默认不分窗的情况。apply(Windows.<TYPE>into(new GlobalWindows()));
2)固定时间大小窗口
最常见的分窗方式,按照时间戳把数据处理窗口分为固定长度。apply(Windows.<TYPE>into(FixedWindows.of(Duration.standardMinutes(XX))))
![](https://beam.apache.org/images/fixed-time-windows.png)
3)滑动窗口
需要设置2个参数,窗口大小和窗口产生周期。窗口之间有重叠,通常用于计算平均数的情况(暂没用过)
![](https://beam.apache.org/images/sliding-time-windows.png)
4)会话窗口
一般用于相同key数据聚合,同一个key的数据之间时间间隔较大的会被分到不同的窗口。
![](https://beam.apache.org/images/session-windows.png)
水位线和超时数据:
当使用用户自定义的时间戳时,先处理的数据并不总是时间戳较小的,有可能出现时间戳小的数据在后面才产生的情况。Beam通常会给窗口设定一个处理期限时间(图中纵轴),当超过这个时间的数据被视为超时数据,而这些期限时间的连线即水位线。
![](https://beam.apache.org/images/gaming-example-basic.png)
系统会根据实际情况进行预测生成水位线,在默认情况下不对超时数据进行处理,而我们可以通过设置触发器对超时数据进行额外处理。
触发器种类
1)时间时间触发器
根据时间戳进行触发。
.triggering(AfterWatermark.pastEndOfWindow()//水位线到达时触发一次 .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(FIVE_MINUTES))//水位线之前,每次触发后第一个数据来到之后的5分钟时再触发 .withLateFirings(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(TEN_MINUTES)))//水位线之后,每次触发后第一个数据来到之后的10分钟时再触发
以上分别对水位线上中下的3种数据进行不同的处理。需要注意的是withEarlyFirings和withLateFirings方法生成的触发器是连续的而不是一次性的。
2)处理时间触发器
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES)
如方法的字面意思,仅在第一个数据到达后的5分钟时触发一次。
3)数据驱动型触发器
AfterPane.elementCountAtleast(XX)
当处理到XX个时触发一次。需要注意的是当数据个数小于XX时永远不会触发数据处理。
4)混合触发器
将多个触发器混合起来,比如1)中的代码就是3个触发器混合。其他的还有
①Repeatedly.forever(一次性触发器)
将一次性触发器变为连续型触发器,触发后再次等待触发。例如与AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES)一起用可以实现每个数据到达后的5分钟进行处理,经常用于全局窗口,可以用orFinally(触发器)来设置停止条件。
②AfterEach.inOrder(触发器1,触发器2...)
当触发器1满足后等待触发器2...知道所有触发器满足后开始数据处理。
③AfterFirst(触发器1,触发器2..)和AfterAll(触发器1,触发器2..)
这2个分别为或,与的逻辑。
④orFinally
见①
处理方式(官方文档解释的很清楚了)
Accumulating Mode
If our trigger is set to
.accumulatingFiredPanes, the trigger
emits the following values each time it fires. Keep in mind that the trigger fires every time three elements arrive:
First trigger firing: [5, 8, 3] Second trigger firing: [5, 8, 3, 15, 19, 23] Third trigger firing: [5, 8, 3, 15, 19, 23, 9, 13, 10]
Discarding Mode
If our trigger is set to
.discardingFiredPanes, the trigger
emits the following values on each firing:
First trigger firing: [5, 8, 3] Second trigger firing: [15, 19, 23] Third trigger firing: [9, 13, 10]
超时数据处理
.withAllowedLateness(Duration.XXXX(XXX))
可设置允许超时多长时间的数据。
相关文章推荐
- [置顶] Apache Beam核心—触发器规约
- Apache Beam的架构概览
- Apache Beam—透视Google统一流式计算的野心
- Apache Beam Java SDK 快速开始
- Apache Beam程序向导4
- Apache Beam的目标
- Apache Beam—透视Google统一流式计算的野心
- Why Apache Beam? A data Artisans perspective
- Apache Beam实战指南 | 手把手教你玩转KafkaIO与Flink
- Apache Beam处理Kafka数据源源码分析
- [置顶] Apache Beam简介
- Apache Beam—透视Google统一流式计算的野心
- Apache Beam中的有状态计算
- Apache Beam的基本概念
- Apache Beam Fn API如何接收和发送数据
- apache beam
- Apache Beam 综述
- Apache Beam中的几种常见的处理类
- Apache Beam是什么?
- Apache Beam的API设计