您的位置:首页 > 编程语言 > Java开发

RxJava学习篇之三:操作符的讲解(2)-Retry-Repeat

2016-12-08 11:09 411 查看
今天RxJava操作的符的主角就是Repeat和Retry

Repeat:创建一个发射特定数据重复多次的Observable

1,repeat有四个重载:

repeat()  无限重复发射原始的Observable

repeat(long) 指定重复发射的次数 

repeat(Scheduler) 指定重复发射线程,repeat默认在trampoline上执行.

repeat(long,Scheduler) 指定重复发射的次数和线程。

例子:
private void testRepeat() {
Observable.just(1, 2, 3)
.repeat(3,Schedulers.io())
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
Log.d("TAG", "onNext:" + item);
}

@Override
public void onError(Throwable error) {
Log.d("TAG", "onErr:" + error.getMessage());
}

@Override
public void onCompleted() {
Log.d("TAG", "Sequence complete.");
}
});
}
输出:
12-08 03:40:31.431 7083-7101/? D/TAG: onNext:1
12-08 03:40:31.431 7083-7101/? D/TAG: onNext:2
12-08 03:40:31.431 7083-7101/? D/TAG: onNext:3
12-08 03:40:31.439 7083-7101/? D/TAG: onNext:1
12-08 03:40:31.439 7083-7101/? D/TAG: onNext:2
12-08 03:40:31.439 7083-7101/? D/TAG: onNext:3
12-08 03:40:31.439 7083-7101/? D/TAG: onNext:1
12-08 03:40:31.440 7083-7101/? D/TAG: onNext:2
12-08 03:40:31.440 7083-7101/? D/TAG: onNext:3
12-08 03:40:31.440 7083-7101/? D/TAG: Sequence complete.

2,repeatWhen有2个重载

repeatWhen(final Func1<? super Observable<? extends Void>, ? extends Observable<?>> notificationHandler) 不指定线程

repeatWhen(final Func1<? super Observable<? extends Void>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) 指定线程

的Observable。

由于repeatWhen不repeat操作符更加重用,这里我详细说一下这个操作符:

1,Func1一个工厂类,第一个泛型是输入的,第二个是输出的泛型

2,[可选]指定执行的线程,默认trampoline上执行

注意:

a)将原始Observable的Complete通知当做一个 void  数据传递给一个通知处理器,它以此来决定是否要重新订阅和发射原来的Observable。这个通知处理器就像一个Observable操作符,接受一个发射 void  通知的Observable为输入,被返回的
Observable<?>
所要发送的事件决定了重订阅是否会发生。如果发送的是
onCompleted
或者
onError
事件,将不会触发重订阅。相对的,如果它发送
onNext
事件,则触发重订阅(不管
onNext
实际上是什么事件)。这就是为什么使用了通配符作为泛型类型:这仅仅是个通知(next,
error或者completed),一个很重要的通知而已。

b)当订阅发生的时候,工厂
Func1
被调用,而且只会被调用一次,从而准备重试逻辑。那样的话,当
onComplete
被调用后,你已经定义的重试逻辑就能够处理它了。

那上面的是什么意思呢?

private void testRepeatWhen() {
Observable.just(1, 2)
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
Log.d("TAG", "repeat Func1");
return observable.flatMap(new Func1<Void, Observable<?>>() {
@Override
public Observable<?> call(Void aVoid) {
return Observable.error(new Exception());
}
});
}
}).subscribe(new Subscriber<Object>() {
@Override
public void onNext(Object item) {
Log.d("TAG", "onNext:" + item);
}

@Override
public void onError(Throwable error) {
Log.d("TAG", "onErr:" + error.getMessage());
}

@Override
public void onCompleted() {
Log.d("TAG", "Sequence complete.");
}
});
}
输出:
12-08 05:41:44.994 9413-9413/tbw.eage.rxjava D/TAG: repeat Func1
12-08 05:41:44.997 9413-9413/tbw.eage.rxjava D/TAG: onNext:1
12-08 05:41:44.997 9413-9413/tbw.eage.rxjava D/TAG: onNext:2
12-08 05:41:45.005 9413-9413/tbw.eage.rxjava D/TAG: onErr:null
由于返回的Observable.err()并没有发生重新订阅操作,并且执行onError()方法。
再看看返回Observable.empty().

private void testRepeatWhen() {
Observable.just(1, 2)
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
Log.d("TAG", "repeat Func1");
return observable.flatMap(new Func1<Void, Observable<?>>() {
@Override
public Observable<?> call(Void aVoid) {
return Observable.empty();
}
});
}
}).subscribe(new Subscriber<Object>() {
@Override
public void onNext(Object item) {
Log.d("TAG", "onNext:" + item);
}

@Override
public void onError(Throwable error) {
Log.d("TAG", "onErr:" + error.getMessage());
}

@Override
public void onCompleted() {
Log.d("TAG", "Sequence complete.");
}
});
}
输出:
12-08 05:47:52.679 9413-9413/tbw.eage.rxjava D/TAG: repeat Func1
12-08 05:47:52.679 9413-9413/tbw.eage.rxjava D/TAG: onNext:1
12-08 05:47:52.679 9413-9413/tbw.eage.rxjava D/TAG: onNext:2
由于返回的Observable.empty()并没有发生重新订阅操作,没有执行onComplted()方法。
都在没有发生重新订阅操作下输出了repeat Func1,而且是在onNext()方法输出之前就输出了repeat Func1,这就说明Func1是在订阅(Subscribe)是被调用。

c)你必须基础传入的Observable做出反应,而不能随意的返回一个Observable。

private void testRepeatWhen() {
Observable.just(1, 2)
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
Log.d("TAG", "repeat Func1");
return Observable.just("abc");
}
}).subscribe(new Subscriber<Object>() {
@Override
public void onNext(Object item) {
Log.d("TAG", "onNext:" + item);
}

@Override
public void onError(Throwable error) {
Log.d("TAG", "onErr:" + error.getMessage());
}

@Override
public void onCompleted() {
Log.d("TAG", "Sequence complete.");
}
});
}

输出:

12-08 05:56:54.628 9413-9413/tbw.eage.rxjava D/TAG: repeat Func1
12-08 05:56:54.628 9413-9413/tbw.eage.rxjava D/TAG: Sequence complete.我没有基于传入的Observable做出反应,也没有发生订阅。
正确的使用方式应该是:

private void testRepeatWhen() {
Observable.just(1, 2)
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
Log.d("TAG", "repeat Func1");
return observable.flatMap(new Func1<Void, Observable<?>>() {
@Override
public Observable<?> call(Void aVoid) {
return Observable.timer(5,TimeUnit.SECONDS);
}
});
}
}).subscribe(new Subscriber<Object>() {
@Override
public void onNext(Object item) {
Log.d("TAG", "onNext:" + item);
}

@Override
public void onError(Throwable error) {
Log.d("TAG", "onErr:" + error.getMessage());
}

@Override
public void onCompleted() {
Log.d("TAG", "Sequence complete.");
}
});
}
输出:

12-08 06:06:18.717 9914-9914/tbw.eage.rxjava D/TAG: repeat Func1
12-08 06:06:18.718 9914-9914/tbw.eage.rxjava D/TAG: onNext:1
12-08 06:06:18.718 9914-9914/tbw.eage.rxjava D/TAG: onNext:2
12-08 06:06:23.721 9914-9975/tbw.eage.rxjava D/TAG: onNext:1
12-08 06:06:23.721 9914-9975/tbw.eage.rxjava D/TAG: onNext:2
12-08 06:06:28.722 9914-9975/tbw.eage.rxjava D/TAG: onNext:1
12-08 06:06:28.722 9914-9975/tbw.eage.rxjava D/TAG: onNext:2
12-08 06:06:33.723 9914-9975/tbw.eage.rxjava D/TAG: onNext:1
12-08 06:06:33.723 9914-9975/tbw.eage.rxjava D/TAG: onNext:2
进行了重新订阅,并且每隔5s进行一次。
看到这,有小伙伴就会发出疑问了,timer方法为什么还要写flatMap操作符,直接这样写不就的了:

.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
Log.d("TAG", "repeat Func1");
return observable.timer(5,TimeUnit.SECONDS);
}
})这还不是基于传入的Observable做出了反应,事实上你会发现并没有进行重新订阅,因为timer是一个static方法,与传入的Observable并没有什么直接的关系。用delay()到时可以的。
Retry:如果原始Observable遇到错误,重新订阅它期望它能正常终止

1,retry有三个重载:

retry()   无论收到多少个错误,retry总是传递onNext通知给观察者

retry(long)  指定收到错误的个数,超过指定的个数,它就会把最新的一个onError通知传给他的观察者

retry(Func2)    接受一个谓词函数作为参数,这个函数的两个参数是:重试次数和导致发射 onError  通知的 Throwable  。这个函数返回一个布尔值,如果返回 true  , retry  应该再次订阅和镜像原始的Observable,如果返回 false  , retry  会将最新的一个 onError  通知传递给它的观察者。

retry 默认在trampoline上执行

retry  操作符不会将原始Observable的 onError  通知传递给观察者,它会订阅这个Observable,再给它一次机会无错误地完成它的数据序列。 Retry  总是传递 onNext  通知给观察者,由于重新订阅,可能会造成数据项重复.

我们看看这个例子:

int num = 0;
private void testRetry() {
Observable.just(1, 2, 3, 4, 5)
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
if ((integer & 1) != 1 && num < 2) {
num++;
return Observable.error(new Exception());
}
return Observable.just(integer);
}
})
.retry()
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
Log.d("TAG", "onNext:" + item);
}

@Override
public void onError(Throwable error) {
Log.d("TAG", "onErr:" + error.getMessage());
}

@Override
public void onCompleted() {
Log.d("TAG", "Sequence complete.");
}
});
}输出:
12-08 06:37:05.706 11062-11062/tbw.eage.rxjava D/TAG: onNext:1
12-08 06:37:05.707 11062-11062/tbw.eage.rxjava D/TAG: onNext:1
12-08 06:37:05.707 11062-11062/tbw.eage.rxjava D/TAG: onNext:1
12-08 06:37:05.708 11062-11062/tbw.eage.rxjava D/TAG: onNext:2
12-08 06:37:05.708 11062-11062/tbw.eage.rxjava D/TAG: onNext:3
12-08 06:37:05.709 11062-11062/tbw.eage.rxjava D/TAG: onNext:4
12-08 06:37:05.720 11062-11062/tbw.eage.rxjava D/TAG: onNext:5
12-08 06:37:05.720 11062-11062/tbw.eage.rxjava D/TAG: Sequence complete.
这个例子并没有什么实际意义,只是模拟而已。当输出的数据是偶数并且num<2就生成异常。retry只要有异常就会重复订阅直到num>=2,没有生成异常为止,所以产生了重复数据。
如果把retry改成:

.retry(1)输出:
12-08 06:42:22.016 11203-11203/tbw.eage.rxjava D/TAG: onNext:1
12-08 06:42:22.016 11203-11203/tbw.eage.rxjava D/TAG: onNext:1
12-08 06:42:22.016 11203-11203/tbw.eage.rxjava D/TAG: onErr:null并没有完成整个输出就onError(),因为retry次数不够。
看看retry(Func2)的用法:

private void testRetry() {
Observable.just(1, 2, 3, 4, 5)
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
if ((integer & 1) != 1 && num < 3) {
num++;
return Observable.error(new NullPointerException());
}
return Observable.just(integer);
}
})
.retry(new Func2<Integer, Throwable, Boolean>() {
@Override
public Boolean call(Integer integer, Throwable throwable) {
if (integer < 4 && throwable instanceof NullPointerException) {
return true;
}
return false;
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
Log.d("TAG", "onNext:" + item);
}

@Override
public void onError(Throwable error) {
Log.d("TAG", "onErr:" + error.getMessage());
}

@Override
public void onCompleted() {
Log.d("TAG", "Sequence complete.");
}
});
}当重复次数少于4并且异常是NullPointer异常时就重新订阅。
2,retryWhen有2个重载

retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler)不指定线程

retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler, Scheduler scheduler)  指定线程

注意的和上面的repeatWhen一样,就不再说明了,只是repeat接受的是completed信号,而retry接受的是error信号。

这里看看retryWhen怎么使用:

int num = 0;
private void testRetryWhen() {
Observable.just(1, 2, 3, 4, 5)
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
if ((integer & 1) != 1 && num < 3) {
num++;
return Observable.error(new NullPointerException());
}
return Observable.just(integer);
}
})
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable.delay(5, TimeUnit.SECONDS);
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
Log.d("TAG", "onNext:" + item);
}

@Override
public void onError(Throwable error) {
Log.d("TAG", "onErr:" + error.getMessage());
}

@Override
public void onCompleted() {
Log.d("TAG", "Sequence complete.");
}
});
}输出:
12-08 07:10:28.639 12894-12894/tbw.eage.rxjava D/TAG: onNext:1
12-08 07:10:33.641 12894-12937/tbw.eage.rxjava D/TAG: onNext:1
12-08 07:10:38.642 12894-12937/tbw.eage.rxjava D/TAG: onNext:1
12-08 07:10:43.643 12894-12937/tbw.eage.rxjava D/TAG: onNext:1
12-08 07:10:43.643 12894-12937/tbw.eage.rxjava D/TAG: onNext:2
12-08 07:10:43.643 12894-12937/tbw.eage.rxjava D/TAG: onNext:3
12-08 07:10:43.643 12894-12937/tbw.eage.rxjava D/TAG: onNext:4
12-08 07:10:43.643 12894-12937/tbw.eage.rxjava D/TAG: onNext:5
12-08 07:10:48.643 12894-12937/tbw.eage.rxjava D/TAG: Sequence complete.结果其实和单纯的使用retry是一样的。只是增加了一个延时而已。
那如果我想使用retryWhen控制订阅次数怎么办呢?

.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable.zipWith(Observable.range(1, 4), new Func2<Throwable, Integer, Object>() {
@Override
public Object call(Throwable throwable, Integer integer) {
return integer;
}
});
}
})控制订阅重复次数为4次。4次没有成功就completed.

retryWhen 和 repeatWhen在轮询和重连的场景使用还是比较多的。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: