RxJava与网络相关的操作符(range/defer/retry/repeat/timer/delay/interval/BehaviorSubject/zip)
2017-02-22 18:11
597 查看
工欲善其事必先利其器。不多废话直接看重点。
输出
注意:range操作符的不用主动调用onNext()和onCompleted(),它里面已经实现了对Subcriber的onNext()和onCompleted()的调用。
![](https://oscdn.geek-share.com/Uploads/Images/Content/202001/03/06aeb3fd47815f8a0ed0882e35415194)
操作符:这个长框内有很多数据流,要表达的含义是:每次都创建全新的数据流 Observable 。
输入:图中产生了两条全新的数据流,且发送的数据可能不一样(弹珠颜色不一样)
输出:创建型的操作符基本上都没有输出的图示,根据对操作符的大概理解,为了验证输入,需要订阅两次。
实现思路:defer 在每次产生 Observable 时,都保存起来,最终验证这些数据流不会相等。
输出:
![](https://oscdn.geek-share.com/Uploads/Images/Content/202001/03/fd25d96da3fd04576f7bb272b1b6e5b1)
有了之前的铺垫,实现这张弹珠图并不复杂:数据流第一次发送了一个 Error 数据,retry 执行,订阅者重新发起订阅,数据流第二次发送正常的数据。具体代码实现如下:
![](https://oscdn.geek-share.com/Uploads/Images/Content/202001/03/c28ea179430252410ace8115782fd825)
这张图很难理解,既有错误重试,还有延时策略,实在无从下手,我们需要查阅更多的文章,幸运是刚刚 defer 篇的那位作者写了相关的另外一篇文章 RxJava’s repeatWhen and retryWhen, explained,也有相应的译文 。仔细阅读之后,梳理下 retryWhen 的套路,当错误重试需要延时策略时,实现流程大概是这样子的:
![](https://oscdn.geek-share.com/Uploads/Images/Content/202001/03/fea5333eca2c2bfbe717989196171825)
将 Observable error 与 Observable.range(1, 3) 做 zip 聚合,range 作为创建型的操作符,将产生 1,2,3 的数据流,因此前3次 error 将会正常配对并调用 onCompleted(),不再接收第四次的 error。
具体的代码实现如下:
Repeat与Retry的对比
首先,来了解一下.repeat()和.retry()之间最直观的区别是什么?这个问题并不难:区别就在于什么样的终止事件会触发重订阅。
当.repeat()接收到.onCompleted()事件后触发重订阅。 当.retry()接
到.onError()事件后触发重订阅。
然而,这种简单的叙述尚不能令人满意。试想如果你要实现一个延迟数秒的重订阅该如何去做?或者想通过观察错误来决定是否应该重订阅呢?这种情况下就需要.repeatWhen()和.retryWhen()的介入了,因为它们允许你为重试提供自定义逻辑。
Notification Handler
你可以通过一个叫做notificationHandler的函数来实现重试逻辑。这是.retryWhen()的方法签名(译者注:方法签名,指方法名称、参数类型和参数数量等):
签名很长,甚至不能一口气读完。我发现它很难理解的原因是因为存在一大堆的泛型约定。
简化后,它包括三个部分:
Func1像个工厂类,用来实现你自己的重试逻辑。
输入的是一个Observable。
输出的是一个Observable
由于每一个error都被flatmap过,因此我们不能通过直接调用.onNext(null)触发重订阅或者.onError(error)来避免重订阅。
经验之谈
这里有一些关于.repeatWhen()和.retryWhen()的要点,我们应该牢记于心。
.repeatWhen()与.retryWhen()非常相似,只不过不再响应onError作为重试条件,而是onCompleted。因为onCompleted没有类型,所有输入变为Observable。
每一次事件流的订阅notificationHandler(也就是Func1)只会调用一次。这也是讲得通的,因为你有一个可观测的Observable,它能够发送任意数量的error。
输入的Observable必须作为输出Observable的源。你必须对Observable做出反应,然后基于它发送事件;你不能只返回一个通用泛型流。
换言之就是,你不能做类似的操作:
因为它不仅不能奏效,而且还会打断你的链式结构。你应该做的是,而且至少应该做的是,把输入作为结果返回,就像这样:
(顺便提一下,这在逻辑上与单纯使用.retry()操作符的效果是一样哒)
输入Observable只在终止事件发生的时候才会触发(对于.repeatWhen()来说是onCompleted,而对于.retryWhen()来说是onError)。它不会从源中接收到任何onNext的通知,所以你不能通过观察被发送的事件来决定重订阅。如果你真的需要这样做,你应该添加像.takeUntil()这样的操作符,来拦截事件流。
timer():创建一个Observable,它在一个给定的延迟后发射一个特殊的值
这里需要注意,定义里面说的是『一个』,所以有别于之前用的TimerTask。timer()只是用来创建一个Observable,并延迟发送一次的操作符,timer()并不会按周期执行。
这个比较好理解,interval()也是用来创建Observable的,并且也可以延迟发送。但interval()是按周期执行的,所以可以这么认为:interval()是一个可以指定线程的TimerTask(威力加强版……)
语文没学好肯定读不懂这一段,我才看到这句话的时候也懵了……
其实delay()的常规使用跟timer()一致,那区别在哪呢?delay()是用于流中的操作,跟map()、flatMap()的级别是一样的。而timer()是用于创建Observable,跟just()、from()的级别是一样的。
interval():用于创建Observable,跟TimerTask类似,用于周期性发送。
delay():用于事件流中,可以延迟发送事件流中的某一次发送。
![](https://oscdn.geek-share.com/Uploads/Images/Content/202001/03/990692e359d30cff782c4adc4627569f.png)
如果遇到错误会直接中断
![](https://oscdn.geek-share.com/Uploads/Images/Content/202001/03/3299bd5088b0c38d32555955e842a8de.png)
![](https://oscdn.geek-share.com/Uploads/Images/Content/202001/03/8f3f6520322ba3279625cdcc621b2aab.png)
Zip操作符返回一个Obversable,它使用这个函数按顺序结合两个或多个Observables发射的数据项,然后它发射这个函数返回的结果。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。
zip的最后一个参数接受每个Observable发射的一项数据,返回被压缩后的数据,它可以接受一到九个参数:一个Observable序列,或者一些发射Observable的Observables。
Javadoc: zip(Iterable,FuncN))
Javadoc: zip(Observable,FuncN))
Javadoc: zip(Observable,Observable,Func2)) (最多可以有九个Observables参数)
运行结果如下:
Next:14
Next:28
Next:42
Sequence complete.
操作符range
range操作符的作用Range操作符根据出入的初始值n和数目m发射一系列大于等于n的m个值。public class MainActivity extends AppCompatActivity { private Button btn; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); btn = (Button) findViewById(R.id.btn); btn.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View view) { rangeTest(); } }); } /** * 测试range操作符 */ private void rangeTest() { Observable.range(23, 3).subscribe(new Subscriber<Integer>() { public void onNext(Integer value) { Log.e("rangeObserver", String.valueOf(value)); } @Override public void onCompleted() { Log.e("rangeObserver", "onCompleted"); } @Override public void onError(Throwable e) { Log.e("rangeObserver", "onError"); } }); } }
输出
05-17 19:58:09.629 14460-14460/com.robot.rxjavaandretrofit E/rangeObserver: 23 05-17 19:58:09.629 14460-14460/com.robot.rxjavaandretrofit E/rangeObserver: 24 05-17 19:58:09.629 14460-14460/com.robot.rxjavaandretrofit E/rangeObserver: 25 05-17 19:58:09.629 14460-14460/com.robot.rxjavaandretrofit E/rangeObserver: onCompleted
注意:range操作符的不用主动调用onNext()和onCompleted(),它里面已经实现了对Subcriber的onNext()和onCompleted()的调用。
操作符 defer
defer 是创建型的操作符,字面上有「推迟」的意思,推迟创建数据流的规则是:一开始不会马上创建 Observable,直到有订阅者订阅时才会创建,且每次都创建全新的 Observable。操作符:这个长框内有很多数据流,要表达的含义是:每次都创建全新的数据流 Observable 。
输入:图中产生了两条全新的数据流,且发送的数据可能不一样(弹珠颜色不一样)
输出:创建型的操作符基本上都没有输出的图示,根据对操作符的大概理解,为了验证输入,需要订阅两次。
实现思路:defer 在每次产生 Observable 时,都保存起来,最终验证这些数据流不会相等。
操作符repeat
repeat操作符就是对某一个Observable重复产生多次结果,当repeat() 接收到onComplete()会触发重订阅,默认情况下运行在一个新的线程上.Observable.range(1, 5).repeat(5).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { LogUtils.d("-------->" + integer); } });
输出:
这里重复执行了5次,打印结果: 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->1 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->2 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->3 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->4 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->5 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->1 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->2 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->3 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->4 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->5 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->1 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->2 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->3 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->4 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->5 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->1 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->2 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->3 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->4 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->5 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->1 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->2 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->3 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->4 02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->5
操作符retry
retry 和 retryWhen 是错误处理型的操作符,当数据流发送了错误的数据时,将根据既定的规则发起重新订阅。有了之前的铺垫,实现这张弹珠图并不复杂:数据流第一次发送了一个 Error 数据,retry 执行,订阅者重新发起订阅,数据流第二次发送正常的数据。具体代码实现如下:
@Test public void retry() { final Integer[] arrays = {0}; Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3 / arrays[0]++); subscriber.onCompleted(); } }) .retry() .subscribe(mList::add); assertEquals(mList, Arrays.asList(1, 2, 1, 2, 3)); }
操作符retryWhen
retry 只是小试牛刀,接下来看看 retryWhen。这张图很难理解,既有错误重试,还有延时策略,实在无从下手,我们需要查阅更多的文章,幸运是刚刚 defer 篇的那位作者写了相关的另外一篇文章 RxJava’s repeatWhen and retryWhen, explained,也有相应的译文 。仔细阅读之后,梳理下 retryWhen 的套路,当错误重试需要延时策略时,实现流程大概是这样子的:
@Test public void retryWhen_flatMap_timer() { Observable.create(subscriber -> { System.out.println("subscribing"); subscriber.onNext(1); subscriber.onNext(2); subscriber.onError(new RuntimeException("RuntimeException")); }) .retryWhen(observable -> observable.flatMap( (Func1<Throwable, Observable<?>>) throwable -> //延迟5s重新订阅 Observable.timer(5, TimeUnit.SECONDS, mTestScheduler) ) ) .subscribe(num -> { System.out.println(num); mList.add(num); }); //时间提前10s,将发生1次订阅+2次重新订阅 mTestScheduler.advanceTimeBy(10, TimeUnit.SECONDS); assertEquals(mList, Arrays.asList(1, 2, 1, 2, 1, 2)); }
破坏数据流
如果 retryWhen 的输入 Observable ,被粗暴的直接返回一个普通的数据流,则链式结构将被打断,如下代码:@Test public void retryWhen_break_sequence() { // 错误的做法:破坏数据流,打断链式结构 Observable.just(1, 2, 3) .retryWhen(throwableObservable -> Observable.just(1, 1, 1)) .subscribe(mList::add); //数据流被打断,订阅不到数据 assertTrue(mList.isEmpty()); // 正确的做法:至少将throwableObservable作为返回结果,此时的retryWhen()等价于retry() Observable.just(1, 2, 3) .retryWhen(throwableObservable -> throwableObservable). subscribe(mList::add); //此处的数据流不会触发error,因此正常输出1,2,3的数列 assertEquals(mList, Arrays.asList(1, 2, 3)); }
限制次数的延时错误重试
当数据流产生错误的数据时,会触发 retryWhen,并输入 Observable error 。将 Observable error 与 Observable.range(1, 3) 做 zip 聚合,range 作为创建型的操作符,将产生 1,2,3 的数据流,因此前3次 error 将会正常配对并调用 onCompleted(),不再接收第四次的 error。
具体的代码实现如下:
@Test public void retryWhen_zip_range_timer() { Observable.create((Subscriber<? super Integer> subscriber) -> { System.out.println("subscribing"); subscriber.onNext(1); subscriber.onNext(2); subscriber.onError(new RuntimeException("always fails")); }) .retryWhen(observable -> observable.zipWith( Observable.range(1, 3), (Func2<Throwable, Integer, Integer>) (throwable, num) -> num ) .flatMap((Func1<Integer, Observable<?>>) num -> { System.out.println("delay retry by " + num + " second(s)"); return Observable.timer(num, TimeUnit.SECONDS); })) .doOnNext(System.out::println) .doOnCompleted(() -> System.out.println("completed")) .toBlocking() .forEach(mList::add); //正常订阅一次,重新订阅3次 assertEquals(mList, Arrays.asList(1, 2, 1, 2, 1, 2, 1, 2)); }
Repeat与Retry的对比
首先,来了解一下.repeat()和.retry()之间最直观的区别是什么?这个问题并不难:区别就在于什么样的终止事件会触发重订阅。
当.repeat()接收到.onCompleted()事件后触发重订阅。 当.retry()接
到.onError()事件后触发重订阅。
然而,这种简单的叙述尚不能令人满意。试想如果你要实现一个延迟数秒的重订阅该如何去做?或者想通过观察错误来决定是否应该重订阅呢?这种情况下就需要.repeatWhen()和.retryWhen()的介入了,因为它们允许你为重试提供自定义逻辑。
Notification Handler
你可以通过一个叫做notificationHandler的函数来实现重试逻辑。这是.retryWhen()的方法签名(译者注:方法签名,指方法名称、参数类型和参数数量等):
retryWhen(Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler)
签名很长,甚至不能一口气读完。我发现它很难理解的原因是因为存在一大堆的泛型约定。
简化后,它包括三个部分:
Func1像个工厂类,用来实现你自己的重试逻辑。
输入的是一个Observable。
输出的是一个Observable
source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() { @Override public Observable<?> call(Observable<? extends Throwable> errors) { return errors.flatMap(new Func1<Throwable, Observable<?>>() { @Override public Observable<?> call(Throwable error) { // For IOExceptions, we retry if (error instanceof IOException) { return Observable.just(null); } // For anything else, don't retry return Observable.error(error); } }); } })
由于每一个error都被flatmap过,因此我们不能通过直接调用.onNext(null)触发重订阅或者.onError(error)来避免重订阅。
经验之谈
这里有一些关于.repeatWhen()和.retryWhen()的要点,我们应该牢记于心。
.repeatWhen()与.retryWhen()非常相似,只不过不再响应onError作为重试条件,而是onCompleted。因为onCompleted没有类型,所有输入变为Observable。
每一次事件流的订阅notificationHandler(也就是Func1)只会调用一次。这也是讲得通的,因为你有一个可观测的Observable,它能够发送任意数量的error。
输入的Observable必须作为输出Observable的源。你必须对Observable做出反应,然后基于它发送事件;你不能只返回一个通用泛型流。
换言之就是,你不能做类似的操作:
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() { @Override public Observable<?> call(Observable<? extends Throwable> errors) { return Observable.just(null);} })
因为它不仅不能奏效,而且还会打断你的链式结构。你应该做的是,而且至少应该做的是,把输入作为结果返回,就像这样:
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() { @Override public Observable<?> call(Observable<? extends Throwable> errors) { return errors; } })
(顺便提一下,这在逻辑上与单纯使用.retry()操作符的效果是一样哒)
输入Observable只在终止事件发生的时候才会触发(对于.repeatWhen()来说是onCompleted,而对于.retryWhen()来说是onError)。它不会从源中接收到任何onNext的通知,所以你不能通过观察被发送的事件来决定重订阅。如果你真的需要这样做,你应该添加像.takeUntil()这样的操作符,来拦截事件流。
下面到了timer()、interval()、delay()
timer()
这里说的是新版本的timer(),而老版本的timer()已经跟interval()合并了。timer():创建一个Observable,它在一个给定的延迟后发射一个特殊的值
这里需要注意,定义里面说的是『一个』,所以有别于之前用的TimerTask。timer()只是用来创建一个Observable,并延迟发送一次的操作符,timer()并不会按周期执行。
interval()
interval():创建一个按固定时间间隔发射整数序列的Observable这个比较好理解,interval()也是用来创建Observable的,并且也可以延迟发送。但interval()是按周期执行的,所以可以这么认为:interval()是一个可以指定线程的TimerTask(威力加强版……)
delay()
delay():延迟一段指定的时间再发送来自Observable的发送结果语文没学好肯定读不懂这一段,我才看到这句话的时候也懵了……
其实delay()的常规使用跟timer()一致,那区别在哪呢?delay()是用于流中的操作,跟map()、flatMap()的级别是一样的。而timer()是用于创建Observable,跟just()、from()的级别是一样的。
总结
timer():用于创建Observable,延迟发送一次。interval():用于创建Observable,跟TimerTask类似,用于周期性发送。
delay():用于事件流中,可以延迟发送事件流中的某一次发送。
操作符BehaviorSubject
BehaviorSubject会发送离订阅最近的上一个值,没有上一个值的时候会发送默认值。看图![](https://oscdn.geek-share.com/Uploads/Images/Content/202001/03/990692e359d30cff782c4adc4627569f.png)
如果遇到错误会直接中断
![](https://oscdn.geek-share.com/Uploads/Images/Content/202001/03/3299bd5088b0c38d32555955e842a8de.png)
public void testMethodD() { //不同的订阅时机结果不一样哦 BehaviorSubject bs = BehaviorSubject.create(-1); bs.subscribe( new Action1<Integer>() { @Override public void call(Integer o) { Log.i(TAG, "call: num:" + o); } }); // 这里订阅回调-1, 1, 2, 3 bs.onNext(1); // 这里订阅回调1, 2, 3 bs.onNext(2); // 这里订阅回调2, 3 bs.onNext(3); // 这里订阅回调3 bs.onCompleted(); // 这里订阅没回调 }
02-23 17:30:05.186 18787-18787/com.rengwuxian.rxjavasamples I/TokenAdvancedFragment: call: num:-1 02-23 17:30:05.186 18787-18787/com.rengwuxian.rxjavasamples I/TokenAdvancedFragment: call: num:1 02-23 17:30:05.186 18787-18787/com.rengwuxian.rxjavasamples I/TokenAdvancedFragment: call: num:2 02-23 17:30:05.186 18787-18787/com.rengwuxian.rxjavasamples I/TokenAdvancedFragment: call: num:3
操作符zip
通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。![](https://oscdn.geek-share.com/Uploads/Images/Content/202001/03/8f3f6520322ba3279625cdcc621b2aab.png)
Zip操作符返回一个Obversable,它使用这个函数按顺序结合两个或多个Observables发射的数据项,然后它发射这个函数返回的结果。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。
zip的最后一个参数接受每个Observable发射的一项数据,返回被压缩后的数据,它可以接受一到九个参数:一个Observable序列,或者一些发射Observable的Observables。
Javadoc: zip(Iterable,FuncN))
Javadoc: zip(Observable,FuncN))
Javadoc: zip(Observable,Observable,Func2)) (最多可以有九个Observables参数)
Observable<Integer> observable1 = Observable.just(10,20,30); Observable<Integer> observable2 = Observable.just(4, 8, 12, 16); Observable.zip(observable1, observable2, new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) { return integer + integer2; } }).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("Sequence complete."); } @Override public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); } @Override public void onNext(Integer value) { System.out.println("Next:" + value); } });
运行结果如下:
Next:14
Next:28
Next:42
Sequence complete.
相关文章推荐
- RxJava处理网络连接失败和timer()、interval()、delay()之间的区别
- Android函数响应式编程——必学的RxJava创建操作符create、just、from、interval、range、repeat
- RxJava处理网络连接失败和timer()、interval()、delay()之间的区别
- RxJava处理网络连接失败和timer()、interval()、delay()之间的区别
- RxJava处理网络连接失败和timer()、interval()、delay()之间的区别
- RxJava【创建】操作符 create just from defer timer interval
- RxJava 创建操作符 timer与interval
- RxJava 创建操作符 timer与interval
- RxJava操作符repeatWhen()和retryWhen()
- RxJava学习篇之三:操作符的讲解(2)-Retry-Repeat
- Rxjava操作符(defer,compose,retryWhen)
- Rxjava1 timer(), delay(), interval()区别
- RX操作符之Observable的创建方式二(defer、range、interval、timer、Empty、Never、Throw)
- RX操作符之Observable的创建方式二(defer、range、interval、timer、Empty、Never、Throw)
- RxJava操作符repeatWhen()和retryWhen()
- RX系列三 | RxJava | create | from | interval | just | range | filter
- RxJava2 / RxAndroid2操作符interval:每隔若干时间发射信号
- Rxjava操作符zip
- 7.1Creating 创建操作 - Create/Defer/From/Just/Start/Repeat/Range
- RxJava操作符相关学习资料