Flink流计算编程--Session Window实战
2016-09-28 18:56
549 查看
1、session window简介
Flink从1.1开始支持Session window,它是属于基于时间的窗口。这里以EventTime为例,基于时间的窗口,可以分为3种:TumblingEventTimeWindows,SlidingEventTimeWindows和EventTimeSessionWindows。
对于Tumbling与Sliding窗口,其窗口的时间大小是固定的,例如10秒钟一个窗口,那么窗口中开始时间和结束时间一定是一个10秒的间隔,例如从10:00:00到10:00:10。Sliding的窗口大小也是固定的,例如每隔10秒钟统计过去20秒的数据,那么它的窗口也是从10:00:00到10:00:20,大小是20秒。
而Session window的窗口大小,则是由数据本身决定。例如,基于同一个key,有如下几条数据,其自身时间戳如下:
key,10:00:00 key,10:00:03 key,10:00:05 key,10:00:12 key,10:00:15 key,10:00:24 key,10:00:30 key,10:00:42 .....
那么,假设Session Window的时间gap如果是6秒,那么,上面的数据会被分成以下几个窗口(窗口开始时间以及窗口结束时间,窗口内记录数):
窗口1:(key,10:00:00,10:00:11,3) 窗口2:(key,10:00:12,10:00:21,2) 窗口3:(key,10:00:24,10:00:30,1) 窗口4:(key,10:00:30,10:00:42,1) ......
可以看到,session window只需要设置一个时间间隔(gap)即可定义一个session window机制。
2、session window窗口分析
下面我们来分析下上边的数据。首先,我们设置的时间gap是6秒,那么,当相邻的记录相差>=6秒时,则触发窗口。
对于第一条记录与第二条记录,其时间间隔是3秒,那么这两条记录属于同一个窗口内,此时并不触发窗口;第二条与第三条记录,间隔2秒,也不触发窗口;第三条与第四条记录,间隔>=6秒(7秒),此时,窗口被触发了。
继续,第四条记录与第五条记录间隔3秒,不触发;第五条与第六条间隔9秒,触发;
继续,第六条与第七条间隔6秒,触发;
继续,第七条与第八条间隔12秒,触发。
到此,上边这些数据被划分到不同的窗口中,每个窗口的大小也不一样。
那么,每个窗口的时间范围有没有什么共性?我们可以按照下面的公式来计算每个窗口的时间范围:
窗口大小=[第一条数据的时间,第一个与相邻数据相差大于等于gap的时间+gap)
看似有点难以理解,其实现实的意义就是:窗口内包含的数据是“活跃的”。
例如,用户点击行为,如果认为30秒间隔用户没有操作,则认为是不活跃的。那么通过session window,定义一个30秒的gap,此时,每个窗口内的数据,都是用户在活跃期间的数据,超过30秒了没有任何操作,则认为用户不活跃,有可能下线。
3、session window在Flink中的实现
上面的介绍有点繁琐,不够言简意赅,那么我们直接看代码。数据介绍:在代码之前,介绍下数据,指数数据,正常情况每隔3秒产生一条,如果达到6秒甚至更多实践才产生数据,则认为有gap,此时说明指数的交易不够频繁,不够活跃。
代码如下:
import java.text.SimpleDateFormat import org.apache.flink.streaming.api.scala._ import DataTypes.StockIndex import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala.function.RichWindowFunction import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector import toptrade.DataStreamOperator.CommonOperator import toptrade.kafkaInOut.KafkaConsumeToptrade object SessionWindowTest { // ************************************************************************* // main函数 // ************************************************************************* def main(args : Array[String]) : Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val source = new KafkaConsumeToptrade() val indexString = source.indexDataStream(env).name("Index").setParallelism(4) val indexDataStream = new CommonOperator().mapIndexToDataStreamPOJO(indexString).filter(f=>f.lastIndex != 0L && f.totalVolume != 0L).setParallelism(8).name("index filter") val watermarkIndex = indexDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[StockIndex] { var currentMaxTimestamp = 0L val maxOutOfOrderness = 10000L override def getCurrentWatermark: Watermark = { new Watermark(currentMaxTimestamp - maxOutOfOrderness) } override def extractTimestamp(t: StockIndex, l: Long): Long = { val timestamp = t.time currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp) timestamp } }) .name("index watermark") .setParallelism(8) val sessionWindow = watermarkIndex .keyBy(_.code) .window(EventTimeSessionWindows.withGap(Time.seconds(6))) .apply(new IndexSessionWindow) .setParallelism(8) sessionWindow.print().setParallelism(1) env.execute() } // ************************************************************************* // SessionWindow Function // ************************************************************************* class IndexSessionWindow extends RichWindowFunction[StockIndex,(String,String,String,String,String,Int),String,TimeWindow]{ var state : ValueState[IndexSumTest] = null var size = 0 val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS") override def open(config : Configuration) : Unit = { state = getRuntimeContext.getState(new ValueStateDescriptor[IndexSumTest]("snapshot State", classOf[IndexSumTest], null)) } override def apply(key: String, window: TimeWindow, input: Iterable[StockIndex], out: Collector[(String, String, String, String,String,Int)]): Unit = { //init if(state.value() == null){ state.update(IndexSumTest(0)) }else{ size = state.value().size } val list = input.toList.sortBy(_.time) val window_start_time = format.format(window.getStart) val window_end_time = format.format(window.getEnd) val window_size = input.size size = size + window_size state.update(IndexSumTest(size)) out.collect((key,window_start_time,window_end_time,format.format(list.head.time),format.format(list.last.time),size)) } } // ************************************************************************* // Case Class // ************************************************************************* case class IndexSumTest(size : Int) }
session window function的实现,输出的内容代表:(key,窗口开始时间,窗口结束时间,窗口内最早的一条数据的时间,窗口内最后一条数据时间,同一个key的累计个数)。
4、session window的输出结果
上面的结果,输出如下(抽取了其中一小部分):(990857,2016-09-23 14:31:03.000,2016-09-23 14:31:09.000,2016-09-23 14:31:03.000,2016-09-23 14:31:03.000,5) (990857,2016-09-23 14:31:49.000,2016-09-23 14:31:55.000,2016-09-23 14:31:49.000,2016-09-23 14:31:49.000,6) (990857,2016-09-23 14:32:09.000,2016-09-23 14:32:20.000,2016-09-23 14:32:09.000,2016-09-23 14:32:14.000,8) (990857,2016-09-23 14:32:29.000,2016-09-23 14:32:35.000,2016-09-23 14:32:29.000,2016-09-23 14:32:29.000,9) (990857,2016-09-23 14:32:39.000,2016-09-23 14:32:45.000,2016-09-23 14:32:39.000,2016-09-23 14:32:39.000,10) (990857,2016-09-23 14:32:49.000,2016-09-23 14:32:55.000,2016-09-23 14:32:49.000,2016-09-23 14:32:49.000,11) (990857,2016-09-23 14:33:04.000,2016-09-23 14:33:10.000,2016-09-23 14:33:04.000,2016-09-23 14:33:04.000,12) (990857,2016-09-23 14:33:14.000,2016-09-23 14:33:20.000,2016-09-23 14:33:14.000,2016-09-23 14:33:14.000,13) (990857,2016-09-23 14:33:29.000,2016-09-23 14:33:35.000,2016-09-23 14:33:29.000,2016-09-23 14:33:29.000,14) (990857,2016-09-23 14:33:39.000,2016-09-23 14:33:45.000,2016-09-23 14:33:39.000,2016-09-23 14:33:39.000,15) (990857,2016-09-23 14:33:49.000,2016-09-23 14:33:55.000,2016-09-23 14:33:49.000,2016-09-23 14:33:49.000,16) (990857,2016-09-23 14:34:04.000,2016-09-23 14:34:10.000,2016-09-23 14:34:04.000,2016-09-23 14:34:04.000,17) (990857,2016-09-23 14:34:14.000,2016-09-23 14:34:20.000,2016-09-23 14:34:14.000,2016-09-23 14:34:14.000,18)
我们以第三个窗口为例来说明:
第三个窗口中有2条记录(8-6),最早的一条记录时间是:2016-09-23 14:32:09.000,最后的一条记录时间是:2016-09-23 14:32:14.000。相差5秒,因此这两条数据没有达到6秒的间隔,所以这两条数据一定属于同一个窗口。下一条数据可以观察下一个窗口的开始时间:2016-09-23 14:32:29.000,比第三个窗口的最后一条的时间多了15秒,因此才产生了第三个窗口。第三个窗口的结束时间是:2016-09-23 14:32:20.000,正好是窗口内最后一个数据时间+gap的时间。
由此也验证了我们上边提高的公式。
不过作为窗口结束时间,在实际中的用处不大,只是gap内部记录的一个时间戳而已,仅做触发条件使用。
5、参考
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/windows.html#session-windowshttp://wuchong.me/blog/2016/06/06/flink-internals-session-window/
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
相关文章推荐
- Flink流计算编程--Kafka+Flink整合demo
- Flink流计算编程:双流中实现Inner Join、Left Join与Right Join
- Flink流计算编程--在WindowedStream中体会EventTime与ProcessingTime
- Flink流计算编程--如何实现基于KEY/VALUE的List State
- Flink流计算编程--看看别人怎么用Session Window
- Flink流计算编程--Flink中allowedLateness详细介绍及思考
- Flink流计算编程--状态与检查点
- Flink流计算编程--watermark(水位线)简介
- Flink流计算编程--watermark(水位线)简介
- Flink流计算编程--状态与检查点
- Flink流计算编程--在双流中体会joinedStream与coGroupedStream
- Flink流计算编程--Flink sink to Oracle
- Flink流计算编程--流处理引擎的选型
- Flink流计算编程--Flink扩容、程序升级前后的思考
- UNIX环境高级编程学习之第十五章进程间通信 - 信号量的使用(信号灯的使用, 计算信号灯)
- 金山招聘题目:编程计算从1到2008080808之间的整数有多少个含有数字7
- 浅谈.NET下的多线程和并行计算(八)Winform中多线程编程基础上
- 趣味编程之计算相亲数(下)
- 可编程计算组件
- 多核处理器并行计算编程利器OpenPM