PlayScala 开发技巧 - 实时同步 MongoDB 高可用方案
2018-09-03 08:21
1011 查看
1 如何实时同步MongoDB?
MongoDB 从 3.6 开始为开发者提供了 Change Streams 功能,利用 Change Streams 功能可以非常方便地监听指定 Collection 上的数据变化。例如在 mongo shell 中,我们可以通过如下方式监听 shopping 数据库 order 表上的变化:watchCursor = db.getSiblingDB("shopping").order.watch() while (!watchCursor.isExhausted()){ if (watchCursor.hasNext()){ printjson(watchCursor.next()); } }
2 在Play中如何操作?
利用 Play Mongo 可以方便地实现数据监听功能,并且我们可以将 Change Stream 转换成 Akka Stream,然后以流的方式处理指定 Collection 上的数据变化,mongo .collection[Order] .watch() .fullDocument .toSource .groupedWithin(10, 1000.millis) .throttle(elements = 1, per = 1.second, maximumBurst = 1, ThrottleMode.shaping) .runForeach{ seq => // ... }
上面的代码实现了以下几个功能:
将从 Change Stream 接收到的元素进行缓冲,以方便批处理,当满足下面任意一个条件时便结束缓冲向后传递:
缓冲满10个元素
缓冲时间超过了1000毫秒
对缓冲后的元素进行流控,每秒只允许通过1个元素
3 如何实现高可用?
上面的代码并没有考虑可用性,如果在监听过程中发生了网络错误,如何从错误中恢复呢? 上面的实现代码底层是基于官方的 mongo-java-driver 实现的,关于可用性官方文档有如下描述:Change streams provide a way to watch changes to documents in a collection. To improve the usability of this new stage, the MongoCollection API includes a new watch method. The ChangeStreamIterable sets up the change stream and automatically attempts to resume if it encounters a potentially recoverable error.
文档中提及程序可以自动从可恢复的错误中恢复。经测试验证,如果网络中断在 30 秒以内均属于可恢复错误;但是如果大于 30 秒,则会报连接超时错误并且无法从错误中自动恢复:
com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches ReadPreferenceServerSelector{readPreference=primary}. Client view of cluster state is {type=UNKNOWN, servers=[{address=127.0.0.1:27117, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused}}] at com.mongodb.internal.connection.BaseCluster.createTimeoutException(BaseCluster.java:401) at com.mongodb.internal.connection.BaseCluster.handleServerSelectionRequest(BaseCluster.java:309) at com.mongodb.internal.connection.BaseCluster.access$800(BaseCluster.java:65) at com.mongodb.internal.connection.BaseCluster$WaitQueueHandler.run(BaseCluster.java:482) at java.lang.Thread.run(Thread.java:748)
幸运的是,Akka Stream 的 RestartSource 可以帮我们解决这种不可恢复错误,解决方式就是通过指数规避(exponential back-off)方式不断重试。下面是一个通用的创建 RestartSource 的方法实现:
def restartSource(colName: String): Source[ChangeStreamDocument[JsObject], _] = { RestartSource.withBackoff( minBackoff = 3.seconds, maxBackoff = 10.seconds, randomFactor = 0.2, maxRestarts = 1000000 ) { () ⇒ Logger.warn(s"Creating source for watching ${colName}.") mongo.collection(colName).watch().fullDocument.toSource } }
通过 Backoff 参数可以指定重试策略:
minBackoff 最小重试时间间隔
maxBackoff 最大重试时间间隔
randomFactor 设置一个随机的浮动因子,使得每次计算的间隔有些许差异
maxRestarts 最大重试次数
当发生错误时,RestartSource 会尝试重新创建一个 Source:
Logger.warn(s"Creating source for watching ${colName}.") mongo.collection(colName).watch().fullDocument.toSource
完整代码如下:
val colName = "common-user" restartSource(colName) .groupedWithin(10, 1000.millis) .throttle(elements = 1, per = 1.second, maximumBurst = 1, ThrottleMode.shaping) .runForeach{ seq => try { Logger.info(seq.toString()) } catch { case t: Throwable => Logger.error(s"Watch change stream of ${colName} error: ${t.getMessage}", t) } }
需要注意的是 runForeach 中需要显式捕获异常并处理,否则会导致 Source 结束并退出。
相关文章推荐
- Play Scala 开发技巧 - 请求限速
- Play For Scala 开发指南 - 第10章 MongoDB 开发
- 微服务MySQL分库分表数据到MongoDB同步方案[转]
- 移动设备非实时快速同步方案
- 利用GoldenGate实现Oracle实时同步方案
- 企业实时同步方案----Rsync+Sersync
- APP开发实战10-APP数据同步方案
- 微服务MySQL分库分表数据到MongoDB同步方案[转]
- Android开发时手机屏幕实时同步显示在电脑上
- mongodb高可用方案(二)可复制集
- rsync+inotify实时同步方案
- Solr(搜索引擎服务)和MongoDB通过mongodb-connector进行数据同步的解决方案,以及遇到的各种坑的总结(针对solr-5.3.x版本),mongodb和solr实现实时增量索引
- 企业实时同步方案----Rsync+Inotify-Tools 推荐
- 实时监控、直播流、流媒体、视频网站开发方案流媒体服务器搭建及配置详解:使用nginx搭建rtmp直播、rtmp点播、,hls直播服务配置详解
- linux下两台服务器文件实时同步方案设计和实现
- 网络游戏实时动作同步方案手记(1)
- play+mongodb+scala的项目搭建
- mongodb高可用方案(三)可复制集内部机制
- [MongoDB] 高可用架构方案
- mysql高可用探究(二)Lvs+Keepalived+Mysql单点写入主主同步高可用方案