您的位置:首页 > 理论基础 > 计算机网络

RxJava与网络相关的操作符(range/defer/retry/repeat/timer/delay/interval/BehaviorSubject/zip)

2017-02-22 18:11 597 查看
工欲善其事必先利其器。不多废话直接看重点。

操作符range

range操作符的作用Range操作符根据出入的初始值n和数目m发射一系列大于等于n的m个值。

public class MainActivity extends AppCompatActivity {

private Button btn;

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);

btn = (Button) findViewById(R.id.btn);
btn.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View view) {
rangeTest();
}
});
}

/**
* 测试range操作符
*/
private void rangeTest() {
Observable.range(23, 3).subscribe(new Subscriber<Integer>() {
public void onNext(Integer value) {
Log.e("rangeObserver", String.valueOf(value));
}

@Override
public void onCompleted() {
Log.e("rangeObserver", "onCompleted");
}

@Override
public void onError(Throwable e) {
Log.e("rangeObserver", "onError");
}
});
}
}


输出

05-17 19:58:09.629 14460-14460/com.robot.rxjavaandretrofit E/rangeObserver: 23
05-17 19:58:09.629 14460-14460/com.robot.rxjavaandretrofit E/rangeObserver: 24
05-17 19:58:09.629 14460-14460/com.robot.rxjavaandretrofit E/rangeObserver: 25
05-17 19:58:09.629 14460-14460/com.robot.rxjavaandretrofit E/rangeObserver: onCompleted


注意:range操作符的不用主动调用onNext()和onCompleted(),它里面已经实现了对Subcriber的onNext()和onCompleted()的调用。

操作符 defer

defer 是创建型的操作符,字面上有「推迟」的意思,推迟创建数据流的规则是:一开始不会马上创建 Observable,直到有订阅者订阅时才会创建,且每次都创建全新的 Observable



操作符:这个长框内有很多数据流,要表达的含义是:每次都创建全新的数据流 Observable 。

输入:图中产生了两条全新的数据流,且发送的数据可能不一样(弹珠颜色不一样)

输出:创建型的操作符基本上都没有输出的图示,根据对操作符的大概理解,为了验证输入,需要订阅两次。

实现思路:defer 在每次产生 Observable 时,都保存起来,最终验证这些数据流不会相等。

操作符repeat

repeat操作符就是对某一个Observable重复产生多次结果,当repeat() 接收到onComplete()会触发重订阅,默认情况下运行在一个新的线程上.

Observable.range(1, 5).repeat(5).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
LogUtils.d("-------->" + integer);
}
});


输出:

这里重复执行了5次,打印结果:
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->1
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->2
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->3
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->4
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->5
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->1
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->2
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->3
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->4
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->5
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->1
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->2
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->3
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->4
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->5
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->1
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->2
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->3
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->4
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->5
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->1
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->2
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->3
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->4
02-20 09:31:30.403 4824-4824/com.rxandroid.test1 D/----->: -------->5


操作符retry

retry 和 retryWhen 是错误处理型的操作符,当数据流发送了错误的数据时,将根据既定的规则发起重新订阅。



有了之前的铺垫,实现这张弹珠图并不复杂:数据流第一次发送了一个 Error 数据,retry 执行,订阅者重新发起订阅,数据流第二次发送正常的数据。具体代码实现如下:

@Test
public void retry() {

final Integer[] arrays = {0};

Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3 / arrays[0]++);
subscriber.onCompleted();
}
})
.retry()
.subscribe(mList::add);

assertEquals(mList, Arrays.asList(1, 2, 1, 2, 3));
}


操作符retryWhen

retry 只是小试牛刀,接下来看看 retryWhen。



这张图很难理解,既有错误重试,还有延时策略,实在无从下手,我们需要查阅更多的文章,幸运是刚刚 defer 篇的那位作者写了相关的另外一篇文章 RxJava’s repeatWhen and retryWhen, explained,也有相应的译文 。仔细阅读之后,梳理下 retryWhen 的套路,当错误重试需要延时策略时,实现流程大概是这样子的:



@Test
public void retryWhen_flatMap_timer() {

Observable.create(subscriber -> {
System.out.println("subscribing");
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onError(new RuntimeException("RuntimeException"));
})
.retryWhen(observable ->
observable.flatMap(
(Func1<Throwable, Observable<?>>) throwable ->
//延迟5s重新订阅
Observable.timer(5, TimeUnit.SECONDS, mTestScheduler)
)
)
.subscribe(num -> {
System.out.println(num);
mList.add(num);
});

//时间提前10s,将发生1次订阅+2次重新订阅
mTestScheduler.advanceTimeBy(10, TimeUnit.SECONDS);

assertEquals(mList, Arrays.asList(1, 2, 1, 2, 1, 2));
}


破坏数据流

如果 retryWhen 的输入 Observable ,被粗暴的直接返回一个普通的数据流,则链式结构将被打断,如下代码:

@Test
public void retryWhen_break_sequence() {

// 错误的做法:破坏数据流,打断链式结构
Observable.just(1, 2, 3)
.retryWhen(throwableObservable -> Observable.just(1, 1, 1))
.subscribe(mList::add);
//数据流被打断,订阅不到数据
assertTrue(mList.isEmpty());

// 正确的做法:至少将throwableObservable作为返回结果,此时的retryWhen()等价于retry()
Observable.just(1, 2, 3)
.retryWhen(throwableObservable -> throwableObservable).
subscribe(mList::add);
//此处的数据流不会触发error,因此正常输出1,2,3的数列
assertEquals(mList, Arrays.asList(1, 2, 3));
}


限制次数的延时错误重试

当数据流产生错误的数据时,会触发 retryWhen,并输入 Observable error 。

将 Observable error 与 Observable.range(1, 3) 做 zip 聚合,range 作为创建型的操作符,将产生 1,2,3 的数据流,因此前3次 error 将会正常配对并调用 onCompleted(),不再接收第四次的 error。

具体的代码实现如下:

@Test
public void retryWhen_zip_range_timer() {

Observable.create((Subscriber<? super Integer> subscriber) -> {
System.out.println("subscribing");
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onError(new RuntimeException("always fails"));
})
.retryWhen(observable ->
observable.zipWith(
Observable.range(1, 3),
(Func2<Throwable, Integer, Integer>) (throwable, num) -> num
)
.flatMap((Func1<Integer, Observable<?>>) num -> {
System.out.println("delay retry by " + num + " second(s)");
return Observable.timer(num, TimeUnit.SECONDS);
}))
.doOnNext(System.out::println)
.doOnCompleted(() -> System.out.println("completed"))
.toBlocking()
.forEach(mList::add);

//正常订阅一次,重新订阅3次
assertEquals(mList, Arrays.asList(1, 2, 1, 2, 1, 2, 1, 2));
}


Repeat与Retry的对比

首先,来了解一下.repeat()和.retry()之间最直观的区别是什么?这个问题并不难:区别就在于什么样的终止事件会触发重订阅。

当.repeat()接收到.onCompleted()事件后触发重订阅。 当.retry()接

到.onError()事件后触发重订阅。

然而,这种简单的叙述尚不能令人满意。试想如果你要实现一个延迟数秒的重订阅该如何去做?或者想通过观察错误来决定是否应该重订阅呢?这种情况下就需要.repeatWhen()和.retryWhen()的介入了,因为它们允许你为重试提供自定义逻辑。

Notification Handler

你可以通过一个叫做notificationHandler的函数来实现重试逻辑。这是.retryWhen()的方法签名(译者注:方法签名,指方法名称、参数类型和参数数量等):

retryWhen(Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler)


签名很长,甚至不能一口气读完。我发现它很难理解的原因是因为存在一大堆的泛型约定。

简化后,它包括三个部分:

Func1像个工厂类,用来实现你自己的重试逻辑。

输入的是一个Observable。

输出的是一个Observable

source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Throwable> errors) {

return errors.flatMap(new Func1<Throwable, Observable<?>>() {
@Override public Observable<?> call(Throwable error) {

// For IOExceptions, we  retry
if (error instanceof IOException) {
return Observable.just(null);
}

// For anything else, don't retry
return Observable.error(error);
}
});
}
})


由于每一个error都被flatmap过,因此我们不能通过直接调用.onNext(null)触发重订阅或者.onError(error)来避免重订阅。

经验之谈

这里有一些关于.repeatWhen()和.retryWhen()的要点,我们应该牢记于心。

.repeatWhen()与.retryWhen()非常相似,只不过不再响应onError作为重试条件,而是onCompleted。因为onCompleted没有类型,所有输入变为Observable。

每一次事件流的订阅notificationHandler(也就是Func1)只会调用一次。这也是讲得通的,因为你有一个可观测的Observable,它能够发送任意数量的error。

输入的Observable必须作为输出Observable的源。你必须对Observable做出反应,然后基于它发送事件;你不能只返回一个通用泛型流。

换言之就是,你不能做类似的操作:

.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Throwable> errors) {

return Observable.just(null);}
})


因为它不仅不能奏效,而且还会打断你的链式结构。你应该做的是,而且至少应该做的是,把输入作为结果返回,就像这样:

.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Throwable> errors) {

return errors;
}
})


(顺便提一下,这在逻辑上与单纯使用.retry()操作符的效果是一样哒)

输入Observable只在终止事件发生的时候才会触发(对于.repeatWhen()来说是onCompleted,而对于.retryWhen()来说是onError)。它不会从源中接收到任何onNext的通知,所以你不能通过观察被发送的事件来决定重订阅。如果你真的需要这样做,你应该添加像.takeUntil()这样的操作符,来拦截事件流。

下面到了timer()、interval()、delay()

timer()

这里说的是新版本的timer(),而老版本的timer()已经跟interval()合并了。

timer():创建一个Observable,它在一个给定的延迟后发射一个特殊的值

这里需要注意,定义里面说的是『一个』,所以有别于之前用的TimerTask。timer()只是用来创建一个Observable,并延迟发送一次的操作符,timer()并不会按周期执行。

interval()

interval():创建一个按固定时间间隔发射整数序列的Observable

这个比较好理解,interval()也是用来创建Observable的,并且也可以延迟发送。但interval()是按周期执行的,所以可以这么认为:interval()是一个可以指定线程的TimerTask(威力加强版……)

delay()

delay():延迟一段指定的时间再发送来自Observable的发送结果

语文没学好肯定读不懂这一段,我才看到这句话的时候也懵了……

其实delay()的常规使用跟timer()一致,那区别在哪呢?delay()是用于流中的操作,跟map()、flatMap()的级别是一样的。而timer()是用于创建Observable,跟just()、from()的级别是一样的。

总结

timer():用于创建Observable,延迟发送一次。

interval():用于创建Observable,跟TimerTask类似,用于周期性发送。

delay():用于事件流中,可以延迟发送事件流中的某一次发送。

操作符BehaviorSubject

BehaviorSubject会发送离订阅最近的上一个值,没有上一个值的时候会发送默认值。看图



如果遇到错误会直接中断



public void testMethodD() {
//不同的订阅时机结果不一样哦
BehaviorSubject bs = BehaviorSubject.create(-1);
bs.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer o) {
Log.i(TAG, "call: num:" + o);
}
});
// 这里订阅回调-1, 1, 2, 3
bs.onNext(1);
// 这里订阅回调1, 2, 3
bs.onNext(2);
// 这里订阅回调2, 3
bs.onNext(3);
// 这里订阅回调3
bs.onCompleted();
// 这里订阅没回调
}


02-23 17:30:05.186 18787-18787/com.rengwuxian.rxjavasamples I/TokenAdvancedFragment: call: num:-1
02-23 17:30:05.186 18787-18787/com.rengwuxian.rxjavasamples I/TokenAdvancedFragment: call: num:1
02-23 17:30:05.186 18787-18787/com.rengwuxian.rxjavasamples I/TokenAdvancedFragment: call: num:2
02-23 17:30:05.186 18787-18787/com.rengwuxian.rxjavasamples I/TokenAdvancedFragment: call: num:3


操作符zip

通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。



Zip操作符返回一个Obversable,它使用这个函数按顺序结合两个或多个Observables发射的数据项,然后它发射这个函数返回的结果。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。

zip的最后一个参数接受每个Observable发射的一项数据,返回被压缩后的数据,它可以接受一到九个参数:一个Observable序列,或者一些发射Observable的Observables。

Javadoc: zip(Iterable,FuncN))

Javadoc: zip(Observable,FuncN))

Javadoc: zip(Observable,Observable,Func2)) (最多可以有九个Observables参数)

Observable<Integer> observable1 = Observable.just(10,20,30);
Observable<Integer> observable2 = Observable.just(4, 8, 12, 16);
Observable.zip(observable1, observable2, new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}

@Override
public void onNext(Integer value) {
System.out.println("Next:" + value);
}
});


运行结果如下:

Next:14

Next:28

Next:42

Sequence complete.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: