RxJava2线程切换源码_observeOn
2017-06-13 11:42
393 查看
一、执行流程图
在上一节RxJava2线程切换源码_subscribeOn的示例代码中,我们是在 ObservableOnSubscribe#subscribe 中去执行 getBitampFormServer 方法去加载一个 Bitmap 对象,并且也分析了发射器在子线程中发射事件的原理。下面分析的是当成功获取到这个 bitmap 之后如何让 observer 在主线程去接收然后设置给 mImageView 对象。
二、observeOn(AndroidScheduler.mainThread())
mainThread()根据 mainThread() 源码的调用关系来看,最终返回的是 HandlerScheduler 对象,HandlerScheduler 是一个 Scheduler 的子类,其内部封装了一个可以在主线程发送消息的 handler 对象。看到这里就大概明白了,将 observer 切换到主线程去接收事件,内部就是通过一个可以在主线程发送消息的 Handler 去实现的。
public static Scheduler mainThread() { return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD); } private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler( new Callable<Scheduler>() { @Override public Scheduler call() throws Exception { return MainHolder.DEFAULT; } }); private static final class MainHolder { //HandlerScheduler 内装了一个可以在主线程发送消息的 handler 对象 static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper())); } //HandlerScheduler final class HandlerScheduler extends Scheduler {}
observerOn()
在 observeOn 内部源码的调用关系可以看到,最终是返回一个 ObservableObserveOn 对象,它是 Observable 的子类对象。从上一节的源码分析中,我们知道每次新创建的 Observable 对象都是需要去订阅对应的 observer 之后才能发送事件的。因此在发生订阅关系时,会回调 subscribeActual(observer) 方法。下面我们分析 ObservableObserveOn#subscribeActual 的内部实现。
public final Observable<T> observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); } public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); //返回一个 ObservableObserveOn 对象 return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); }
ObservableObserveOn#subscribeActual(observer)
该方法内部通过 HandlerScheduler 创建一个 worker 用于去执行一个任务,因为内部维护了具备 MainLooper 的 Hadnler, 因此它具备在主线程执行任务的功能。
@Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { //这里的 scheduler 就是 HandlerScheduler 对象 Scheduler.Worker w = scheduler.createWorker(); //source 就是上一级 subscribeOn 中创建的 ObservableSubscribeOn 对象 //内部创建一个 ObserveOnObserver 包装 传入的 observer 对象。 source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
ObserveOnObserver 内部将事件切换到主线程运行呢?
onNext@Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } //核心代码 schedule(); }
schedule()
该方法是负责去执行将上一级发送过来的任务交给下一级 observer 去处理。因为 ObserveOnObserver 是实现了 Runnable 接口,因此 this 就是表示 ObserveOnObserver 对象。所以任务被执行的话,那么当前 ObserveOnObserver 的 run 方法就会被执行。
void schedule() { if (getAndIncrement() == 0) { //通过 worker 去执行这个任务 worker.schedule(this); } }
worker.schedule
内部通过 handler 发送 Message ,注意该 Message 的 Callback 是被赋值的了,对应的值就是 ScheduledRunnable 对象。
@Override public Disposable schedule(Runnable run, long delay, TimeUnit unit) { ... ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); //这里 scheduled 是做为第二个参数,内部会给 Message 的 callback 赋值,这个会在接受消息那里使用。 Message message = Message.obtain(handler, scheduled); message.obj = this; // Used as token for batch disposal of this worker's runnables. //切换线程核心代码:通过 handler 将其切换到主线程执行 handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay))); if (disposed) { handler.removeCallbacks(scheduled); return Disposables.disposed(); } return scheduled; }
接受发送的事件
我们知道通过 Handler#send 的方式发送的消息最终都会在回调 Handler 的 dispatchMessage(Message) 方法进行分发操作。在上面 Message.obtain() 方法已经为 msg.callback 赋值了,因此在这里会调用 handleCallback 方法。
public void dispatchMessage(Message msg) { if (msg.callback != null) { handleCallback(msg); } else { if (mCallback != null) { if (mCallback.handleMessage(msg)) { return; } } handleMessage(msg); } }
handleCallback
这里可以知道,原始消息的 callback 的 run 方法会被执行。该消息是在 HandlerScheduler#HandlerWorker.schedule 中调用。也即是 ScheduledRunnable 会被调用,而 ScheduledRunnable 内部包装了 ObserveOnObserver 这个 Runnble 对象,因此 ObserveOnObserver 内部的 run 方法会被执行。
private static void handleCallback(Message message) { message.callback.run(); }
ObserveOnObserver#run()
@Override public void run() { if (outputFused) { drainFused(); } else { drainNormal(); } }
drainNormal()
在这里 actual.onNext(v) 往下传递事件。至此,事件通过 observeOn 方法就可以让 observer 在主线程中去接收事件。
void drainNormal() { int missed = 1; final SimpleQueue<T> q = queue; //这个 actual 就是下一级的 Observer 对象。 final Observer<? super T> a = actual; for (;;) { if (checkTerminated(done, q.isEmpty(), a)) { return; } for (;;) { boolean d = done; T v; try { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.dispose(); q.clear(); a.onError(ex); return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) { return; } if (empty) { break; } //内部就是通过 a 再往传递的。 a.onNext(v); } missed = addAndGet(-missed); if (missed == 0) { break; } } }
相关文章推荐
- RxJava线程切换源码分析
- RxJava2线程切换源码_subscribeOn
- 从源码分析RxJava在Android里线程切换的实现
- RxJava线程切换之subscribeOn()和observeOn()的总结
- 友好 RxJava2.x 源码解析(二)线程切换
- 浅析RxJava和RxAndroid关于线程切换和操作符作用
- Rxjava源码(三)-----线程控制Scheduler
- RxJava系列7:线程切换 Scheduler
- rxjava源码中的线程知识
- RxJava 是如何实现线程切换的(上)
- RxJava中的线程调度源码解析
- RxJava实例-线程切换
- RxJava 源码解读分析 observeOn
- RxJava2.0- Schedulers线程切换原理分析
- rxjava切换线程避免重复代码
- handler ,Looper的机制,分析源码(一)线程切换。
- RxJava回调线程切换
- RxJava线程切换代替Thread和Handler
- RxJava 操作符 on*和doOn* 线程切换 调度 Schedulers
- RxJava的线程切换