[RxJava学习]subscribeOn源码分析
2016-08-25 15:48
309 查看
学习文章:给 Android 开发者的 RxJava 详解 之后,我们知道了可以通过subscribeOn方法来指定事件产生的代码在哪里执行。
原话如下:
“
指定
这里摘引《给Android开发者的RxJava详解》中的例子,即:
还是老办法,我们通过层层的代码替换,来看看示例中的代码执行情况。
第一句的“Observable.just(1, 2, 3, 4)”经过
接下来,我们重点分析下后两句:
继而:
相关源码如下:
代码简化为:
首先看一下Schedulers.io()的值:
这样的话,代码:
subscriber1.onStart();
final Worker inner = scheduler1.createWorker();
subscriber1.add(inner);
inner.schedule(innerAction1);就等价于:
subscriber1.onStart();
ScheduledAction s = threadWorker.scheduleActual(new Action0() {
@Override
public void call() {
if (isUnsubscribed()) {
return;
}
innerAction1.call();
}
}, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
subscriber1.add(s);替换掉innerAction1.call(),代码等价于:
subscriber1.onStart();
ScheduledAction s = threadWorker.scheduleActual(new Action0() {
@Override
public void call() {
if (isUnsubscribed()) {
return;
}
observable1.unsafeSubscribe(s1);
}
}, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
subscriber1.add(s);替换掉方法unsafeSubscribe,代码等价于:
subscriber1.onStart();
ScheduledAction s = threadWorker.scheduleActual(new Action0() {
@Override
public void call() {
if (isUnsubscribed()) {
return;
}
s1.onStart();
onSubscribe1.call(s1);
}
}, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
subscriber1.add(s);
简化如下:
subscriber1.onStart();
<span style="white-space:pre"> </span>scheduledAction0 = new Action0() {
@Override
public void call() {
if (isUnsubscribed()) {
return;
}
s1.onStart();
onSubscribe1.call(s1);
}
};
ScheduledAction s = threadWorker.scheduleActual(scheduledAction0 , delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
subscriber1.add(s);
接下来,我们来看一下threadWorker.scheduleActual的源码:
// NewThreadWorker
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}其中的executor的值是在构造函数里赋值的,NewThreadWorker的构造函数如下:
// NewThreadWorker
public NewThreadWorker(ThreadFactory threadFactory) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
// Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
boolean cancelSupported = tryEnableCancelPolicy(exec);
if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
registerExecutor((ScheduledThreadPoolExecutor)exec);
}
schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook();
executor = exec;
}
其中:
在NewThreadWorker的scheduleActual中,executor.submit就相当于Thread.start;这样,系统就会在后续的某一时刻来执行任务ScheduledAction的run方法,cheduledAction的run方法的源码如下:
原话如下:
“
subscribeOn():
指定
subscribe()所发生的线程,即
Observable.OnSubscribe被激活时所处的线程。或者叫做事件产生的线程。”
这里摘引《给Android开发者的RxJava详解》中的例子,即:
Observable.just(1, 2, 3, 4) .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程 .subscribe(new Action1<Integer>() { @Override public void call(Integer number) { Log.d(tag, "number:" + number); } });来分析subscribeOn的实现。
还是老办法,我们通过层层的代码替换,来看看示例中的代码执行情况。
第一句的“Observable.just(1, 2, 3, 4)”经过
public static <T> Observable<T> just(T t1, T t2, T t3, T t4) { return from((T[])new Object[] { t1, t2, t3, t4 }); }和
public static <T> Observable<T> from(T[] array) { int n = array.length; if (n == 0) { return empty(); } else if (n == 1) { return just(array[0]); } return create(new OnSubscribeFromArray<T>(array)); }可以等价为:
array = {1, 2, 3, 4}; onSubscribe1 = new OnSubscribeFromArray<T>(array); observable1.onSubscribe = onSubscribe1;这样,整个示例就等价于:
Observable.just(1, 2, 3, 4)
array = {1, 2, 3, 4}; onSubscribe1 = new OnSubscribeFromArray<T>(array); observable1.onSubscribe = onSubscribe1;
subscribe1 = new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});
observable2 = observable1.subscribeOn(Schedulers.io()); observable2.subscribe(subscribe1);
接下来,我们重点分析下后两句:
observable2 = observable1.subscribeOn(Schedulers.io()); observable2.subscribe(subscribe1);其中方法subscribe的主要逻辑如下:
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { // new Subscriber so onStart it subscriber.onStart(); // The code below is exactly the same an unsafeSubscribe but not used because it would // add a significant depth to already huge call stacks. try { // allow the hook to intercept and/or decorate hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { } return Subscriptions.unsubscribed(); } }代码替换如下:
observable2 = observable1.subscribeOn(Schedulers.io()); subscribe1.onStart(); (observable2.onSubscribe).call(subscribe1);方法subscribeOn进过如下替换:
public final Observable<T> subscribeOn(Scheduler scheduler) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return create(new OperatorSubscribeOn<T>(this, scheduler)); }可得到:
observable2 = create(new OperatorSubscribeOn<T>(observable1, Schedulers.io())); subscribe1.onStart(); (observable2.onSubscribe).call(subscribe1);
继而:
<pre name="code" class="java">onSubscribe2 = new OperatorSubscribeOn<T>(observable1, Schedulers.io()); observable2 = create(onSubscribe2); subscribe1.onStart(); onSubscribe2.call(subscribe1);
相关源码如下:
//OperatorSubscribeOn public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) { this.scheduler = scheduler; this.source = source; } public void call(final Subscriber<? super T> subscriber) { final Worker inner = scheduler.createWorker(); subscriber.add(inner); inner.schedule(new Action0() { @Override public void call() { final Thread t = Thread.currentThread(); Subscriber<T> s = new Subscriber<T>(subscriber) { @Override public void onNext(T t) { subscriber.onNext(t); } @Override public void onError(Throwable e) { try { subscriber.onError(e); } finally { inner.unsubscribe(); } } @Override public void onCompleted() { try { subscriber.onCompleted(); } finally { inner.unsubscribe(); } } @Override public void setProducer(final Producer p) { subscriber.setProducer(new Producer() { @Override public void request(final long n) { if (t == Thread.currentThread()) { p.request(n); } else { inner.schedule(new Action0() { @Override public void call() { p.request(n); } }); } } }); } }; source.unsafeSubscribe(s); } });代码等价为:
subscriber1.onStart(); final Worker inner = scheduler1.createWorker(); subscriber1.add(inner); innerAction1 = new Action0() { @Override public void call() { final Thread t = Thread.currentThread(); Subscriber<T> s1 = new Subscriber<T>(subscriber1) { @Override public void onNext(T t) { subscriber1.onNext(t); } @Override public void onError(Throwable e) { try { subscriber1.onError(e); } finally { inner.unsubscribe(); } } @Override public void onCompleted() { try { subscriber1.onCompleted(); } finally { inner.unsubscribe(); } } @Override public void setProducer(final Producer p) { subscriber1.setProducer(new Producer() { @Override public void request(final long n) { if (t == Thread.currentThread()) { p.request(n); } else { inner.schedule(new Action0() { @Override public void call() { p.request(n); } }); } } }); } }; source.unsafeSubscribe(s1); } }; inner.schedule(innerAction1);其中scheduler1=Schedulers.io();
代码简化为:
subscriber1.onStart(); final Worker inner = scheduler1.createWorker(); subscriber1.add(inner); inner.schedule(innerAction1);接下来就是跟踪
Schedulers.io()的两个方法:createWorker和schedule:
首先看一下Schedulers.io()的值:
Schedulers.io() ==> ioScheduler = RxJavaSchedulersHook.createIoScheduler(); ==> ioScheduler = createIoScheduler(new RxThreadFactory("RxIoScheduler-")); ==> ioScheduler = new CachedThreadScheduler(threadFactory);然后跟踪一下createWorker的返回值:
// CachedThreadScheduler final AtomicReference<CachedWorkerPool> pool; public Worker createWorker() { return new EventLoopWorker(pool.get()); }其中pool.get()的源码如下:
// CachedWorkerPool ThreadWorker get() { if (allWorkers.isUnsubscribed()) { return SHUTDOWN_THREADWORKER; } while (!expiringWorkerQueue.isEmpty()) { ThreadWorker threadWorker = expiringWorkerQueue.poll(); if (threadWorker != null) { return threadWorker; } } // No cached worker found, so create a new one. ThreadWorker w = new ThreadWorker(threadFactory); allWorkers.add(w); return w; } // ThreadWorker private static final class ThreadWorker extends NewThreadWorker至此,我们得知:
final Worker inner = scheduler1.createWorker();执行后,得到了一个EventLoopWorker对象;接下来,我们来看EventLoopWorker的方法:schedule
// EventLoopWorker public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { if (innerSubscription.isUnsubscribed()) { // don't schedule, we are unsubscribed return Subscriptions.unsubscribed(); } ScheduledAction s = threadWorker.scheduleActual(new Action0() { @Override public void call() { if (isUnsubscribed()) { return; } action.call(); } }, delayTime, unit); innerSubscription.add(s); s.addParent(innerSubscription); return s; }
这样的话,代码:
subscriber1.onStart();
final Worker inner = scheduler1.createWorker();
subscriber1.add(inner);
inner.schedule(innerAction1);就等价于:
subscriber1.onStart();
ScheduledAction s = threadWorker.scheduleActual(new Action0() {
@Override
public void call() {
if (isUnsubscribed()) {
return;
}
innerAction1.call();
}
}, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
subscriber1.add(s);替换掉innerAction1.call(),代码等价于:
subscriber1.onStart();
ScheduledAction s = threadWorker.scheduleActual(new Action0() {
@Override
public void call() {
if (isUnsubscribed()) {
return;
}
observable1.unsafeSubscribe(s1);
}
}, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
subscriber1.add(s);替换掉方法unsafeSubscribe,代码等价于:
subscriber1.onStart();
ScheduledAction s = threadWorker.scheduleActual(new Action0() {
@Override
public void call() {
if (isUnsubscribed()) {
return;
}
s1.onStart();
onSubscribe1.call(s1);
}
}, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
subscriber1.add(s);
简化如下:
subscriber1.onStart();
<span style="white-space:pre"> </span>scheduledAction0 = new Action0() {
@Override
public void call() {
if (isUnsubscribed()) {
return;
}
s1.onStart();
onSubscribe1.call(s1);
}
};
ScheduledAction s = threadWorker.scheduleActual(scheduledAction0 , delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
subscriber1.add(s);
接下来,我们来看一下threadWorker.scheduleActual的源码:
// NewThreadWorker
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}其中的executor的值是在构造函数里赋值的,NewThreadWorker的构造函数如下:
// NewThreadWorker
public NewThreadWorker(ThreadFactory threadFactory) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
// Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
boolean cancelSupported = tryEnableCancelPolicy(exec);
if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
registerExecutor((ScheduledThreadPoolExecutor)exec);
}
schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook();
executor = exec;
}
其中:
// Executors public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }即创建了一个核心线程数为1的线程池。
在NewThreadWorker的scheduleActual中,executor.submit就相当于Thread.start;这样,系统就会在后续的某一时刻来执行任务ScheduledAction的run方法,cheduledAction的run方法的源码如下:
// ScheduledAction final class ScheduledAction extends AtomicReference<Thread> implements Runnable, Subscription public void run() { try { lazySet(Thread.currentThread()); action.call(); } catch (Throwable e) { // nothing to do but print a System error as this is fatal and there is nowhere else to throw this IllegalStateException ie = null; if (e instanceof OnErrorNotImplementedException) { ie = new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e); } else { ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e); } RxJavaPlugins.getInstance().getErrorHandler().handleError(ie); Thread thread = Thread.currentThread(); thread.getUncaughtExceptionHandler().uncaughtException(thread, ie); } finally { unsubscribe(); } }这里的“action.call();”最终指的就是scheduledAction0.call();根据前面的转化可知:
scheduledAction0 = new Action0() { @Override public void call() { if (isUnsubscribed()) { return; } s1.onStart(); onSubscribe1.call(s1); } };由此可知,Observable.subscribeOn()方法是将Observable.onSubscribe.call(subscribe);的执行放在了指定的线程里。
相关文章推荐
- RxJava源码分析之subscribeOn和observeOn
- [RxJava学习]操作符flatMap源码分析
- LIVE555再学习 -- testOnDemandRTSPServer 源码分析
- [RxJava学习]操作符Map源码分析
- RxJava中subscribe流程源码分析
- ExtJs源码分析与学习—ExtJs核心代码(一)
- Linux设备驱动程序第三版学习(2)-字符设备驱动程序源码分析(续)
- jQuery1.3.2 源码学习-4 init 函数分析 - 2
- Combres库 学习小结以及部分源码分析
- 麻雀虽小,五脏俱全:分析CVS活动情况的小工具(有源码供学习)
- WinCE6.0学习之EBoot源码分析----startup.s(三)
- Linux 学习数据专题【管理、编程、源码分析】——Linux相关图书选购指南
- 麻雀虽小,五脏俱全:分析CVS活动情况的小工具(有源码供学习)
- WinCE6.0学习之EBoot源码分析----startup.s
- WinCE6.0学习之EBoot源码分析----startup.s(二)
- WinCE6.0学习之EBoot源码分析----startup.s(四)
- Linux设备驱动程序第三版学习(2)-字符设备驱动程序源码分析(续)
- jQuery1.3.2 源码学习-3 init 函数分析 - 1
- 内核源码学习:LILO的运行分析
- DEDE源码分析与学习--index.php文件解读