我的RxJava入门(三)
2017-07-13 13:33
495 查看
上一篇 我的RxJava入门(二)
参考文档http://gank.io/post/560e15be2dca930e00da1083
这篇主要是讲解变换lift()的原理
lift()的核心代码:
(1)subscribe() 中这句话的 onSubscribe 指的是 Observable 中的 onSubscribe 对象.
(2)当含有 lift() 时:
1).lift() 创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了; 同样地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe;
2).当用户调用经过 lift() 后的 Observable 的 subscribe() 的时候,使用的是 lift() 所返回的新的 Observable ,于是它所触发的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那个 OnSubscribe;
3).而这个新 OnSubscribe 的 call() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在这个 call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一个新的 Subscriber(Operator 就是在这里,通过自己的 call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的『变换』代码以实现变换),然后利用这个新 Subscriber 向原始 Observable 进行订阅。
实质:针对事件序列的处理和再发送。在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。
执行原理类似于递归操作,先从最上层到最下层,然后最下层逐层反馈给最上层,最上层在逐步通知最下层,最后传递给目标subscribe。
一次lift()
多次lift()
示例代码
note:RxJava 都不建议开发者自定义 Operator 来直接使用 lift(),而是建议尽量使用已有的 lift() 包装方法(如 map() flatMap() 等)进行组合来实现需求,因为直接使用 lift() 非常容易发生一些难以发现的错误。
compose: 对 Observable 整体的变换
区别:lift() 是针对事件项和事件序列的,而 compose() 是针对 Observable 自身进行变换。
举个例子,假设在程序中有多个 Observable ,并且他们都需要应用一组相同的 lift() 变换
使用compose()实现
像上面这样,使用 compose() 方法,Observable 可以利用传入的 Transformer 对象的 call 方法直接对自身进行处理,也就不必被包在方法的里面了。
利用 subscribeOn() 结合
4000
observeOn() 来实现线程控制
答案是:能。因为 observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe() 参数中的 Subscriber ,而是 observeOn() 执行时那一刻 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber 。换句话说,observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。
通过 observeOn() 的多次调用,程序实现了线程的多次切换。
subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。
解析
subscribeOn() 的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响;
observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此 observeOn() 控制的是它后面的线程。
subscribeOn() 和 observeOn() 混合使用时,线程调度情况
图中共有 5 处含有对事件的操作。
①和②两处受第一个 subscribeOn() 影响,运行在红色线程;
③和④处受第一个 observeOn() 的影响,运行在绿色线程;
⑤处受第二个 onserveOn() 影响,运行在紫色线程;
而第二个 subscribeOn() ,由于在通知过程中线程就被第一个 subscribeOn() 截断,因此对整个流程并没有任何影响。当使用了多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用。
OnStart()和doOnSubscribe()的区别
(1)Subscriber 的 onStart() 可以用作流程开始前的初始化。然而 onStart() 由于在 subscribe() 发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe() 被调用时的线程。这就导致如果 onStart() 中含有对线程有要求的代码,将会有线程非法的风险,因为有时你无法预测 subscribe() 将会在什么线程执行。
(2)Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。
示例代码:
在 doOnSubscribe()的后面跟一个 subscribeOn() ,就能指定准备工作的线程了。
使用场景
1.与 Retrofit(retrofit 是 Square 的一个著名的网络请求库)的结合;
2.RxBinding;
3.各种异步操作,而如果你有某些异步操作无法用这些库来自动生成 Observable,也完全可以自己写。例如数据库的读写、大图片的载入、文件压缩/解压等各种需要放在后台工作的耗时操作,都可以用 RxJava 来实现。
4.RxBus 名字看起来像一个库,但它并不是一个库,而是一种模式,它的思想是使用 RxJava 来实现了 EventBus ,而让你不再需要使用 Otto 或者 GreenRobot 的 EventBus。
参考文档http://gank.io/post/560e15be2dca930e00da1083
这篇主要是讲解变换lift()的原理
lift()的核心代码:
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) { return Observable.create(new OnSubscribe<R>() { @Override public void call(Subscriber subscriber) { Subscriber newSubscriber = operator.call(subscriber); newSubscriber.onStart(); onSubscribe.call(newSubscriber); } }); }
(1)subscribe() 中这句话的 onSubscribe 指的是 Observable 中的 onSubscribe 对象.
(2)当含有 lift() 时:
1).lift() 创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了; 同样地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe;
2).当用户调用经过 lift() 后的 Observable 的 subscribe() 的时候,使用的是 lift() 所返回的新的 Observable ,于是它所触发的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那个 OnSubscribe;
3).而这个新 OnSubscribe 的 call() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在这个 call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一个新的 Subscriber(Operator 就是在这里,通过自己的 call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的『变换』代码以实现变换),然后利用这个新 Subscriber 向原始 Observable 进行订阅。
实质:针对事件序列的处理和再发送。在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。
执行原理类似于递归操作,先从最上层到最下层,然后最下层逐层反馈给最上层,最上层在逐步通知最下层,最后传递给目标subscribe。
一次lift()
多次lift()
示例代码
observable.lift(new Observable.Operator<String, Integer>() { @Override public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) { // 将事件序列中的 Integer 对象转换为 String 对象 return new Subscriber<Integer>() { @Override public void onNext(Integer integer) { subscriber.onNext("" + integer); } @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } }; } });
note:RxJava 都不建议开发者自定义 Operator 来直接使用 lift(),而是建议尽量使用已有的 lift() 包装方法(如 map() flatMap() 等)进行组合来实现需求,因为直接使用 lift() 非常容易发生一些难以发现的错误。
compose: 对 Observable 整体的变换
区别:lift() 是针对事件项和事件序列的,而 compose() 是针对 Observable 自身进行变换。
举个例子,假设在程序中有多个 Observable ,并且他们都需要应用一组相同的 lift() 变换
observable1 .lift1() .lift2() .lift3() .lift4() .subscribe(subscriber1); observable2 .lift1() .lift2() .lift3() .lift4() .subscribe(subscriber2); observable3 .lift1() .lift2() .lift3() .lift4() .subscribe(subscriber3); observable4 .lift1() .lift2() .lift3() .lift4() .subscribe(subscriber1); //or private Observable liftAll(Observable observable) { return observable .lift1() .lift2() .lift3() .lift4(); } ... liftAll(observable1).subscribe(subscriber1); liftAll(observable2).subscribe(subscriber2); liftAll(observable3).subscribe(subscriber3); liftAll(observable4).subscribe(subscriber4);
使用compose()实现
public class LiftAllTransformer implements Observable.Transformer<Integer, String> { @Override public Observable<String> call(Observable<Integer> observable) { return observable .lift1() .lift2() .lift3() .lift4(); } } ... Transformer liftAll = new LiftAllTransformer(); observable1.compose(liftAll).subscribe(subscriber1); observable2.compose(liftAll).subscribe(subscriber2); observable3.compose(liftAll).subscribe(subscriber3); observable4.compose(liftAll).subscribe(subscriber4);
像上面这样,使用 compose() 方法,Observable 可以利用传入的 Transformer 对象的 call 方法直接对自身进行处理,也就不必被包在方法的里面了。
利用 subscribeOn() 结合
4000
observeOn() 来实现线程控制
让事件的产生和消费发生在不同的线程。
能不能多切换几次线程?答案是:能。因为 observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe() 参数中的 Subscriber ,而是 observeOn() 执行时那一刻 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber 。换句话说,observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。
通过 observeOn() 的多次调用,程序实现了线程的多次切换。
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定 .subscribeOn(Schedulers.io()) .observeOn(Schedulers.newThread()) .map(mapOperator) // 新线程,由 observeOn() 指定 .observeOn(Schedulers.io()) .map(mapOperator2) // IO 线程,由 observeOn() 指定 .observeOn(AndroidSchedulers.mainThread) .subscribe(subscriber); // Android 主线程,由 observeOn() 指定
subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。
subscribeOn() 和 observeOn() 的内部实现,也是用的 lift()。具体看图(不同颜色的箭头表示不同的线程):
subscribeOn() 原理图:
observeOn() 原理图:
解析
subscribeOn() 的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响;
observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此 observeOn() 控制的是它后面的线程。
subscribeOn() 和 observeOn() 混合使用时,线程调度情况
图中共有 5 处含有对事件的操作。
①和②两处受第一个 subscribeOn() 影响,运行在红色线程;
③和④处受第一个 observeOn() 的影响,运行在绿色线程;
⑤处受第二个 onserveOn() 影响,运行在紫色线程;
而第二个 subscribeOn() ,由于在通知过程中线程就被第一个 subscribeOn() 截断,因此对整个流程并没有任何影响。当使用了多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用。
OnStart()和doOnSubscribe()的区别
(1)Subscriber 的 onStart() 可以用作流程开始前的初始化。然而 onStart() 由于在 subscribe() 发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe() 被调用时的线程。这就导致如果 onStart() 中含有对线程有要求的代码,将会有线程非法的风险,因为有时你无法预测 subscribe() 将会在什么线程执行。
(2)Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。
示例代码:
在 doOnSubscribe()的后面跟一个 subscribeOn() ,就能指定准备工作的线程了。
Observable.create(onSubscribe) .subscribeOn(Schedulers.io()) .doOnSubscribe(new Action0() { @Override public void call() { progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行 } }) .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程 .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber);
使用场景
1.与 Retrofit(retrofit 是 Square 的一个著名的网络请求库)的结合;
2.RxBinding;
3.各种异步操作,而如果你有某些异步操作无法用这些库来自动生成 Observable,也完全可以自己写。例如数据库的读写、大图片的载入、文件压缩/解压等各种需要放在后台工作的耗时操作,都可以用 RxJava 来实现。
4.RxBus 名字看起来像一个库,但它并不是一个库,而是一种模式,它的思想是使用 RxJava 来实现了 EventBus ,而让你不再需要使用 Otto 或者 GreenRobot 的 EventBus。
相关文章推荐
- RxJava入门学习------①操作符
- RxJava 入门
- Rxjava使用入门
- RxJava 从入门到出轨
- RxJava 入门 慕课网
- RxJava/RxAndroid之快速入门3(转)
- RxJava 从入门到爱上它 - "变换"的深入理解
- RxJava入门系列二,操作符篇
- Android:手把手带你入门神秘的 Rxjava
- RXJava快速入门
- RxJava入门(一)
- RxJava1.x从入门到放弃再到RxJava 2.x(一)
- [置顶] RxJava 从入门到出轨
- Android:入门神秘Rxjava(本文主要基于Rxjava 2.0)
- RxJava入门使用
- 这可能是最好的RxJava 2.x 入门教程(三)
- RxJava 从入门到出轨
- RxJava 入门
- Rxjava2-小白入门(一)
- Android :Rxjava 清晰 & 易懂的 入门教程