您的位置:首页 > 编程语言 > Java开发

对RxJava中Backpressure概念的理解

2017-04-17 00:00 302 查看
英文原文地址请点击这里

一. Backpressure(背压、反压力)

在rxjava中会经常遇到一种情况就是被观察者发送消息太快以至于它的操作符或者订阅者不能及时处理相关的消息。那么随之而来的就是如何处理这些未处理的消息。

举个例子,使用 zip 操作符将两个无限大的Observable压缩在一起,其中一个被观察者发送消息的速度是另一个的两倍。一个比较不靠谱的做法就是把发送比较快的消息缓存起来,当比较慢的Observable发送消息的时候取出来并将他们结合在一起。这样做就使得rxjava变得笨重而且十分占用系统资源。

在rxjava中有多重控制流以及背压(backpressure)策略用来应对当一个快速发送消息的被观察者遇到一个处理消息缓慢的观察者。下面的解释将会向你展示你应当怎么设计属于你自己的被观察者和操作符去应对流量控制(flow control)。

二. Hot and cold Observables, and multicasted Observables

Observable 数据流有两种类型:hot 和 cold。这两种类型有很大的不同。本节介绍他们的区别,以及作为开发者应该如何正确的使用他们。

1. Cold observables

只有当有订阅者订阅的时候, Cold Observable 才开始执行发射数据流的代码。并且每个订阅者订阅的时候都独立的执行一遍数据流代码。 Observable.interval 就是一个 Cold Observable。每一个订阅者都会独立的收到他们的数据流。

我们经常用到的Observable.create 就是 Cold Observable,而 just, range, timer 和 from 这些创建的同样是 Cold Observable。

2. Hot observables

Hot observable 不管有没有订阅者订阅,他们创建后就开发发射数据流。 一个比较好的示例就是鼠标事件。 不管系统有没有订阅者监听鼠标事件,鼠标事件一直在发生,当有订阅者订阅后,从订阅后的事件开始发送给这个订阅者,之前的事件这个订阅者是接受不到的;如果订阅者取消订阅了,鼠标事件依然继续发射。

了解更多Hot and cold Observables,参考这里

当一个cold observable是multicast(多路广播)(当转换完成时或者方法被调用)的时候,为了应对背压,应当把cold observable转换成hot observable。

cold observable 相当于响应式拉(就是observer处理完了一个事件就从observable拉取下一个事件),hot observable通常不能很好的处理响应式拉模型,但它却是处理流量控制问题的不二候选人,例如使用onBackpressureBuffer或者onBackpressureDrop 操作符,和其他操作符比operations, throttling, buffers, or windows。

三. 能从某种意义上实现背压的运算符

防止过度创建observable的第一道防线就是使用普通数组去减少observable发送消息的数量,在这一节会使用一些操作符去应对突发的observable发送爆发性数据(一会没有,一会很多)就像下面的这张图片所示:



这些操作符可以通过微调参数确保slow-consuming观察者不被生产可观测的。

1. Throttling节流

操作符中比如 sample( ) 、 throttleLast( )、 throttleFirst( )、 throttleWithTimeout( ) 、 debounce( ) 允许你通过调节速率来改变Observable发射消息的速度。

以下图表展示如何使用这些操作符。

(1). sample (或 throttleLast)

sample 操作符定期收集observable发送的数据items,并发射出最后一个数据item。

Observable<Integer> burstySampled = bursty.sample(500, TimeUnit.MILLISECONDS);


上面代码解释,定期且一次收集5个item,发射出最后一个item。

(2). throttleFirst

跟sample有点类似,但是并不是把观测到的最后一个item发送出去,而是把该时间段第一个item发送出去。



Observable<Integer> burstyThrottled = bursty.throttleFirst(500, TimeUnit.MILLISECONDS);


(3). debounce (or throttleWithTimeout)

debounce操作符会只发送两个在规定间隔内的时间发送的序列的最后一个。



Observable<Integer> burstyDebounced = bursty.debounce(10, TimeUnit.MILLISECONDS);


(4). Buffers and windows 缓冲区和窗口

可以使用操作符比如buffer( ) 或者window( ) 收集过度生成消息的Observable的数据项,然后发射出较少使用的数据。缓慢的消费者可以决定是否处理每个集合中的某一个特定的项目,或处理集合中的某种组合,或为集合中的每一项预定计划工作,这都要视情况处理。

以下图表展示如何使用这些操作符。

buffer

你可以定期关闭并释放突发性的 Observable 缓冲区。



Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);


在突发期间你可以得到的想要的,并在缓冲区收集数据和最终在突发结束的时候释放缓存。使用debounce操作符释放缓存并关闭指示器buffer操作符。

此段超过本人翻译水平,特提供原文如下,如有好的翻译建议请提出。
Or you could get fancy, and collect items in buffers during the bursty periods and emit them at the end of each burst, by using the debounce operator to emit a buffer closing indicator to the buffer operator:

// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);


window

与buffer类似,在一个window转换中允许你发送一个周期性的生成消息的Observable的数据项窗口:



Observable<Observable<Integer>> burstyWindowed = bursty.window(500, TimeUnit.MILLISECONDS);

当你每次从源被观察者收集了特定数量的数据项后也可以选择重新发送一个新的window:



Observable<Observable<Integer>> burstyWindowed = bursty.window(5);


四. 使用线程阻塞

处理过快生产item的其他策略就是使用线程阻塞,但是这么做违背了响应式设计和非阻塞模型设计,但是它的确是一个可行的选择。在rxJava中并没有操作符可以做到这一点。

如果observable发送消息,subscriber消耗消息都是在同一个线程这将很好的处理这个问题,但是你要知道,在rxJava中,很多时候生产者和消费者都不在同一个线程。

五. 如何建立“响应式拉动(reactive pull)”backpressure

当subscribe订阅observable的时候可以通过调用subscribe.request(n),n是你想要的observable发送出来的量。

当在onNext()方法里处理完某个数据项(或一些数据项)后,你能重新调用 request()方法,通知Observable发射数据项。下面是个例子:

someObservable.subscribe(new Subscriber<t>() {
@Override
public void onStart() {
request(1);
}

@Override
public void onCompleted() {
// gracefully handle sequence-complete
}

@Override
public void onError(Throwable e) {
// gracefully handle error
}

@Override
public void onNext(t n) {
// do something with the emitted item "n"
// request another item:
request(1);
}
});


你可以通过一个神奇数字request, request(Long.MAX_VALUE),禁用反应拉背力和要求Observable按照自己的步伐发射数据。request(0)是一个合法的调用,但不会奏效。请求值小于零的请求会导致抛出一个异常。

1. Reactive pull backpressure isn’t magic

backpressure 不会使得过度生产的observable的问题消失,这只是提供了一种更好的解决问题的方法。 让我们更仔细的研究刚刚说到的zip操作符的问题。

这里有两个observable,a和b,b发射item比a更加的频繁,当你想zip这两个observable的时候,你需要把a发送出来的第n个和b发送出来的第n个对象处理,然而由于b发送出来的速率更快,这时候b已经发送出了n+1~n+m个消息了,这时候你要想要把a的n+1~n+m个消息结合的话,就必须持有b已经发送出来的n+1~n+m消息,同时,这意味着缓存的数量在不断的增长。

当然你可以给b添加操作符throttling,但是这意味着你将丢失某些从b发送出来的项,你真正想要做的其实就是告诉b:“b你需要慢下来,但是你要保持你给我的数据是完整的”。

响应式拉(reective pull)模型可以帮你做到这一点,subscriber从observable那里拉取数据,这与通常情况下从observable那里推送数据这种模式相比形成鲜明的对比。

在rxJava中,zip操作符正是使用了这种技巧。它给每个源observable维护了一个小的缓存池,当它的缓存池满了以后,它将不会从源observable那里拉取item。每当zip发送一个item的时候,他从它的缓存池里面移除相应的项,并从源observable那里拉取下一个项。

在rxJava中,很多操作符都使用了这种模式(响应式拉),但是有的操作符并没有使用这种模式,因为他们也许执行的操作跟源observable处于相同的进程。在这种情况下,由于消耗事件会阻塞本进程,所以这一项的工作完成后,才有机会收到下一项。还有另外一种情况,backpressure也是不适合的,因为他们有指定的其他方式去处理流量控制,这些特殊的情况在rxJava的javadocs里面都会有详细说明。

但是,observable a和b必须正确的响应request()方法,如果一个observable还没有被支持响应式拉(并不是每个observable都会支持),你可以采取以下其中一种操作都可以达到backpressure的行为:

(1). onBackpressurebuffer

为所有从源observable发送出来的数据项维护一个缓存区,根据他们生成的request发送给下层流。



这个操作符还有一个实验性的版本允许去设置这个缓存池的大小,但当缓存池满了以后将会终止执行并抛出异常。

(2). onBackpressureDrop

终止发送来自源observable的事件,除非来自下层流的subscriber即将调用request(n)方法的时候,此时才会发送足够的数据项给以满足requst。



(3). onBackpressureBlock (实验性的, 不支持RxJava 1.0)

阻塞源Observable正在操作的线程的线程直到某个Subscriber发出请求,然后只要有即将发出的请求就结束阻塞。



如果你不允许这些操作符操作用在一个不支持背压的Observable上,并且 如果作为Subscriber的你或者在你和Observable之间的一些操作符尝试去应用响应式拉背压,你将会在onError回调事件中遭遇 MissBackpresssureException的警告。

六. Flowable与Observable

最后,为了大家更好的理解backpressure概念,这里补充说一下Flowable。

Observable在RxJava2.0中新的实现叫做Flowable, 同时旧的Observable也保留了。因为在 RxJava1.x 中,有很多事件不被能正确的背压,从而抛出MissingBackpressureException。

举个简单的例子,在 RxJava1.x 中的 observeOn, 因为是切换了消费者的线程,因此内部实现用队列存储事件。在 Android 中默认的 buffersize 大小是16,因此当消费比生产慢时, 队列中的数目积累到超过16个,就会抛出MissingBackpressureException, 初学者很难明白为什么会这样,使得学习曲线异常得陡峭。

而在2.0 中,Observable 不再支持背压,而Flowable 支持非阻塞式的背压。Flowable是RxJava2.0中专门用于应对背压(Backpressure)问题。所谓背压,即生产者的速度大于消费者的速度带来的问题,比如在Android中常见的点击事件,点击过快则经常会造成点击两次的效果。其中,Flowable默认队列大小为128。并且规范要求,所有的操作符强制支持背压。幸运的是, Flowable 中的操作符大多与旧有的 Observable 类似。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息