您的位置:首页 > 编程语言 > Java开发

RxJava线程切换源码分析

2017-11-03 23:43 459 查看

RxJava特点

异步

RxJava内四个基本概念

- Observable 可观察者,即被观察者
- Observer 观察者
- subscribe 订阅
- 事件


Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable可以在需要的时候发出事件来通知 Observer。

除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber。 Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的。

Observer和Subscriber不同点:

onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法,具体可以在后面的文中看到。

RxJava线程切换原则

在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。

Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler

Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。

Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。

Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

AndroidSchedulers.mainThread(),指定操作在 Android 主线程运行。

RxJava1.2.1线程切换源码分析

示例代码

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())


若想了解RxJava整个事件流程请参考 RxJava源码深度解析 文章

Schedulers.io()

public static Scheduler io() {
return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
}


getInstance()

private static Schedulers getInstance() {
for (;;) {
Schedulers current = INSTANCE.get();
if (current != null) {
return current;
}
current = new Schedulers();
if (INSTANCE.compareAndSet(null, current)) {
return current;
} else {
current.shutdownInstance();
}
}
}


通过for循环获取Schedulers实例,使用AtomicReference是为了保证多线程安全操作

getInstance().ioScheduler

private Schedulers() {
RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook();
//通过Schedulers.computation()创建的
Scheduler c = hook.getComputationScheduler();
if (c != null) {
computationScheduler = c;
} else {
computationScheduler = RxJavaSchedulersHook.createComputationScheduler();
}
//通过Schedulers.io()创建的
Scheduler io = hook.getIOScheduler();
if (io != null) {
ioScheduler = io;
} else {

ioScheduler = RxJavaSchedulersHook.createIoScheduler();
}
//通过Schedulers.newThread()创建的
Scheduler nt = hook.getNewThreadScheduler();
if (nt != null) {
newThreadScheduler = nt;
} else {
newThreadScheduler = RxJavaSchedulersHook.createNewThreadScheduler();
}
}


RxJavaSchedulersHook.createIoScheduler()

public static Scheduler createIoScheduler() {
return createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
}


创建一个RxThreadFactory对象,继承ThreadFactory接口

createIoScheduler方法

public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory == null");
}
return new CachedThreadScheduler(threadFactory);
}


CachedThreadScheduler类

public CachedThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}


public void start() {
CachedWorkerPool update =
new CachedWorkerPool(threadFactory, KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}


创建一个CachedThreadScheduler对象并在其内部创建一个缓存线程池对象,并定义线程的保活时间为60s

class CachedWorkerPool {
//线程工厂
private final ThreadFactory threadFactory;
//保活时间60s
private final long keepAliveTime;
//非阻塞链式节点无界线程安全队列
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
private final CompositeSubscription allWorkers;
//创建一个核心线程数为1的执行任务线程池
private final ScheduledExecutorService evictorService;
private final Future<?> evictorTask;

CachedWorkerPool(final ThreadFactory threadFactory, long keepAliveTime, TimeUnit unit) {
this.threadFactory = threadFactory;
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeSubscription();
ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
evictor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override public Thread newThread(Runnable r) {
Thread thread = threadFactory.newThread(r);
thread.setName(thread.getName() + " (Evictor)");
return thread;
}
});
NewThreadWorker.tryEnableCancelPolicy(evictor);
//延迟60纳秒后每60纳秒执行一次。
task = evictor.scheduleWithFixedDelay(
new Runnable() {
@Override
public void run() {
//删除所有任务
evictExpiredWorkers();
}
}, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS
);
}
evictorService = evictor;
evictorTask = task;
}

//先从线程队列中获取ThreadWorker对象,如果没有则新建
ThreadWorker get() {
if (allWorkers.isUnsubscribed()) {
return SHUTDOWN_THREADWORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}

// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}

//将使用完的线程放到线程池中,保活时间为60s
void release(ThreadWorker threadWorker) {
threadWorker.setExpirationTime(now() + keepAliveTime);
expiringWorkerQueue.offer(threadWorker);
}

//删除所有任务
void evictExpiredWorkers() {
....省略....
}

long now() {
return System.nanoTime();
}

//关闭线程
void shutdown() {
....省略....
}


ThreadWorker类

class ThreadWorker extends NewThreadWorker {
private long expirationTime;

ThreadWorker(ThreadFactory threadFactory) {
//主要看这里
super(threadFactory);
this.expirationTime = 0L;
}

public long getExpirationTime() {
return expirationTime;
}

public void setExpirationTime(long expirationTime) {
this.expirationTime = expirationTime;
}
}


在super(threadFactory)里又创建了一个线程池newScheduledThreadPool

public NewThreadWorker(ThreadFactory threadFactory) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
boolean cancelSupported = tryEnableCancelPolicy(exec);
if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
registerExecutor((ScheduledThreadPoolExecutor)exec);
}
//赋值给executor
executor = exec;
}


总结

Schedulers.io()方法就是在内部创建了一个CachedThreadScheduler对象,在CachedThreadScheduler内部又创建了缓存任务的线程池CachedWorkerPool对象, 在CachedWorkerPool内部创建了ThreadWorker线程任务对象,在ThreadWorker的构造函数中调用了父类NewThreadWorker的构造, 在NewThreadWorker内部中创建了ScheduledThreadPool线程池对象用于执行任务。以此去让任务运行到非主线程里,而在CachedWorkerPool内部也创建了一个ScheduledThreadPool线程池用于清除所有任务。

Schedulers.newThread()

public static Scheduler newThread() {
return RxJavaHooks.onNewThreadScheduler(getInstance().newThreadScheduler);
}


getInstance().newThreadScheduler就是RxJavaSchedulersHook.createNewThreadScheduler()

public static Scheduler createNewThreadScheduler() {
return createNewThreadScheduler(new RxThreadFactory("RxNewThreadScheduler-"));
}


public static Scheduler createNewThreadScheduler(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory == null");
}
return new NewThreadScheduler(threadFactory);
}


NewThreadScheduler类

class NewThreadScheduler extends Scheduler {
private final ThreadFactory threadFactory;

public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}

@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
}


在NewThreadScheduler对外提供createWorker()方法用于返回NewThreadWorker对象

public NewThreadWorker(ThreadFactory threadFactory) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
boolean cancelSupported = tryEnableCancelPolicy(exec);
if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
registerExecutor((ScheduledThreadPoolExecutor)exec);
}
executor = exec;
}


总结

Schedulers.newThread()方法就是在内部创建一个NewThreadScheduler对象,在NewThreadScheduler对象内部对外提供createWorker()方法以便外界调用, 并返回NewThreadWorker对象,在NewThreadWorker对象内部创建了ScheduledThreadPool执行定时任务的线程池,用于在非主线程中执行任务。

AndroidSchedulers.mainThread()

public static Scheduler mainThread() {
return getInstance().mainThreadScheduler;
}


getInstance()

private static AndroidSchedulers getInstance() {
for (;;) {
AndroidSchedulers current = INSTANCE.get();
if (current != null) {
return current;
}
current = new AndroidSchedulers();
if (INSTANCE.compareAndSet(null, current)) {
return current;
}
}
}


通过for循环获取AndroidSchedulers实例,使用AtomicReference是为了保证多线程安全操作

AndroidSchedulers()类

private AndroidSchedulers() {
RxAndroidSchedulersHook hook = RxAndroidPlugins.getInstance().getSchedulersHook();

Scheduler main = hook.getMainThreadScheduler();
if (main != null) {
mainThreadScheduler = main;
} else {
//获取LooperScheduler对象
mainThreadScheduler = new LooperScheduler(Looper.getMainLooper());
}
}


Looper.getMainLooper()是获取主线程的Looper对象

LooperScheduler对象

LooperScheduler(Looper looper) {
handler = new Handler(looper);
}


LooperScheduler内部创建了一个Handler对象,并给Handler对象绑定主线程的Looper对象

LooperScheduler的createWorker()方法

public Worker createWorker() {
return new HandlerWorker(handler);
}


HandlerWorker对象

class HandlerWorker extends Worker {
private final Handler handler;
private final RxAndroidSchedulersHook hook;
private volatile boolean unsubscribed;

HandlerWorker(Handler handler) {
this.handler = handler;
this.hook = RxAndroidPlugins.getInstance().getSchedulersHook();
}

@Override
public void unsubscribe() {
unsubscribed = true;
b6b7
//移除handler的任务
handler.removeCallbacksAndMessages(this /* token */);
}

@Override
public boolean isUnsubscribed() {
return unsubscribed;
}

@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (unsubscribed) {
return Subscriptions.unsubscribed();
}
//没什么用
action = hook.onSchedule(action);
//创建了ScheduledAction的任务
ScheduledAction scheduledAction = new ScheduledAction(action, handler);
//把任务包装到Message里
Message message = Message.obtain(handler, scheduledAction);
message.obj = this; // Used as token for unsubscription operation.
//发送一个0毫秒的延迟任务,发送到主线程
handler.sendMessageDelayed(message, unit.toMillis(delayTime));

if (unsubscribed) {
//移除所有任务
handler.removeCallbacks(scheduledAction);
return Subscriptions.unsubscribed();
}

return scheduledAction;
}

@Override
public Subscription schedule(final Action0 action) {
//调用同级schedule方法
return schedule(action, 0, TimeUnit.MILLISECONDS);
}
}


class ScheduledAction implements Runnable, Subscription {
private final Action0 action;
private final Handler handler;
private volatile boolean unsubscribed;

ScheduledAction(Action0 action, Handler handler) {
this.action = action;
this.handler = handler;
}

@Override public void run() {
try {
//调用传进来的Action的call方法
action.call();
} catch (Throwable e) {
.....省略.....
}
}

@Override public void unsubscribe() {
unsubscribed = true;
handler.removeCallbacks(this);
}

@Override public boolean isUnsubscribed() {
return unsubscribed;
}
}


ScheduledAction继承Runnable接口,通过Handler去执行ScheduledAction任务,ScheduledAction任务执行run方法后去回调传进来的Action的call方法

总结

AndroidSchedulers.mainThread()方法就是在内部创建一个LooperScheduler对象,在LooperScheduler对象内部创建一个和主线程绑定的Handler对象。

通过外界调用LooperScheduler对象内的的createWorker方法来创建HandlerWorker对象并返回给外界创建的HandlerWorker对象,外界通过调用HandlerWorker对象的内部schedule(Action0 action)方法,把需要执行的任务传递过来, 再调用同级的schedule(Action0 action, long delayTime, TimeUnit unit)方法,把Action0对象封装到ScheduledAction任务中并通过Handler把任务发送到主线程中去执行,来进行主线程的切换。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息