RxJava2线程切换源码_subscribeOn
2017-06-12 15:50
393 查看
在上一小节中,有这么一个结论,那就是每一次调用 Observable 的操作符都会返回一个新的 Observable 对象,并且会通过构造的方式传入上一级创建的 Observable 对象,将其保存起来,下面是示例代码。那么接下来操作的 subscribeOn、observeOn 操作符都会分别创建新的 Observable 对象,并存储上一级创建的 observable。
这个方法内部会通过创建一个 ObservableSubscribeOn 对象,根据之前的经验可知道,这个类肯定也是一个 Observable 的子类对象。因此对于 subscribe(observer) 方法而言,我们就只关心它真正调用的方法 subscribeActual(observer) 。
subscribeActual(observer)
在subscribeActual 内部首先是对 observer 进行包装成 SubscribeOnObserver 对象。这里的 SubscribeOnObserver 不仅是一个 Observer ,而且具备一个连接器的作用 Disposable 。
SubscribeOnObserver
这个类是对 observer 的包装,内部实现了 Observer 和 Disposable 接口。也就是说它既有订阅者的功能,也实现了连接器的功能。注意 actual 这个变量,它是下一级的 Observer 对象,为什么说是下一级呢?因为每次包装的 Observer 是一级级别往上被订阅的,当前的 Observer 都会包装下一级别的 Observer 对象。例如 SubscribeOnObserver 就封装了下一级的 Observer 对象,其实就是当前 Observer 接受到事件源发送过来的事件时,再调用包装的 Observer 回调给下一级,这样一级级传递下去知道最后一级 Observer。
SubscribeTask(parent)
SubscribeTask 它是一个 Runnbale ,因此我们把它理解为一个任务。首先关注是它的 run 方法,它内部实现很简单,就是通知上一级的 Observable 通过 subscribe 这个方法进行订阅当前 observer 。下面会执行一大堆代码,其实都会为创建一个线程然后交给指定的线程池取执行这个任务,先记住这个任务的使命。那么既然是一个线程,那么肯定有一个地方需要执行这个线程的,接下来关注 scheduler.scheduleDirect 方法。
开始寻找 SubscribeTask 这个线程实在哪里被执行的。
scheduler.scheduleDirect(new SubscribeTask(parent))
刚才分析过 scheduler 就是 IoScheduler 对象了,跟踪源码发现,这个类并没有重写这个方法,因此直接进入 Scheduler 查看。
IoScheduler#createWorker();
现在我们知道我们的任务是交给 worker.schedule() 去执行的。因为 Worker 是负责去执行调度的,因此不同的子类会有不同的 Worker 的实现,在 Scheduler 中通过 createWorker() 来获取子类实现的 Worker 对象。
Scheduler#Worker
这个类具备延迟执行任务,周期性执行任务的功能。所有的执行都是基于 schedule() 方法,而这个方法是一个抽象方法,也就是它无法知道子类需要怎么执行这个任务,因为每一种调度器执行的方式 schedule 都不一样,因此交给子类去实现。
EnentLooerWorker#schedule()
有了 Worker 之后就要开始执行【我们的任务 action 啦】
threadWorker.scheduleActual
threadWorker 是 ThreadWorker ,继承至 NewThreadWorker 。
//上一级创建的 observable 对象:ObservableOnSubscribe Observable.create(new ObservableOnSubscribe<String>() {...} public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; //保存上一级创建的 Observable 对象 : ObservableOnSubscribe public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } }
一、执行流程图
二、示例代码
下面这段代码的功能就是在 subscribe 方法内部通过调用 getBitampFormServer 去请求一个 Bitmap 对象,这个方法是耗时操作,当前的操作应该在子线程中执行,得到 bmp 之后,根据结果分别去调用 onNext() /onError() 方法。而在订阅者中若是 onNext 被回调则表示成功获取到 bmp,对应地将其设置给对应的 mImageView 对象上,如果 onError 被回调了,那么表示加载 Bitmap 是失败的,对应的再做一些其它操作,这些操作应该在主线程中进行。本次通过从源码的角度探究的是 RxJava2 内部是如何进行线程切换操作的。本小节先分析 subscribeOn 如何去实现事件源在子线程中发射事件。也就是 ObservableOnSubscribe#subscribe 在子线程中去执行。Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<Bitmap> e) throws Exception { //该方法进行网络请求,是比较耗时的操作。 Bitmap bmp = getBitampFormServer("uri"); if(bmp!=null) { //获取 bmp 成功 e.onNext(bmp); e.onComplete(); }else{ //如果从网络加载图片不成功,回调onError 来通知订阅者 e.onError(new Exception("图片加载出错啦")); } }}) //事件源发射事件在子线程中运行 .subscribeOn(Schedulers.io()) //订阅者在主线程中接受事件 .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<Bitmap>() { @Override public void onSubscribe(Disposable d) { Log.e("zeal", "onSubscribe"); } @Override public void onNext(@NonNull Bitmap bmp) { //设置显示在 ImageView 上 mImageView.setImageBitmap(bmp); } @Override public void onError(@NonNull Throwable e) { Log.e("zeal","error:"+e.toString()); } @Override public void onComplete() { Log.e("zeal", "onComplete"); } });
2、.subscribeOn(Schedulers.io()) 源码分析
public final Observable<T> subscribeOn(Scheduler scheduler) { ... return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
2.1、Scheduler
从下面的类注释可以知道,这个类是一个调度类,可以延时/周期性地去执行一个任务。可以从 Schedulers 这个类去获取 Scheduler 的实现子类对象,例如在频繁进行 io 操作就可以调用 Schedulers.io() ,如果是计算比较多的可以调用 Schedulers.computation()。/** * A {@code Scheduler} is an object that specifies an API for scheduling * units of work with or without delays or periodically. * You can get common instances of this class in {@link io.reactivex.schedulers.Schedulers}. */ public abstract class Scheduler {}
2.2、Schedulers.io()
通过下面的 Schedulers.io() 源码跟踪,最终返回的是一个 IoScheduler 对象,这个对象实际上就是 Scheduler 的子类对象。那么就符合 subscribeOn(Scheduler) 参数的要求了。@NonNull public static Scheduler io() { //内部是 IO return RxJavaPlugins.onIoScheduler(IO); } //----------------------------------------------------- @NonNull static final Scheduler IO; static { ... // IO 是在静态代码块中实例化的 IO = RxJavaPlugins.initIoScheduler(new Callable<Scheduler>() { @Override public Scheduler call() throws Exception { //这里返回一个 IoHolder 对象。 return IoHolder.DEFAULT; } }); ... } //----------------------------------------------------- static final class IoHolder { static final Scheduler DEFAULT = new IoScheduler(); } //----------------------------------------------------- //IoHolder 类定义中可以知道,该类是继承至 Scheduler public final class IoScheduler extends Scheduler {}
2.3、subscribeOn 内部实现
subscribeOn(Scheduler scheduler)这个方法内部会通过创建一个 ObservableSubscribeOn 对象,根据之前的经验可知道,这个类肯定也是一个 Observable 的子类对象。因此对于 subscribe(observer) 方法而言,我们就只关心它真正调用的方法 subscribeActual(observer) 。
subscribeActual(observer)
在subscribeActual 内部首先是对 observer 进行包装成 SubscribeOnObserver 对象。这里的 SubscribeOnObserver 不仅是一个 Observer ,而且具备一个连接器的作用 Disposable 。
@Override public void subscribeActual(final Observer<? super T> s) { //包装 observer 对象 final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); //将连接器 parent 通过 onSubscribe 回调给 observer 对象 s.onSubscribe(parent); //这里是通过 scheduler 去执行一个任务 SubscribeTask。 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
SubscribeOnObserver
这个类是对 observer 的包装,内部实现了 Observer 和 Disposable 接口。也就是说它既有订阅者的功能,也实现了连接器的功能。注意 actual 这个变量,它是下一级的 Observer 对象,为什么说是下一级呢?因为每次包装的 Observer 是一级级别往上被订阅的,当前的 Observer 都会包装下一级别的 Observer 对象。例如 SubscribeOnObserver 就封装了下一级的 Observer 对象,其实就是当前 Observer 接受到事件源发送过来的事件时,再调用包装的 Observer 回调给下一级,这样一级级传递下去知道最后一级 Observer。
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable { ... final Observer<? super T> actual; final AtomicReference<Disposable> s; SubscribeOnObserver(Observer<? super T> actual) { this.actual = actual; this.s = new AtomicReference<Disposable>(); } @Override public void onSubscribe(Disposable s) { DisposableHelper.setOnce(this.s, s); } //发送事件 @Override public void onNext(T t) { //回调给下一级 actual.onNext(t); } //发送事件 @Override public void onError(Throwable t) { //回调给下一级 actual.onError(t); } //发送事件 @Override public void onComplete() { //回调给下一级 actual.onComplete(); } @Override public void dispose() { DisposableHelper.dispose(s); DisposableHelper.dispose(this); } @Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); } void setDisposable(Disposable d) { DisposableHelper.setOnce(this, d);
SubscribeTask(parent)
SubscribeTask 它是一个 Runnbale ,因此我们把它理解为一个任务。首先关注是它的 run 方法,它内部实现很简单,就是通知上一级的 Observable 通过 subscribe 这个方法进行订阅当前 observer 。下面会执行一大堆代码,其实都会为创建一个线程然后交给指定的线程池取执行这个任务,先记住这个任务的使命。那么既然是一个线程,那么肯定有一个地方需要执行这个线程的,接下来关注 scheduler.scheduleDirect 方法。
final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; } @Override public void run() { //【核心代码,这段代码决定上一级observable订阅在哪个线程执行。】 //source:就是上一级创建的 observable //parent 就是包装后的 observer source.subscribe(parent); } }
开始寻找 SubscribeTask 这个线程实在哪里被执行的。
scheduler.scheduleDirect(new SubscribeTask(parent))
刚才分析过 scheduler 就是 IoScheduler 对象了,跟踪源码发现,这个类并没有重写这个方法,因此直接进入 Scheduler 查看。
@NonNull public Disposable scheduleDirect(@NonNull Runnable run) { return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS); } //这里的 delay = 0,也就是马上执行这个任务。 //【这个 run 就是我们的目标】 @NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { //核心代码 createWorker() 创建一个可以可以执行 run 的 worker final Worker w = createWorker(); //对 run 进行了包装,实际上还是 run 这个对象。【这个 run 就是我们的目标】 final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); //decoratedRun 交给了 worker 去执行 w.schedule(new Runnable() { @Override public void run() { try { 【我们的目标在此处被执行】 decoratedRun.run(); } finally { //事件源发射事件完毕之后,就关闭连接器。 w.dispose(); } } }, delay, unit); return w; }
IoScheduler#createWorker();
现在我们知道我们的任务是交给 worker.schedule() 去执行的。因为 Worker 是负责去执行调度的,因此不同的子类会有不同的 Worker 的实现,在 Scheduler 中通过 createWorker() 来获取子类实现的 Worker 对象。
@Override public Worker createWorker() { return new EventLoopWorker(pool.get()); }
Scheduler#Worker
这个类具备延迟执行任务,周期性执行任务的功能。所有的执行都是基于 schedule() 方法,而这个方法是一个抽象方法,也就是它无法知道子类需要怎么执行这个任务,因为每一种调度器执行的方式 schedule 都不一样,因此交给子类去实现。
@NonNull public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
EnentLooerWorker#schedule()
有了 Worker 之后就要开始执行【我们的任务 action 啦】
@Override public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } //【任务 action 】交给 threadWorker 去执行 return threadWorker.scheduleActual(action, delayTime, unit, tasks);
threadWorker.scheduleActual
threadWorker 是 ThreadWorker ,继承至 NewThreadWorker 。
static final class ThreadWorker extends NewThreadWorker //NewThreadWorker 内部维护一个线程池 executor。 public class NewThreadWorker extends Scheduler.Worker implements Disposable { private final ScheduledExecutorService executor; volatile boolean disposed; public NewThreadWorker(ThreadFactory threadFactory) { executor = SchedulerPoolFactory.create(threadFactory); } //最终代码会走到这里 @NonNull public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { //对 run 进行包装 Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); if (parent != null) { if (!parent.add(sr)) { return sr; } } Future<?> f; try { //上面已经提到,delayTime = 0;所以这个任务会被立即执行, if (delayTime <= 0) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent != null) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; } @Override public void run() { try { try { //执行原始的 run 方法。 actual.run(); } catch (Throwable e) { // Exceptions.throwIfFatal(e); nowhere to go RxJavaPlugins.onError(e); } } finally { Object o = get(PARENT_INDEX); if (o != DISPOSED && o != null && compareAndSet(PARENT_INDEX, o, DONE)) { ((DisposableContainer)o).delete(this); } for (;;) { o = get(FUTURE_INDEX); if (o == DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) { break; } } } }
2.4、 结果
f = executor.submit((Callable)sr); 这里执行了 SubscribeTask#run() 方法,也就是当前的订阅者 Observer 订阅了上一级的 Observable 。也就是上一级的 ObservableCreate.subscribe(observer) 被执行了。请注意它是在子线程中被执行的。如果想要了解接下来的事件源是怎么发送事件的可以参考RxJava2_整体流程分析相关文章推荐
- 友好 RxJava2.x 源码解析(二)线程切换
- 从源码分析RxJava在Android里线程切换的实现
- RxJava2线程切换源码_observeOn
- RxJava线程切换源码分析
- 浅析RxJava和RxAndroid关于线程切换和操作符作用
- Rxjava源码(三)-----线程控制Scheduler
- RxJava的线程切换
- rxjava源码中的线程知识
- RxJava2.0- Schedulers线程切换原理分析
- RxJava 是如何实现线程切换的(上)
- RxJava实例-线程切换
- RxJava系列7:线程切换 Scheduler
- handler ,Looper的机制,分析源码(一)线程切换。
- RxJava 源码解读分析 subscribeOn 方法
- RxJava线程切换代替Thread和Handler
- RxJava中的线程调度源码解析
- RxJava 操作符 on*和doOn* 线程切换 调度 Schedulers
- RxJava回调线程切换
- rxjava切换线程避免重复代码
- Rxjava原理探索:切换线程,变换