您的位置:首页 > 编程语言 > Java开发

RxJava2线程切换源码_observeOn

2017-06-13 11:42 393 查看

一、执行流程图



在上一节RxJava2线程切换源码_subscribeOn的示例代码中,我们是在 ObservableOnSubscribe#subscribe 中去执行 getBitampFormServer 方法去加载一个 Bitmap 对象,并且也分析了发射器在子线程中发射事件的原理。下面分析的是当成功获取到这个 bitmap 之后如何让 observer 在主线程去接收然后设置给 mImageView 对象。

二、observeOn(AndroidScheduler.mainThread())

mainThread()

根据 mainThread() 源码的调用关系来看,最终返回的是 HandlerScheduler 对象,HandlerScheduler 是一个 Scheduler 的子类,其内部封装了一个可以在主线程发送消息的 handler 对象。看到这里就大概明白了,将 observer 切换到主线程去接收事件,内部就是通过一个可以在主线程发送消息的 Handler 去实现的。

public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}

private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});

private static final class MainHolder {
//HandlerScheduler 内装了一个可以在主线程发送消息的 handler 对象
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}

//HandlerScheduler
final class HandlerScheduler extends Scheduler {}


observerOn()

在 observeOn 内部源码的调用关系可以看到,最终是返回一个 ObservableObserveOn 对象,它是 Observable 的子类对象。从上一节的源码分析中,我们知道每次新创建的 Observable 对象都是需要去订阅对应的 observer 之后才能发送事件的。因此在发生订阅关系时,会回调 subscribeActual(observer) 方法。下面我们分析 ObservableObserveOn#subscribeActual 的内部实现。

public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
//返回一个 ObservableObserveOn 对象
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}


ObservableObserveOn#subscribeActual(observer)

该方法内部通过 HandlerScheduler 创建一个 worker 用于去执行一个任务,因为内部维护了具备 MainLooper 的 Hadnler, 因此它具备在主线程执行任务的功能。

@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//这里的 scheduler 就是 HandlerScheduler 对象
Scheduler.Worker w = scheduler.createWorker();
//source 就是上一级 subscribeOn 中创建的 ObservableSubscribeOn 对象
//内部创建一个 ObserveOnObserver 包装 传入的 observer 对象。
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}


ObserveOnObserver 内部将事件切换到主线程运行呢?

onNext

@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
//核心代码
schedule();
}


schedule()

该方法是负责去执行将上一级发送过来的任务交给下一级 observer 去处理。因为 ObserveOnObserver 是实现了 Runnable 接口,因此 this 就是表示 ObserveOnObserver 对象。所以任务被执行的话,那么当前 ObserveOnObserver 的 run 方法就会被执行。

void schedule() {
if (getAndIncrement() == 0) {
//通过 worker 去执行这个任务
worker.schedule(this);
}
}


worker.schedule

内部通过 handler 发送 Message ,注意该 Message 的 Callback 是被赋值的了,对应的值就是 ScheduledRunnable 对象。

@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
...
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
//这里 scheduled 是做为第二个参数,内部会给 Message 的 callback 赋值,这个会在接受消息那里使用。
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
//切换线程核心代码:通过 handler 将其切换到主线程执行
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}


接受发送的事件

我们知道通过 Handler#send 的方式发送的消息最终都会在回调 Handler 的 dispatchMessage(Message) 方法进行分发操作。在上面 Message.obtain() 方法已经为 msg.callback 赋值了,因此在这里会调用 handleCallback 方法。

public void dispatchMessage(Message msg) {
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}


handleCallback

这里可以知道,原始消息的 callback 的 run 方法会被执行。该消息是在 HandlerScheduler#HandlerWorker.schedule 中调用。也即是 ScheduledRunnable 会被调用,而 ScheduledRunnable 内部包装了 ObserveOnObserver 这个 Runnble 对象,因此 ObserveOnObserver 内部的 run 方法会被执行。

private static void handleCallback(Message message) {
message.callback.run();
}


ObserveOnObserver#run()

@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}


drainNormal()

在这里 actual.onNext(v) 往下传递事件。至此,事件通过 observeOn 方法就可以让 observer 在主线程中去接收事件。

void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
//这个 actual 就是下一级的 Observer 对象。
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
//内部就是通过 a 再往传递的。
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息