您的位置:首页 > 编程语言 > Java开发

RxJava 学习笔记(九) --- Error Handling 错误处理操作

2016-07-15 17:54 453 查看
onErrorReturn 指示Observable在遇到错误时发射一个特定的数据

onErrorResumeNext 指示Observable在遇到错误时发射一个数据序列

onExceptionResumeNext 指示Observable遇到错误时继续发射数据

retry 指示Observable遇到错误时重试

retryWhen 指示Observable遇到错误时将错误传递给另一个Observable来决定是否要重新给订阅这个Observable

1.
onErrorReturn
—> 指示Observable在遇到错误时发射一个特定的数据

onErrorReturn
方法返回一个镜像原有
Observable
行为的新
Observable
,后者会忽略前者的
onError
调用,不会将错误传递给观察者,作为替代,它会发发射一个特殊的项并调用观察者的
onCompleted
方法。



示例代码:

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if (subscriber.isUnsubscribed()) return;
//循环输出数字
try {
for (int i = 0; i < 10; i++) {
if (i == 4) {
throw new Exception("this is number 4 error!");
}
subscriber.onNext(i);
}
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
});

observable.onErrorReturn(new Func1<Throwable, Integer>() {
@Override
public Integer call(Throwable throwable) {
return 1004;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}

@Override
public void onNext(Integer value) {
System.out.println("Next:" + value);
}
});


输出:

Next:0
Next:1
Next:2
Next:3
Next:1004
Sequence complete.


Javadoc:onErrorReturn(Func1)

2.
onErrorResumeNext
—> 指示Observable在遇到错误时发射一个数据序列



onErrorResumeNext
方法返回一个镜像原有
Observable
行为的新
Observable
,后者会忽略前者的
onError
调用,不会将错误传递给观察者,作为替代,它会开始镜像另一个,备用的
Observable


示例代码:

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if (subscriber.isUnsubscribed()) return;
//循环输出数字
try {
for (int i = 0; i < 10; i++) {
if (i == 4) {
throw new Exception("this is number 4 error!");
}
subscriber.onNext(i);
}
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
});

observable.onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
@Override
public Observable<? extends Integer> call(Throwable throwable) {
return Observable.just(100,101, 102);
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}

@Override
public void onNext(Integer value) {
System.out.println("Next:" + value);
}
});


输出:

Next:0
Next:1
Next:2
Next:3
Next:100
Next:101
Next:102
Sequence complete.


Javadoc:onErrorResumeNext(Func1)

Javadoc:onErrorResumeNext(Observable)

3.
onExceptionResumeNext
—> 指示Observable遇到错误时继续发射数据

onErrorResumeNext
类似,
onExceptionResumeNext
方法返回一个镜像原有
Observable
行为的新
Observable
,也使用一个备用的
Observable
,不同的是,如果
onError
收到的
Throwable
不是一个
Exception
,它会将错误传递给观察者的
onError
方法,不会使用备用的
Observable




示例代码:

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if (subscriber.isUnsubscribed()) return;
//循环输出数字
try {
for (int i = 0; i < 10; i++) {
if (i == 4) {
throw new Exception("this is number 4 error!");
}
subscriber.onNext(i);
}
subscriber.onCompleted();
} catch (Throwable e) {
subscriber.onError(e);
}
}
});

observable.onExceptionResumeNext(Observable.just(100, 101, 102)).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}

@Override
public void onNext(Integer value) {
System.out.println("Next:" + value);
}
});


输出:

Next:0
Next:1
Next:2
Next:3
Next:100
Next:101
Next:102
Sequence complete.


Javadoc:onExceptionResumeNext(Observable)

4.
retry
—> 指示Observable遇到错误时重试

如果原始
Observable
遇到错误,重新订阅它期望它能正常终止



Retry
操作符不会将原始
Observable
onError
通知传递给观察者,它会订阅这个
Observable
,再给它一次机会无错误地完成它的数据序列。
Retry
总是传递
onNext
通知给观察者,由于重新订阅,可能会造成数据项重复,如上图所示。

