迷之RxJava —— 线程切换
2016-05-19 15:17
591 查看
RxJava最迷人的是什么?
答案就是
把异步序列写到一个工作流里!和
javascript的
Promise/A如出一辙。
OK,在
java中做异步的事情在我们传统理解过来可不方便,而且,如果要让异步按照我们的工作流来,就更困难了。
但是在
RxJava中,我们只要调用调用
subscribOn()和
observeOn()就能切换我们的工作线程,是不是让小伙伴都惊呆了?
然后结合
RxJava的
Operator,写异步的时候,想切换线程就是一行代码的事情,整个
workflow还非常清晰:
Observable.create() // do something on io thread .work() // work.. work.. .subscribeOn(Schedulers.io()) // observeOn android main thread .observeOn(AndroidSchedulers.mainThread()) .subscribe();
我们再也不用去写什么见鬼的
new Thread和
Handler了,在这么几行代码里,我们实现了在
io线程上做我们的工作(
work),在
main线程上,更新UI
Subscribe On
先看下subscribeOn干了什么
public final Observable<T> subscribeOn(Scheduler scheduler) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return nest().lift(new OperatorSubscribeOn<T>(scheduler)); }
啊,原来也是个lift,就是从一个
Observable生成另外一个
Observable咯,这个
nest是干嘛用?
public final Observable<Observable<T>> nest() { return just(this); }
这里返回类型告诉我们,它是产生一个
Observable<Observable<T>>
讲到这里,会有点晕,先记着这个,然后我们看
OperatorSubscribeOn这个操作符,
构造函数是
public OperatorSubscribeOn(Scheduler scheduler) { this.scheduler = scheduler; }
OK,这里保存了
scheduler对象,然后就是我们前一章说过的转换方法。
@Override public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) { final Worker inner = scheduler.createWorker(); subscriber.add(inner); return new Subscriber<Observable<T>>(subscriber) { @Override public void onCompleted() { // ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext } @Override public void onError(Throwable e) { subscriber.onError(e); } @Override public void onNext(final Observable<T> o) { inner.schedule(new Action0() { @Override public void call() { final Thread t = Thread.currentThread(); o.unsafeSubscribe(new Subscriber<T>(subscriber) { @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } @Override public void onNext(T t) { subscriber.onNext(t); } @Override public void setProducer(final Producer producer) { subscriber.setProducer(new Producer() { @Override public void request(final long n) { if (Thread.currentThread() == t) { // don't schedule if we're already on the thread (primarily for first setProducer call) // see unit test 'testSetProducerSynchronousRequest' for more context on this producer.request(n); } else { inner.schedule(new Action0() { @Override public void call() { producer.request(n); } }); } } }); } }); } }); } }; }
让人纠结的类模板
看完这段又臭又长的,先深呼吸一口气,我们慢慢分析下。首先要注意
RxJava里面最让人头疼的模板问题,那么
OperatorMap这个类的声明是
public final class OperatorMap<T, R> implements Operator<R, T>
而
Operator这个接口继承
Func1
public interface Func1<T, R> extends Function { R call(T t); }
我们这里不要记
T和
R,记住
传入左边的模板是形参,传入右边的模板是返回值。
好了,那么这里的
call就是从一个
T转换成一个
Observable<T>的过程了。
总结一下,我们这一次调用
subscribeOn,做了两件事
1、
nest()为
Observable<T>生成了一个
Observable<Observable<T>>
2、
lift()对
Observalbe<Observalbe<T>>进行一个变化,变回
Observable<T>
因为
lift是一个模板函数,它的返回值的类型是参照它的形参来,而他的形参是
Operator<T, Observable<T>>这个结论非常重要!!
OK,到这里我们已经存储了所有的序列,等着我们调用了。
调用链
首先,记录我们在调用这条指令之前的Observable<T>,记为
Observable$1
然后,经过
lift生成的
Observable<T>记为
Observable$2
好了,现在我们拿到的依然是
Observable<T>这个对象,但是它不是原始的
Observable$1,要深深记住这一点,它是由
lift生成的
Observable$2,这时候进行
subscribe,那看到首先调用的就是
OnSubscribe.call方法,好,直接进入
lift当中生成的那个地方。
我们知道这一层
lift的
operator就是刚刚的
OperatorSubscribOn,那么调用它的
call方法,生成的是一个
Subscriber<Observable<T>>
Subscriber<? super T> st = hook.onLift(operator).call(o); try { // new Subscriber created and being subscribed with so 'onStart' it st.onStart(); onSubscribe.call(st); } catch (Throwable e) { ... }
好,还记得我们调用过
nest么?,这里的
onSubscribe可是
nest上下文中的噢,每一次,到这个地方,这个
onSubscribe就是上一层
Observable的
onSubscribe,即
Observable<Observable<T>>的
onSubscribe,相当于栈弹出了一层。它的
call直接在
Subscriber的
onNext中给出了最开始的
Observable<T>,我们这里就要看下刚刚在
OperatorSubscribeOn中生成的
Subscriber
new Subscriber<Observable<T>>(subscriber) { @Override public void onCompleted() { // ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext } @Override public void onError(Throwable e) { subscriber.onError(e); } @Override public void onNext(final Observable<T> o) { inner.schedule(new Action0() { @Override public void call() { final Thread t = Thread.currentThread(); o.unsafeSubscribe(new Subscriber<T>(subscriber) { @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } @Override public void onNext(T t) { subscriber.onNext(t); } }); } }); } }
对,就是它,这里要注意,这里的
subscriber就是我们在
lift中,传入的
o
Subscriber<? super T> st = hook.onLift(operator).call(o);
对,就是它,其实它就是
SafeSubscriber。
回过头,看看刚刚的
onNext()方法,
inner.schedule()这个函数,我们可以认为就是
postRun()类似的方法,而
onNext()中传入的
o是我们之前生成的
Observable$1,是从
Observable.just封装出来的
Observable<Observable<T>>中产生的,这里调用了
Observable$1.unsafeSubscribe方法,我们暂时不关心它和
subscribe有什么不同,但是我们知道最终功能是一样的就好了。
注意它运行时的线程!!在
inner这个
Worker上!于是它的运行线程已经被改了!!
好,这里的
unsafeSubscribe调用的方法就是调用原先
Observable$1.onSubscribe中的
call方法:
这个
Observable$1就是我们之前自己定义的
Observable了。
综上所述,如果我们需要我们的
Observable$1在一个别的线程上运行的时候,只需要在后面跟一个
subscribeOn即可。结合扔物线大大的图如下:
总结
这里逻辑着实不好理解。如果还没有理解的朋友,可以按照我前文说的顺序,细致的看下来,我把逻辑过一遍之后,发现lift的陷阱实在太大,内部类用的风生水起,一不小心,就不知道一个变量的上下文是什么,需要特别小心。
之前我们分析过
subscribeOn这个函数,
现在我们来看下
subscribeOn和
observeOn这两个函数到底有什么异同。
用过
rxjava的旁友都知道,
subscribeOn和
observeOn都是用来切换线程用的,可是我什么时候用
subscribeOn,什么时候用
observeOn呢,我们很少知道这两个区别是啥。
友情提示,如果不想看分析过程的,可以直接跳到下面的总结部分。
subscribeOn
先看下OperatorSubscribeOn的核心代码:
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> { final Scheduler scheduler; final Observable<T> source; public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) { this.scheduler = scheduler; this.source = source; } @Override public void call(final Subscriber<? super T> subscriber) { final Worker inner = scheduler.createWorker(); subscriber.add(inner); inner.schedule(new Action0() { @Override public void call() { Subscriber<T> s = new Subscriber<T>(subscriber) { @Override public void onNext(T t) { subscriber.onNext(t); } @Override public void onError(Throwable e) { try { subscriber.onError(e); } finally { inner.unsubscribe(); } } @Override public void onCompleted() { try { subscriber.onCompleted(); } finally { inner.unsubscribe(); } } .... }; source.unsafeSubscribe(s); } }); } }
这里注意两点:
因为
OperatorSubscribeOn是个
OnSubscribe对象,所以在
call参数中传入的
subscriber就是我们在外面使用
Observable.subscribe(a)传入的对象
a。
这里
source对象指向的是调用
subscribeOn之前的那个
Observable序列。
明确了这两点,我们就很好的知道了
subscribeOn是如何工作,产生神奇的效果了。
其实最最主要的就是一行函数
source.unsafeSubscribe(s);
并且要注意它所在的位置,是在worker的
call里面,说白了,就是把
source.subscribe这一行调用放在指定的线程里,那么总结起来的结论就是:
subscribeOn的调用,改变了调用前序列所运行的线程。
observeOn
同样看下OperatorObserveOn这个类的主要代码:
public final class OperatorObserveOn<T> implements Operator<T, T> { private final Scheduler scheduler; private final boolean delayError; /** * @param scheduler the scheduler to use * @param delayError delay errors until all normal events are emitted in the other thread? */ public OperatorObserveOn(Scheduler scheduler, boolean delayError) { this.scheduler = scheduler; this.delayError = delayError; } @Override public Subscriber<? super T> call(Subscriber<? super T> child) { .... ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError); parent.init(); return parent; } /** Observe through individual queue per observer. */ private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 { final Subscriber<? super T> child; final Scheduler.Worker recursiveScheduler; final NotificationLite<T> on; final boolean delayError; final Queue<Object> queue; // the status of the current stream volatile boolean finished; final AtomicLong requested = new AtomicLong(); final AtomicLong counter = new AtomicLong(); /** * The single exception if not null, should be written before setting finished (release) and read after * reading finished (acquire). */ Throwable error; // do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should // not prevent anything downstream from consuming, which will happen if the Subscription is chained public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError) { this.child = child; this.recursiveScheduler = scheduler.createWorker(); this.delayError = delayError; this.on = NotificationLite.instance(); if (UnsafeAccess.isUnsafeAvailable()) { queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE); } else { queue = new SpscAtomicArrayQueue<Object>(RxRingBuffer.SIZE); } } void init() { // don't want this code in the constructor because `this` can escape through the // setProducer call Subscriber<? super T> localChild = child; localChild.setProducer(new Producer() { @Override public void request(long n) { if (n > 0L) { BackpressureUtils.getAndAddRequest(requested, n); schedule(); } } }); localChild.add(recursiveScheduler); localChild.add(this); } @Override public void onStart() { // signal that this is an async operator capable of receiving this many request(RxRingBuffer.SIZE); } @Override public void onNext(final T t) { if (isUnsubscribed() || finished) { return; } if (!queue.offer(on.next(t))) { onError(new MissingBackpressureException()); return; } schedule(); } @Override public void onCompleted() { if (isUnsubscribed() || finished) { return; } finished = true; schedule(); } @Override public void onError(final Throwable e) { if (isUnsubscribed() || finished) { RxJavaPlugins.getInstance().getErrorHandler().handleError(e); return; } error = e; finished = true; schedule(); } protected void schedule() { if (counter.getAndIncrement() == 0) { recursiveScheduler.schedule(this); } } // only execute this from schedule() @Override public void call() { long emitted = 0L; long missed = 1L; // these are accessed in a tight loop around atomics so // loading them into local variables avoids the mandatory re-reading // of the constant fields final Queue<Object> q = this.queue; final Subscriber<? super T> localChild = this.child; final NotificationLite<T> localOn = this.on; // requested and counter are not included to avoid JIT issues with register spilling // and their access is is amortized because they are part of the outer loop which runs // less frequently (usually after each RxRingBuffer.SIZE elements) for (;;) { long requestAmount = requested.get(); long currentEmission = 0L; while (requestAmount != currentEmission) { boolean done = finished; Object v = q.poll(); boolean empty = v == null; if (checkTerminated(done, empty, localChild, q)) { return; } if (empty) { break; } localChild.onNext(localOn.getValue(v)); currentEmission++; emitted++; } if (requestAmount == currentEmission) { if (checkTerminated(finished, q.isEmpty(), localChild, q)) { return; } } if (currentEmission != 0L) { BackpressureUtils.produced(requested, currentEmission); } missed = counter.addAndGet(-missed); if (missed == 0L) { break; } } if (emitted != 0L) { request(emitted); } } boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) { if (a.isUnsubscribed()) { q.clear(); return true; } if (done) { if (delayError) { if (isEmpty) { Throwable e = error; try { if (e != null) { a.onError(e); } else { a.onCompleted(); } } finally { recursiveScheduler.unsubscribe(); } } } else { Throwable e = error; if (e != null) { q.clear(); try { a.onError(e); } finally { recursiveScheduler.unsubscribe(); } return true; } else if (isEmpty) { try { a.onCompleted(); } finally { recursiveScheduler.unsubscribe(); } return true; } } } return false; } } }
这里的代码有点长,我们先注意到它是一个
Operator,它没有对上层
Observable做任何的控制或者包装。
既然是
Operator,那么它的职责就是把一个
Subscriber转换成另外一个
Subscriber,
我们来关注下转换后的
Subscriber对转换前的
Subscriber做了些什么事。
首先它是一个
ObserveOnSubscriber类, 既然是
Subscriber那么肯定有
onNext,
onComplete和
onError看最主要的onNext
@Override public void onNext(final T t) { if (isUnsubscribed() || finished) { return; } if (!queue.offer(on.next(t))) { onError(new MissingBackpressureException()); return; } schedule(); }
好了,这里做了两件事,首先把结果缓存到一个队列里,然后调用
schedule启动传入的
worker
我们这里需要注意下:
在调用
observeOn前的序列,把结果传入到
onNext就是它的工作,它并不关心后续的流程,所以工作就到这里就结束了,剩下的交给
ObserveOnSubscriber继续。
protected void schedule() { if (counter.getAndIncrement() == 0) { recursiveScheduler.schedule(this); } }
recursiveScheduler就是之前我们传入的Scheduler,我们一般会在
observeOn传入
AndroidScheluders.mainThread()对吧、
接下去,我们看下在
scheduler中调用的
call方法,这里只列出主要带代码
@Override public void call() { ... final Subscriber<? super T> localChild = this.child; for (;;) { ... boolean done = finished; Object v = q.poll(); boolean empty = v == null; if (checkTerminated(done, empty, localChild, q)) { return; } if (empty) { break; } localChild.onNext(localOn.getValue(v)); ... } if (emitted != 0L) { request(emitted); } }
OK,在
Scheduler启动后, 我们在
Observable.subscribe(a)传入的
a就是这里的
child, 我们看到,在
call中终于调用了它的
onNext方法,把真正的结果传了出去,但是在这里,我们是工作在
observeOn的线程上的。
那么总结起来的结论就是:
observeOn对调用之前的序列默不关心,也不会要求之前的序列运行在指定的线程上
observeOn对之前的序列产生的结果先缓存起来,然后再在指定的线程上,推送给最终的
subscriber
复杂情况
我们经常多次使用subscribeOn切换线程,那么以后是否可以组合
observeOn和
subscribeOn达到自由切换的目的呢?
组合是可以的,但是他们的执行顺序是有条件的,如果仔细分析的话,可以知道
observeOn调用之后,再调用
subscribeOn是无效的,原因是什么?
因为
subscribeOn改变的是
subscribe这句调用所在的线程,大多数情况,产生内容和消费内容是在同一线程的,所以改变了产生内容所在的线程,就改变了消费内容所在的线程。
经过上面的阐述,我们知道,
observeOn的工作原理是把消费结果先缓存,再切换到新线程上让原始消费者消费,它和生产者是没有一点关系的,就算
subscribeOn调用了,也只是改变
observeOn这个消费者所在的线程,和
OperatorObserveOn中存储的原始消费者一点关系都没有,它还是由
observeOn控制。
总结
如果我们有一段这样的序列Observable .map // 操作1 .flatMap // 操作2 .subscribeOn(io) .map //操作3 .flatMap //操作4 .observeOn(main) .map //操作5 .flatMap //操作6 .subscribeOn(io) //!!特别注意 .subscribe(handleData)
假设这里我们是在主线程上调用这段代码,
那么
操作1,
操作2是在io线程上,因为之后
subscribeOn切换了线程
操作3,
操作4也是在io线程上,因为在
subscribeOn切换了线程之后,并没有发生改变。
操作5,
操作6是在main线程上,因为在他们之前的
observeOn切换了线程。
特别注意那一段,对于
操作5和
操作6是无效的
再简单点总结就是
subscribeOn的调用切换之前的线程。
observeOn的调用切换之后的线程。
observeOn之后,不可再调用
subscribeOn切换线程
=========
续 特别感谢@扔物线给的额外的总结
下面提到的“操作”包括产生事件、用操作符操作事件以及最终的通过 subscriber 消费事件
只有第一subscribeOn() 起作用(所以多个 subscribeOn() 毛意义)
这个 subscribeOn() 控制从流程开始的第一个操作,直到遇到第一个 observeOn()
observeOn() 可以使用多次,每个 observeOn() 将导致一次线程切换(),这次切换开始于这次 observeOn() 的下一个操作
不论是 subscribeOn() 还是 observeOn(),每次线程切换如果不受到下一个 observeOn() 的干预,线程将不再改变,不会自动切换到其他线程
相关文章推荐
- 底层解惑-spring的IOC相关接口:BeanFactory与FactoryBean
- Java中关于OOM的场景及解决方法
- zookeeper搭建以及Java连接zookeeper测试
- java数值范围以及float与double精度丢失问题
- Hibernate3使用注解出现 javax/persistence/Entity找不到的错误
- Hibernate中的单向多对一关联
- java代码获取服务器的地址
- [转]struts1.2的action参数配置
- java中 0xff的意义
- java-类加载器
- Java.net.PlainSocketImpl
- 深入浅出RxJava三--响应式的好处
- spring mvc 1
- Java System类看到的一点小记
- java基础第九天总结
- JAVA获取时间戳,哪个更快
- 深入浅出RxJava(二:操作符)
- 泛型类简单实例
- 工作笔记---js和java共同完成大写锁定提示
- 使用maven创建基于spring框架的scala web工程