RxJava线程切换源码分析
2017-11-03 23:43
459 查看
RxJava特点
异步RxJava内四个基本概念
- Observable 可观察者,即被观察者 - Observer 观察者 - subscribe 订阅 - 事件
Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable可以在需要的时候发出事件来通知 Observer。
除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber。 Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的。
Observer和Subscriber不同点:
onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法,具体可以在后面的文中看到。
RxJava线程切换原则
在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
AndroidSchedulers.mainThread(),指定操作在 Android 主线程运行。
RxJava1.2.1线程切换源码分析
示例代码.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread())
若想了解RxJava整个事件流程请参考 RxJava源码深度解析 文章
Schedulers.io()
public static Scheduler io() { return RxJavaHooks.onIOScheduler(getInstance().ioScheduler); }
getInstance()
private static Schedulers getInstance() { for (;;) { Schedulers current = INSTANCE.get(); if (current != null) { return current; } current = new Schedulers(); if (INSTANCE.compareAndSet(null, current)) { return current; } else { current.shutdownInstance(); } } }
通过for循环获取Schedulers实例,使用AtomicReference是为了保证多线程安全操作
getInstance().ioScheduler
private Schedulers() { RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook(); //通过Schedulers.computation()创建的 Scheduler c = hook.getComputationScheduler(); if (c != null) { computationScheduler = c; } else { computationScheduler = RxJavaSchedulersHook.createComputationScheduler(); } //通过Schedulers.io()创建的 Scheduler io = hook.getIOScheduler(); if (io != null) { ioScheduler = io; } else { ioScheduler = RxJavaSchedulersHook.createIoScheduler(); } //通过Schedulers.newThread()创建的 Scheduler nt = hook.getNewThreadScheduler(); if (nt != null) { newThreadScheduler = nt; } else { newThreadScheduler = RxJavaSchedulersHook.createNewThreadScheduler(); } }
RxJavaSchedulersHook.createIoScheduler()
public static Scheduler createIoScheduler() { return createIoScheduler(new RxThreadFactory("RxIoScheduler-")); }
创建一个RxThreadFactory对象,继承ThreadFactory接口
createIoScheduler方法
public static Scheduler createIoScheduler(ThreadFactory threadFactory) { if (threadFactory == null) { throw new NullPointerException("threadFactory == null"); } return new CachedThreadScheduler(threadFactory); }
CachedThreadScheduler类
public CachedThreadScheduler(ThreadFactory threadFactory) { this.threadFactory = threadFactory; this.pool = new AtomicReference<CachedWorkerPool>(NONE); start(); }
public void start() { CachedWorkerPool update = new CachedWorkerPool(threadFactory, KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT); if (!pool.compareAndSet(NONE, update)) { update.shutdown(); } }
创建一个CachedThreadScheduler对象并在其内部创建一个缓存线程池对象,并定义线程的保活时间为60s
class CachedWorkerPool { //线程工厂 private final ThreadFactory threadFactory; //保活时间60s private final long keepAliveTime; //非阻塞链式节点无界线程安全队列 private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue; private final CompositeSubscription allWorkers; //创建一个核心线程数为1的执行任务线程池 private final ScheduledExecutorService evictorService; private final Future<?> evictorTask; CachedWorkerPool(final ThreadFactory threadFactory, long keepAliveTime, TimeUnit unit) { this.threadFactory = threadFactory; this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L; this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>(); this.allWorkers = new CompositeSubscription(); ScheduledExecutorService evictor = null; Future<?> task = null; if (unit != null) { evictor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = threadFactory.newThread(r); thread.setName(thread.getName() + " (Evictor)"); return thread; } }); NewThreadWorker.tryEnableCancelPolicy(evictor); //延迟60纳秒后每60纳秒执行一次。 task = evictor.scheduleWithFixedDelay( new Runnable() { @Override public void run() { //删除所有任务 evictExpiredWorkers(); } }, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS ); } evictorService = evictor; evictorTask = task; } //先从线程队列中获取ThreadWorker对象,如果没有则新建 ThreadWorker get() { if (allWorkers.isUnsubscribed()) { return SHUTDOWN_THREADWORKER; } while (!expiringWorkerQueue.isEmpty()) { ThreadWorker threadWorker = expiringWorkerQueue.poll(); if (threadWorker != null) { return threadWorker; } } // No cached worker found, so create a new one. ThreadWorker w = new ThreadWorker(threadFactory); allWorkers.add(w); return w; } //将使用完的线程放到线程池中,保活时间为60s void release(ThreadWorker threadWorker) { threadWorker.setExpirationTime(now() + keepAliveTime); expiringWorkerQueue.offer(threadWorker); } //删除所有任务 void evictExpiredWorkers() { ....省略.... } long now() { return System.nanoTime(); } //关闭线程 void shutdown() { ....省略.... }
ThreadWorker类
class ThreadWorker extends NewThreadWorker { private long expirationTime; ThreadWorker(ThreadFactory threadFactory) { //主要看这里 super(threadFactory); this.expirationTime = 0L; } public long getExpirationTime() { return expirationTime; } public void setExpirationTime(long expirationTime) { this.expirationTime = expirationTime; } }
在super(threadFactory)里又创建了一个线程池newScheduledThreadPool
public NewThreadWorker(ThreadFactory threadFactory) { ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory); boolean cancelSupported = tryEnableCancelPolicy(exec); if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) { registerExecutor((ScheduledThreadPoolExecutor)exec); } //赋值给executor executor = exec; }
总结
Schedulers.io()方法就是在内部创建了一个CachedThreadScheduler对象,在CachedThreadScheduler内部又创建了缓存任务的线程池CachedWorkerPool对象, 在CachedWorkerPool内部创建了ThreadWorker线程任务对象,在ThreadWorker的构造函数中调用了父类NewThreadWorker的构造, 在NewThreadWorker内部中创建了ScheduledThreadPool线程池对象用于执行任务。以此去让任务运行到非主线程里,而在CachedWorkerPool内部也创建了一个ScheduledThreadPool线程池用于清除所有任务。
Schedulers.newThread()
public static Scheduler newThread() { return RxJavaHooks.onNewThreadScheduler(getInstance().newThreadScheduler); }
getInstance().newThreadScheduler就是RxJavaSchedulersHook.createNewThreadScheduler()
public static Scheduler createNewThreadScheduler() { return createNewThreadScheduler(new RxThreadFactory("RxNewThreadScheduler-")); }
public static Scheduler createNewThreadScheduler(ThreadFactory threadFactory) { if (threadFactory == null) { throw new NullPointerException("threadFactory == null"); } return new NewThreadScheduler(threadFactory); }
NewThreadScheduler类
class NewThreadScheduler extends Scheduler { private final ThreadFactory threadFactory; public NewThreadScheduler(ThreadFactory threadFactory) { this.threadFactory = threadFactory; } @Override public Worker createWorker() { return new NewThreadWorker(threadFactory); } }
在NewThreadScheduler对外提供createWorker()方法用于返回NewThreadWorker对象
public NewThreadWorker(ThreadFactory threadFactory) { ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory); boolean cancelSupported = tryEnableCancelPolicy(exec); if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) { registerExecutor((ScheduledThreadPoolExecutor)exec); } executor = exec; }
总结
Schedulers.newThread()方法就是在内部创建一个NewThreadScheduler对象,在NewThreadScheduler对象内部对外提供createWorker()方法以便外界调用, 并返回NewThreadWorker对象,在NewThreadWorker对象内部创建了ScheduledThreadPool执行定时任务的线程池,用于在非主线程中执行任务。
AndroidSchedulers.mainThread()
public static Scheduler mainThread() { return getInstance().mainThreadScheduler; }
getInstance()
private static AndroidSchedulers getInstance() { for (;;) { AndroidSchedulers current = INSTANCE.get(); if (current != null) { return current; } current = new AndroidSchedulers(); if (INSTANCE.compareAndSet(null, current)) { return current; } } }
通过for循环获取AndroidSchedulers实例,使用AtomicReference是为了保证多线程安全操作
AndroidSchedulers()类
private AndroidSchedulers() { RxAndroidSchedulersHook hook = RxAndroidPlugins.getInstance().getSchedulersHook(); Scheduler main = hook.getMainThreadScheduler(); if (main != null) { mainThreadScheduler = main; } else { //获取LooperScheduler对象 mainThreadScheduler = new LooperScheduler(Looper.getMainLooper()); } }
Looper.getMainLooper()是获取主线程的Looper对象
LooperScheduler对象
LooperScheduler(Looper looper) { handler = new Handler(looper); }
LooperScheduler内部创建了一个Handler对象,并给Handler对象绑定主线程的Looper对象
LooperScheduler的createWorker()方法
public Worker createWorker() { return new HandlerWorker(handler); }
HandlerWorker对象
class HandlerWorker extends Worker { private final Handler handler; private final RxAndroidSchedulersHook hook; private volatile boolean unsubscribed; HandlerWorker(Handler handler) { this.handler = handler; this.hook = RxAndroidPlugins.getInstance().getSchedulersHook(); } @Override public void unsubscribe() { unsubscribed = true; b6b7 //移除handler的任务 handler.removeCallbacksAndMessages(this /* token */); } @Override public boolean isUnsubscribed() { return unsubscribed; } @Override public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { if (unsubscribed) { return Subscriptions.unsubscribed(); } //没什么用 action = hook.onSchedule(action); //创建了ScheduledAction的任务 ScheduledAction scheduledAction = new ScheduledAction(action, handler); //把任务包装到Message里 Message message = Message.obtain(handler, scheduledAction); message.obj = this; // Used as token for unsubscription operation. //发送一个0毫秒的延迟任务,发送到主线程 handler.sendMessageDelayed(message, unit.toMillis(delayTime)); if (unsubscribed) { //移除所有任务 handler.removeCallbacks(scheduledAction); return Subscriptions.unsubscribed(); } return scheduledAction; } @Override public Subscription schedule(final Action0 action) { //调用同级schedule方法 return schedule(action, 0, TimeUnit.MILLISECONDS); } }
class ScheduledAction implements Runnable, Subscription { private final Action0 action; private final Handler handler; private volatile boolean unsubscribed; ScheduledAction(Action0 action, Handler handler) { this.action = action; this.handler = handler; } @Override public void run() { try { //调用传进来的Action的call方法 action.call(); } catch (Throwable e) { .....省略..... } } @Override public void unsubscribe() { unsubscribed = true; handler.removeCallbacks(this); } @Override public boolean isUnsubscribed() { return unsubscribed; } }
ScheduledAction继承Runnable接口,通过Handler去执行ScheduledAction任务,ScheduledAction任务执行run方法后去回调传进来的Action的call方法
总结
AndroidSchedulers.mainThread()方法就是在内部创建一个LooperScheduler对象,在LooperScheduler对象内部创建一个和主线程绑定的Handler对象。
通过外界调用LooperScheduler对象内的的createWorker方法来创建HandlerWorker对象并返回给外界创建的HandlerWorker对象,外界通过调用HandlerWorker对象的内部schedule(Action0 action)方法,把需要执行的任务传递过来, 再调用同级的schedule(Action0 action, long delayTime, TimeUnit unit)方法,把Action0对象封装到ScheduledAction任务中并通过Handler把任务发送到主线程中去执行,来进行主线程的切换。
相关文章推荐
- 从源码分析RxJava在Android里线程切换的实现
- handler ,Looper的机制,分析源码(一)线程切换。
- RxJava2线程切换源码_observeOn
- RxJava2.0- Schedulers线程切换原理分析
- 友好 RxJava2.x 源码解析(二)线程切换
- RxJava2线程切换源码_subscribeOn
- Rxjava源码分析
- 源码分析异步消息处理线程机制(Looper MessageQueue Handler Message)
- RxJava subscribeOn 与 onserveOn 线程切换记录
- RxJava源码分析(二)
- tomcat的NIO线程模型源码分析
- spark源码分析之Master源码主备切换机制分析
- Android 切换系统语言源码分析
- 4、Volley解析(二),源码的深入分析一,缓存线程和网络请求线程
- 转载 linux 2.6线程创建源码分析
- 卷二 Dalvik与Android源码分析 第五章 Interpreter与JIT 5.5 Interpreter的切换 图书版试读--请勿转发
- rxJava 源码 原理分析
- 线程资源PHP源码分析之线程安全模型
- RxJava基本流程和lift源码分析
- RxJava 源码解读分析 observeOn