RxJava
中的实现为
retry
retryWhen


无论收到多少次
onError
通知,无参数版本的
retry
都会继续订阅并发射原始
Observable


接受单个
count
参数的
retry
会最多重新订阅指定的次数,如果次数超了,它不会尝试再次订阅,它会把最新的一个
onError
通知传递给它的观察者。

还有一个版本的
retry
接受一个谓词函数作为参数,这个函数的两个参数是:重试次数和导致发射
onError
通知的
Throwable
。这个函数返回一个布尔值,如果返回
true
retry
应该再次订阅和镜像原始的
Observable
,如果返回
false
retry
会将最新的一个
onError
通知传递给它的观察者。

retry
操作符默认在
trampoline
调度器上执行。

示例代码:
retry(long)


Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if (subscriber.isUnsubscribed()) return;
//循环输出数字
try {
for (int i = 0; i < 10; i++) {
if (i == 4) {
throw new Exception("this is number 4 error!");
}
subscriber.onNext(i);
}
subscriber.onCompleted();
} catch (Throwable e) {
subscriber.onError(e);
}
}
});

observable.retry(2).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}

@Override
public void onNext(Integer value) {
System.out.println("Next:" + value);
}
});


输出:

Next:0
Next:1
Next:2
Next:3

Next:0
Next:1
Next:2
Next:3

Next:0
Next:1
Next:2
Next:3
Error: this is number 4 error!


示例代码:
retry(Func2)


Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if (subscriber.isUnsubscribed()) return;
//循环输出数字
try {
for (int i = 0; i < 10; i++) {
if (i == 4) {
throw new Exception("this is number 4 error!");
}
subscriber.onNext(i);
}
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
});

observable.retry(new Func2<Integer, Throwable, Boolean>() {
@Override
public Boolean call(Integer integer, Throwable throwable) {
return integer < 3;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}

@Override
public void onNext(Integer value) {
System.out.println("Next--:" + value);
}
});


输出:

Next--:0
Next--:1
Next--:2
Next--:3
Next--:0
Next--:1
Next--:2
Next--:3
Next--:0
Next--:1
Next--:2
Next--:3
Error: this is number 4 error!


Javadoc:retry()

Javadoc:retry(long)

Javadoc:retry(Func2)

5.
retryWhen
—> 指示Observable遇到错误时,将错误传递给另一个Observable来决定是否要重新给订阅这个Observable

retryWhen
retry
类似,区别是,
retryWhen
onError
中的
Throwable
传递给一个函数,这个函数产生另一个
Observable
retryWhen
观察它的结果再决定是不是要重新订阅原始的
Observable
。如果这个
Observable
发射了一项数据,它就重新订阅,如果这个
Observable
发射的是
onError
通知,它就将这个通知传递给观察者然后终止。

retryWhen
默认在
trampoline
调度器上执行,你可以通过参数指定其它的调度器。



示例代码:

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
System.out.println("subscribing");
subscriber.onError(new RuntimeException("always fails"));
}
});

observable.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
@Override
public Integer call(Throwable throwable, Integer integer) {
return integer;
}
}).flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer integer) {
System.out.println("delay retry by " + integer + " second(s)");
//每一秒中执行一次
return Observable.timer(integer, TimeUnit.SECONDS);
}
});
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}

@Override
public void onNext(Integer value) {
System.out.println("Next:" + value);
}
});


输出:

subscribing
delay retry by 1 second(s)
subscribing
delay retry by 2 second(s)
subscribing
delay retry by 3 second(s)
subscribing
Sequence complete.


Javadoc:retryWhen(Func1)

Javadoc:retryWhen(Func1,Scheduler)

可以参考 repeatWhen 和 retryWhen 操纵符 的思考, 第一篇是原文,第二篇是译文

RxJava’s repeatWhen and retryWhen, explained

【译】对RxJava中.repeatWhen()和.retryWhen()操作符的思考
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: