Spark Streaming中如何实现Exactly-Once
2017-12-17 19:34
357 查看
Exactly-once 语义是实时计算的难点之一。要做到每一条记录只会被处理一次,即使服务器或网络发生故障时也能保证没有遗漏,这不仅需要实时计算框架本身的支持,还对上游的消息系统、下游的数据存储有所要求。此外,我们在编写计算流程时也需要遵循一定规范,才能真正实现 Exactly-once。本文将讲述如何结合 Spark Streaming 框架、Kafka 消息系统、以及 MySQL 数据库来实现 Exactly-once 的实时计算流程。
Spark
Streaming
首先让我们实现一个简单而完整的实时计算流程。我们从 Kafka 接收用户访问日志,解析并提取其中的时间和日志级别,并统计每分钟错误日志的数量,结果保存到 MySQL 中。
示例日志:
结果表结构,其中
Scala 项目通常使用
Spark 2.2 和 Kafka 0.10,数据库操作类库使用了 ScalikeJDBC 3.0。
完整的示例代码已上传至 GitHub(链接),下面我仅选取重要的部分加以说明:
实时计算有三种语义,分别是 At-most-once、At-least-once、以及 Exactly-once。一个典型的 Spark Streaming 应用程序会包含三个处理阶段:接收数据、处理汇总、输出结果。每个阶段都需要做不同的处理才能实现相应的语义。
对于 接收数据,主要取决于上游数据源的特性。例如,从 HDFS 这类支持容错的文件系统中读取文件,能够直接支持 Exactly-once 语义。如果上游消息系统支持 ACK(如RabbitMQ),我们就可以结合 Spark 的 Write Ahead
Log 特性来实现 At-least-once 语义。对于非可靠的数据接收器(如
Worker 或 Driver 节点发生故障时就会产生数据丢失,提供的语义也是未知的。而 Kafka 消息系统是基于偏移量(Offset)的,它的 Direct API 可以提供 Exactly-once 语义。
在使用 Spark RDD 对数据进行 转换或汇总 时,我们可以天然获得 Exactly-once 语义,因为 RDD 本身就是一种具备容错性、不变性、以及计算确定性的数据结构。只要数据来源是可用的,且处理过程中没有副作用(Side effect),我们就能一直得到相同的计算结果。
结果输出 默认符合 At-least-once 语义,因为
Worker 节点失效而执行多次,从而重复写入外部存储。我们有两种方式解决这一问题,幂等更新和事务更新。下面我们将深入探讨这两种方式。
如果多次写入会产生相同的结果数据,我们可以认为这类写入操作是幂等的。
Exactly-once 语义了。但需要注意的是,幂等写入只适用于 Map-only 型的计算流程,即没有 Shuffle、Reduce、Repartition 等操作。此外,我们还需对 Kafka DStream 做一些额外设置:
将
DStream 会在接收到数据后立刻更新自己的偏移量,我们需要将这个动作推迟到计算完成之后。
打开 Spark Streaming 的 Checkpoint 特性,用于存放 Kafka 偏移量。但若应用程序代码发生变化,Checkpoint 数据也将无法使用,这就需要改用下面的操作:
在数据输出之后手动提交 Kafka 偏移量。
可以做到这一点:
在使用事务型写入时,我们需要生成一个唯一 ID,这个 ID 可以使用当前批次的时间、分区号、或是 Kafka 偏移量来生成。之后,我们需要在一个事务中将处理结果和这个唯一 ID 一同写入数据库。这一原子性的操作将带给我们 Exactly-once 语义,而且该方法可以同时适用于 Map-only 以及包含汇聚操作的计算流程。
我们通常会在
Map-only 流程来说是适用的,因为这种流程下 Kafka 分区和 RDD 分区是一一对应的,我们可以用以下方式获取各分区的偏移量:
但对于包含 Shuffle 的计算流程(如上文的错误日志统计),我们需要先将处理结果拉取到 Driver 进程中,然后才能执行事务操作:
如果偏移量写入失败,或者重复处理了某一部分数据(
实时计算中的 Exactly-once 是比较强的一种语义,因而会给你的应用程序引入额外的开销。此外,它尚不能很好地支持窗口型操作。因此,是否要在代码中使用这一语义就需要开发者自行判断了。很多情况下,数据丢失或重复处理并不那么重要。不过,了解
Exactly-once 的开发流程还是有必要的,对学习 Spark Streaming 也会有所助益。
Spark
Streaming
引例
首先让我们实现一个简单而完整的实时计算流程。我们从 Kafka 接收用户访问日志,解析并提取其中的时间和日志级别,并统计每分钟错误日志的数量,结果保存到 MySQL 中。示例日志:
123 | 2017-07-30 14:09:08 ERROR some message2017-07-30 14:09:20 INFO some message2017-07-30 14:10:50 ERROR some message |
log_time字段会截取到分钟级别:
1234 | create table error_log ( log_time datetime primary key, log_count int not null default 0); |
sbt来管理。我们将下列依赖添加到
build.sbt文件中。本例使用的是
Spark 2.2 和 Kafka 0.10,数据库操作类库使用了 ScalikeJDBC 3.0。
12345678 | scalaVersion := "2.11.11"libraryDependencies ++= Seq( "org.apache.spark" %% "spark-streaming" % "2.2.0" % "provided", "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0", "org.scalikejdbc" %% "scalikejdbc" % "3.0.1", "mysql" % "mysql-connector-java" % "5.1.43") |
1234567891011121314151617181920212223242526272829303132 | // 初始化数据库连接ConnectionPool.singleton("jdbc:mysql://localhost:3306/spark", "root", "")// 创建 Spark Streaming 上下文val conf = new SparkConf().setAppName("ExactlyOnce").setIfMissing("spark.master", "local[2]")val ssc = new StreamingContext(conf, Seconds(5))// 使用 Kafka Direct API 创建 DStreamval messages = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Seq("alog"), kafkaParams))messages.foreachRDD { rdd => // 日志处理 val result = rdd.map(_.value) .flatMap(parseLog) // 日志解析函数 .filter(_.level == "ERROR") .map(log => log.time.truncatedTo(ChronoUnit.MINUTES) -> 1) .reduceByKey(_ + _) .collect() // 结果保存至数据库 DB.autoCommit { implicit session => result.foreach { case (time, count) => sql""" insert into error_log (log_time, log_count) value (${time}, ${count}) on duplicate key update log_count = log_count + values(log_count) """.update.apply() } }} |
实时计算语义
实时计算有三种语义,分别是 At-most-once、At-least-once、以及 Exactly-once。一个典型的 Spark Streaming 应用程序会包含三个处理阶段:接收数据、处理汇总、输出结果。每个阶段都需要做不同的处理才能实现相应的语义。对于 接收数据,主要取决于上游数据源的特性。例如,从 HDFS 这类支持容错的文件系统中读取文件,能够直接支持 Exactly-once 语义。如果上游消息系统支持 ACK(如RabbitMQ),我们就可以结合 Spark 的 Write Ahead
Log 特性来实现 At-least-once 语义。对于非可靠的数据接收器(如
socketTextStream),当
Worker 或 Driver 节点发生故障时就会产生数据丢失,提供的语义也是未知的。而 Kafka 消息系统是基于偏移量(Offset)的,它的 Direct API 可以提供 Exactly-once 语义。
在使用 Spark RDD 对数据进行 转换或汇总 时,我们可以天然获得 Exactly-once 语义,因为 RDD 本身就是一种具备容错性、不变性、以及计算确定性的数据结构。只要数据来源是可用的,且处理过程中没有副作用(Side effect),我们就能一直得到相同的计算结果。
结果输出 默认符合 At-least-once 语义,因为
foreachRDD方法可能会因为
Worker 节点失效而执行多次,从而重复写入外部存储。我们有两种方式解决这一问题,幂等更新和事务更新。下面我们将深入探讨这两种方式。
使用幂等写入实现
Exactly-once
如果多次写入会产生相同的结果数据,我们可以认为这类写入操作是幂等的。saveAsTextFile就是一种典型的幂等写入。如果消息中包含唯一主键,那么多次写入相同的数据也不会在数据库中产生重复记录。这种方式也就能等价于
Exactly-once 语义了。但需要注意的是,幂等写入只适用于 Map-only 型的计算流程,即没有 Shuffle、Reduce、Repartition 等操作。此外,我们还需对 Kafka DStream 做一些额外设置:
将
enable.auto.commit设置为
false。默认情况下,Kafka
DStream 会在接收到数据后立刻更新自己的偏移量,我们需要将这个动作推迟到计算完成之后。
打开 Spark Streaming 的 Checkpoint 特性,用于存放 Kafka 偏移量。但若应用程序代码发生变化,Checkpoint 数据也将无法使用,这就需要改用下面的操作:
在数据输出之后手动提交 Kafka 偏移量。
HasOffsetRanges类,以及
commitAsyncAPI
可以做到这一点:
1234567 | messages.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd.foreachPartition { iter => // output to database } messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)} |
使用事务写入实现
Exactly-once
在使用事务型写入时,我们需要生成一个唯一 ID,这个 ID 可以使用当前批次的时间、分区号、或是 Kafka 偏移量来生成。之后,我们需要在一个事务中将处理结果和这个唯一 ID 一同写入数据库。这一原子性的操作将带给我们 Exactly-once 语义,而且该方法可以同时适用于 Map-only 以及包含汇聚操作的计算流程。我们通常会在
foreachPartition方法中来执行数据库写入操作。对于
Map-only 流程来说是适用的,因为这种流程下 Kafka 分区和 RDD 分区是一一对应的,我们可以用以下方式获取各分区的偏移量:
123456 | messages.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd.foreachPartition { iter => val offsetRange = offsetRanges(TaskContext.get.partitionId) }} |
1234567891011121314151617181920 | messages.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges val result = processLogs(rdd).collect() // parse log and count error DB.localTx { implicit session => result.foreach { case (time, count) => // save to error_log table } offsetRanges.foreach { offsetRange => val affectedRows = sql""" update kafka_offset set offset = ${offsetRange.untilOffset} where topic = ${topic} and `partition` = ${offsetRange.partition} and offset = ${offsetRange.fromOffset} """.update.apply() if (affectedRows != 1) { throw new Exception("fail to update offset") } } }} |
offset != $fromOffset判断条件不通过),该事务就会回滚,从而做到 Exactly-once。
总结
实时计算中的 Exactly-once 是比较强的一种语义,因而会给你的应用程序引入额外的开销。此外,它尚不能很好地支持窗口型操作。因此,是否要在代码中使用这一语义就需要开发者自行判断了。很多情况下,数据丢失或重复处理并不那么重要。不过,了解Exactly-once 的开发流程还是有必要的,对学习 Spark Streaming 也会有所助益。
相关文章推荐
- Spark Streaming 中如何实现 Exactly-Once 语义
- Kafka 0.11.0.0 是如何实现 Exactly-once 语义的
- SparkStreaming实现Exactly-Once语义
- kafka exactly once 的实现原理解析
- require、include、require_once、include_once区别? 加载区别? 如果程序按需加载某个php文件你如何实现?
- SparkStreaming实现Exactly-Once语义
- Kafka 0.11.0.0 实现 producer的Exactly-once 语义(英文)
- Kafka 0.11.0.0 实现 producer的Exactly-once 语义(中文)
- Trident exactly once实现原理
- Spark Streaming Crash 如何保证Exactly Once Semantics
- Kafka 0.11.0.0 实现 producer的Exactly-once 语义(官方DEMO)
- 分析 OVS 如何实现 vlan 隔离
- 如何用C++实现一个LRU Cache
- MySql计数器,如网站点击数,如何实现高性能高并发的计数器功能
- C#中如何实现数据拖动?(拖动图片,到TextBox,并显示)
- C++如何实现类对象只能动态分配或只能静态分配
- 如何实现session共享
- android 如何实现EditText从不可编辑状态变成可变成可编辑状态
- 【C++ STL应用与实现】26: 如何使用std::for_each以及基于范围的for循环 (since C++11)
- Objective-C如何自己实现一个基于数组下标的属性访问模式