您的位置:首页 > 编程语言 > Java开发

迷之RxJava —— 线程切换

2016-05-19 15:17 591 查看
RxJava
最迷人的是什么?

答案就是
把异步序列写到一个工作流里!
javascript
Promise/A
如出一辙。

OK,在
java
中做异步的事情在我们传统理解过来可不方便,而且,如果要让异步按照我们的工作流来,就更困难了。

但是在
RxJava
中,我们只要调用调用
subscribOn()
observeOn()
就能切换我们的工作线程,是不是让小伙伴都惊呆了?

然后结合
RxJava
Operator
,写异步的时候,想切换线程就是一行代码的事情,整个
workflow
还非常清晰:
Observable.create()
// do something on io thread
.work() // work.. work..
.subscribeOn(Schedulers.io())
// observeOn android main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe();


我们再也不用去写什么见鬼的
new Thread
Handler
了,在这么几行代码里,我们实现了在
io
线程上做我们的工作(
work
),在
main
线程上,更新UI


Subscribe On

先看下
subscribeOn
干了什么
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
}


啊,原来也是个lift,就是从一个
Observable
生成另外一个
Observable
咯,这个
nest
是干嘛用?
public final Observable<Observable<T>> nest() {
return just(this);
}


这里返回类型告诉我们,它是产生一个
Observable<Observable<T>>


讲到这里,会有点晕,先记着这个,然后我们看
OperatorSubscribeOn
这个操作符,

构造函数是
public OperatorSubscribeOn(Scheduler scheduler) {
this.scheduler = scheduler;
}


OK,这里保存了
scheduler
对象,然后就是我们前一章说过的转换方法。
@Override
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
return new Subscriber<Observable<T>>(subscriber) {

@Override
public void onCompleted() {
// ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

@Override
public void onNext(final Observable<T> o) {
inner.schedule(new Action0() {

@Override
public void call() {
final Thread t = Thread.currentThread();
o.unsafeSubscribe(new Subscriber<T>(subscriber) {

@Override
public void onCompleted() {
subscriber.onCompleted();
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

@Override
public void onNext(T t) {
subscriber.onNext(t);
}

@Override
public void setProducer(final Producer producer) {
subscriber.setProducer(new Producer() {

@Override
public void request(final long n) {
if (Thread.currentThread() == t) {
// don't schedule if we're already on the thread (primarily for first setProducer call)
// see unit test 'testSetProducerSynchronousRequest' for more context on this
producer.request(n);
} else {
inner.schedule(new Action0() {

@Override
public void call() {
producer.request(n);
}
});
}
}

});
}

});
}
});
}

};
}


让人纠结的类模板

看完这段又臭又长的,先深呼吸一口气,我们慢慢分析下。

首先要注意
RxJava
里面最让人头疼的模板问题,那么
OperatorMap
这个类的声明是
public final class OperatorMap<T, R> implements Operator<R, T>


Operator
这个接口继承
Func1

public interface Func1<T, R> extends Function {
R call(T t);
}


我们这里不要记
T
R
,记住
传入左边的模板是形参,传入右边的模板是返回值


好了,那么这里的
call
就是从一个
T
转换成一个
Observable<T>
的过程了。

总结一下,我们这一次调用
subscribeOn
,做了两件事

1、
nest()
 为
Observable<T>
生成了一个
Observable<Observable<T>>
 

2、
lift()
 对
Observalbe<Observalbe<T>>
进行一个变化,变回
Observable<T>


因为
lift
是一个模板函数,它的返回值的类型是参照它的形参来,而他的形参是
Operator<T,
Observable<T>>
 这个结论非常重要!!

OK,到这里我们已经存储了所有的序列,等着我们调用了。


调用链

首先,记录我们在调用这条指令之前的
Observable<T>
,记为
Observable$1


然后,经过
lift
生成的
Observable<T>
记为
Observable$2


好了,现在我们拿到的依然是
Observable<T>
这个对象,但是它不是原始的
Observable$1
,要深深记住这一点,它是由
lift
生成的
Observable$2
,这时候进行
subscribe
,那看到首先调用的就是
OnSubscribe.call
方法,好,直接进入
lift
当中生成的那个地方。

我们知道这一层
lift
operator
就是刚刚的
OperatorSubscribOn
,那么调用它的
call
方法,生成的是一个
Subscriber<Observable<T>>

Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
onSubscribe.call(st);
} catch (Throwable e) {
...
}


好,还记得我们调用过
nest
么?,这里的
onSubscribe
可是
nest
上下文中的噢,每一次,到这个地方,这个
onSubscribe
就是上一层
Observable
onSubscribe
,即
Observable<Observable<T>>
onSubscribe
,相当于栈弹出了一层。它的
call
直接在
Subscriber
onNext
中给出了最开始的
Observable<T>
,我们这里就要看下刚刚在
OperatorSubscribeOn
中生成的
Subscriber

new Subscriber<Observable<T>>(subscriber) {

@Override
public void onCompleted() {
// ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

@Override
public void onNext(final Observable<T> o) {
inner.schedule(new Action0() {

@Override
public void call() {
final Thread t = Thread.currentThread();
o.unsafeSubscribe(new Subscriber<T>(subscriber) {

@Override
public void onCompleted() {
subscriber.onCompleted();
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

@Override
public void onNext(T t) {
subscriber.onNext(t);
}
});
}
});
}
}


对,就是它,这里要注意,这里的
subscriber
就是我们在
lift
中,传入的
o

Subscriber<? super T> st = hook.onLift(operator).call(o);


对,就是它,其实它就是
SafeSubscriber


回过头,看看刚刚的
onNext()
方法,
inner.schedule()
 这个函数,我们可以认为就是
postRun()
类似的方法,而
onNext()
中传入的
o
是我们之前生成的
Observable$1
,是从
Observable.just
封装出来的
Observable<Observable<T>>
中产生的,这里调用了
Observable$1.unsafeSubscribe
方法,我们暂时不关心它和
subscribe
有什么不同,但是我们知道最终功能是一样的就好了。

注意它运行时的线程!!在
inner
这个
Worker
上!于是它的运行线程已经被改了!!

好,这里的
unsafeSubscribe
调用的方法就是调用原先
Observable$1.onSubscribe
中的
call
方法:

这个
Observable$1
就是我们之前自己定义的
Observable
了。

综上所述,如果我们需要我们的
Observable$1
在一个别的线程上运行的时候,只需要在后面跟一个
subscribeOn
即可。结合扔物线大大的图如下:




总结

这里逻辑着实不好理解。如果还没有理解的朋友,可以按照我前文说的顺序,细致的看下来,我把逻辑过一遍之后,发现
lift
的陷阱实在太大,内部类用的风生水起,一不小心,就不知道一个变量的上下文是什么,需要特别小心。

之前我们分析过
subscribeOn
这个函数,

现在我们来看下
subscribeOn
observeOn
这两个函数到底有什么异同。

用过
rxjava
的旁友都知道,
subscribeOn
observeOn
都是用来切换线程用的,可是我什么时候用
subscribeOn
,什么时候用
observeOn
呢,我们很少知道这两个区别是啥。

友情提示,如果不想看分析过程的,可以直接跳到下面的总结部分。


subscribeOn

先看下
OperatorSubscribeOn
的核心代码:
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

final Scheduler scheduler;
final Observable<T> source;

public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}

@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);

inner.schedule(new Action0() {

@Override
public void call() {

Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}

@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}

@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}

....
};

source.unsafeSubscribe(s);
}
});
}
}


这里注意两点:

因为
OperatorSubscribeOn
是个
OnSubscribe
对象,所以在
call
参数中传入的
subscriber
就是我们在外面使用
Observable.subscribe(a)
传入的对象
a


这里
source
对象指向的是调用
subscribeOn
之前的那个
Observable
序列。

明确了这两点,我们就很好的知道了
subscribeOn
是如何工作,产生神奇的效果了。

其实最最主要的就是一行函数

source.unsafeSubscribe(s);

并且要注意它所在的位置,是在worker的
call
里面,说白了,就是把
source.subscribe
这一行调用放在指定的线程里,那么总结起来的结论就是:

subscribeOn
的调用,改变了调用前序列所运行的线程。


observeOn

同样看下
OperatorObserveOn
这个类的主要代码:
public final class OperatorObserveOn<T> implements Operator<T, T> {

private final Scheduler scheduler;
private final boolean delayError;

/**
* @param scheduler the scheduler to use
* @param delayError delay errors until all normal events are emitted in the other thread?
*/
public OperatorObserveOn(Scheduler scheduler, boolean delayError) {
this.scheduler = scheduler;
this.delayError = delayError;
}

@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
....
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError);
parent.init();
return parent;
}

/** Observe through individual queue per observer. */
private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
final Subscriber<? super T> child;
final Scheduler.Worker recursiveScheduler;
final NotificationLite<T> on;
final boolean delayError;
final Queue<Object> queue;

// the status of the current stream
volatile boolean finished;

final AtomicLong requested = new AtomicLong();

final AtomicLong counter = new AtomicLong();

/**
* The single exception if not null, should be written before setting finished (release) and read after
* reading finished (acquire).
*/
Throwable error;

// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
// not prevent anything downstream from consuming, which will happen if the Subscription is chained
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
this.delayError = delayError;
this.on = NotificationLite.instance();
if (UnsafeAccess.isUnsafeAvailable()) {
queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);
} else {
queue = new SpscAtomicArrayQueue<Object>(RxRingBuffer.SIZE);
}
}

void init() {
// don't want this code in the constructor because `this` can escape through the
// setProducer call
Subscriber<? super T> localChild = child;

localChild.setProducer(new Producer() {

@Override
public void request(long n) {
if (n > 0L) {
BackpressureUtils.getAndAddRequest(requested, n);
schedule();
}
}

});
localChild.add(recursiveScheduler);
localChild.add(this);
}

@Override
public void onStart() {
// signal that this is an async operator capable of receiving this many
request(RxRingBuffer.SIZE);
}

@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}

@Override
public void onCompleted() {
if (isUnsubscribed() || finished) {
return;
}
finished = true;
schedule();
}

@Override
public void onError(final Throwable e) {
if (isUnsubscribed() || finished) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
return;
}
error = e;
finished = true;
schedule();
}

protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}

// only execute this from schedule()
@Override
public void call() {
long emitted = 0L;

long missed = 1L;

// these are accessed in a tight loop around atomics so
// loading them into local variables avoids the mandatory re-reading
// of the constant fields
final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;
final NotificationLite<T> localOn = this.on;

// requested and counter are not included to avoid JIT issues with register spilling
// and their access is is amortized because they are part of the outer loop which runs
// less frequently (usually after each RxRingBuffer.SIZE elements)

for (;;) {
long requestAmount = requested.get();
long currentEmission = 0L;

while (requestAmount != currentEmission) {
boolean done = finished;
Object v = q.poll();
boolean empty = v == null;

if (checkTerminated(done, empty, localChild, q)) {
return;
}

if (empty) {
break;
}

localChild.onNext(localOn.getValue(v));

currentEmission++;
emitted++;
}

if (requestAmount == currentEmission) {
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
return;
}
}

if (currentEmission != 0L) {
BackpressureUtils.produced(requested, currentEmission);
}

missed = counter.addAndGet(-missed);
if (missed == 0L) {
break;
}
}

if (emitted != 0L) {
request(emitted);
}
}

boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {
if (a.isUnsubscribed()) {
q.clear();
return true;
}

if (done) {
if (delayError) {
if (isEmpty) {
Throwable e = error;
try {
if (e != null) {
a.onError(e);
} else {
a.onCompleted();
}
} finally {
recursiveScheduler.unsubscribe();
}
}
} else {
Throwable e = error;
if (e != null) {
q.clear();
try {
a.onError(e);
} finally {
recursiveScheduler.unsubscribe();
}
return true;
} else
if (isEmpty) {
try {
a.onCompleted();
} finally {
recursiveScheduler.unsubscribe();
}
return true;
}
}

}

return false;
}
}
}


这里的代码有点长,我们先注意到它是一个
Operator
,它没有对上层
Observable
做任何的控制或者包装。

既然是
Operator
,那么它的职责就是把一个
Subscriber
转换成另外一个
Subscriber

我们来关注下转换后的
Subscriber
对转换前的
Subscriber
做了些什么事。

首先它是一个
ObserveOnSubscriber
类, 既然是
Subscriber
那么肯定有
onNext
onComplete
 和
onError
 看最主要的onNext
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}


好了,这里做了两件事,首先把结果缓存到一个队列里,然后调用
schedule
启动传入的
worker


我们这里需要注意下:

在调用
observeOn
前的序列,把结果传入到
onNext
就是它的工作,它并不关心后续的流程,所以工作就到这里就结束了,剩下的交给
ObserveOnSubscriber
继续。

protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}


recursiveScheduler
 就是之前我们传入的Scheduler,我们一般会在
observeOn
传入
AndroidScheluders.mainThread()
对吧、

接下去,我们看下在
scheduler
中调用的
call
方法,这里只列出主要带代码
@Override
public void call() {
...
final Subscriber<? super T> localChild = this.child;
for (;;) {
...
boolean done = finished;
Object v = q.poll();
boolean empty = v == null;

if (checkTerminated(done, empty, localChild, q)) {
return;
}

if (empty) {
break;
}

localChild.onNext(localOn.getValue(v));

...
}

if (emitted != 0L) {
request(emitted);
}
}


OK,在
Scheduler
启动后, 我们在
Observable.subscribe(a)
传入的
a
就是这里的
child
, 我们看到,在
call
中终于调用了它的
onNext
方法,把真正的结果传了出去,但是在这里,我们是工作在
observeOn
的线程上的。

那么总结起来的结论就是:

observeOn
 对调用之前的序列默不关心,也不会要求之前的序列运行在指定的线程上

observeOn
 对之前的序列产生的结果先缓存起来,然后再在指定的线程上,推送给最终的
subscriber



复杂情况

我们经常多次使用
subscribeOn
切换线程,那么以后是否可以组合
observeOn
subscribeOn
达到自由切换的目的呢?

组合是可以的,但是他们的执行顺序是有条件的,如果仔细分析的话,可以知道
observeOn
调用之后,再调用
subscribeOn
是无效的,原因是什么?

因为
subscribeOn
改变的是
subscribe
这句调用所在的线程,大多数情况,产生内容和消费内容是在同一线程的,所以改变了产生内容所在的线程,就改变了消费内容所在的线程。

经过上面的阐述,我们知道,
observeOn
的工作原理是把消费结果先缓存,再切换到新线程上让原始消费者消费,它和生产者是没有一点关系的,就算
subscribeOn
调用了,也只是改变
observeOn
这个消费者所在的线程,和
OperatorObserveOn
中存储的原始消费者一点关系都没有,它还是由
observeOn
控制。


总结

如果我们有一段这样的序列
Observable
.map                    // 操作1
.flatMap                // 操作2
.subscribeOn(io)
.map                    //操作3
.flatMap                //操作4
.observeOn(main)
.map                    //操作5
.flatMap                //操作6
.subscribeOn(io)        //!!特别注意
.subscribe(handleData)


假设这里我们是在主线程上调用这段代码,

那么

操作1
操作2
是在io线程上,因为之后
subscribeOn
切换了线程

操作3
操作4
也是在io线程上,因为在
subscribeOn
切换了线程之后,并没有发生改变。

操作5
操作6
是在main线程上,因为在他们之前的
observeOn
切换了线程。

特别注意那一段,对于
操作5
操作6
是无效的

再简单点总结就是

subscribeOn
的调用切换之前的线程。

observeOn
的调用切换之后的线程。

observeOn
之后,不可再调用
subscribeOn
 切换线程

=========

续 特别感谢@扔物线给的额外的总结

下面提到的“操作”包括产生事件、用操作符操作事件以及最终的通过 subscriber 消费事件

只有第一subscribeOn() 起作用(所以多个 subscribeOn() 毛意义)

这个 subscribeOn() 控制从流程开始的第一个操作,直到遇到第一个 observeOn()

observeOn() 可以使用多次,每个 observeOn() 将导致一次线程切换(),这次切换开始于这次 observeOn() 的下一个操作

不论是 subscribeOn() 还是 observeOn(),每次线程切换如果不受到下一个 observeOn() 的干预,线程将不再改变,不会自动切换到其他线程
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: