您的位置:首页 > 编程语言 > Java开发

RxJava的使用与深入学习

2017-02-10 17:24 417 查看
http://blog.csdn.net/evan_man/article/details/51292099


转载请注明出处:http://blog.csdn.net/evan_man/article/details/51292099


简单介绍

可以将RxJava是一种观察者设计模式的升级版本。使用Rxjava的好处在于,我们可以方便的切换方法的执行线程,对线程动态切换,该过程无需我们自己手动创建和启动线程。使用Rxjava创建的代码虽然出现在同一个线程中,但是我们可以设置使得不同方法在不同线程中执行。上述功能的实现主要归功于RxJava的Scheduler实现,Scheduler
提供了『后台处理,前台回调』的异步机制。
Schedulers能创建如下几种类型的线程:

Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作
Schedulers.io():一个无数量上限的线程池,可以重用空闲的线程
Schedulers.computation():Scheduler 使用的固定的线程池,大小为 CPU 核数
AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行

RxJava有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件(就是下面会介绍的onNext、onCompleted等方法)。
Observer, 对象中有onNext()、onCompleted()、onCompleted()和onError()。RxJava 规定,当不会再有新的onNext()发出时,需要触发
onCompleted()方法作为标志。在事件处理过程中出异常时,onError()会被触发,同时队列自动终止,不允许再有事件发出。Observer主要的业务代码大多是编写在其onNext方法中。
Subscriber,是Observer的升级版本,相对于Observer还多了onStart和unSubscribe方法。onStart方法在执行observable.subscribe(observer)方法时就被调用,不能指定线程,只能执行在subscribe()被调用时的线程。调用unSubscribe方法后,Subscriber将不再接收事件,一般在不使用该SubScriber的时候,需要及时调用该方法,以免OOM。执行observable.subscribe(observer)方法时,Observer会先被转换成一个Subscriber对象。

Observable,其构造器接受一个Observable.OnSubscribe对象。执行observable.subscribe(observer)方法时,该方法用于将Observer对象注册到observable中,该方法内部逻辑是调用Observer的onStart方法和Observable.OnSubscribe的call方法。Observable.OnSubscribe对象的call方法会调用Observer的onNext、onCompleted方法(不会调用onStart方法)。此外Observable通过如下的方法指定具体方法的执行线程:

subscribeOn():

指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程,或者叫做事件产生的线程。
subscribeOn() 的位置放在哪里都可以,但它是只能调用一次,不可以随时切换Observable.OnSubscribe 被激活时所处的线程;当使用了多个
subscribeOn() 的时候,只有第一个 subscribeOn() 起作用。

observeOn():

指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
通过 observeOn()的多次调用,程序实现了线程的多次切换,可以随时切换消费事件所在的线程; observeOn() 控制的是它后面的线程。

Rxjava和RxAndroid对应的Github分别为RxJavaRxAndroid。官方给出的介绍特此摘录如下:

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences. It
extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent
data structures.
RxAndroid:Android specific bindings for RxJava. This module adds the minimum classes to RxJava that make writing reactive components in
Android applications easy and hassle-free. More specifically, it provides a Scheduler that schedules on the main UI thread or any given Handler.

简单使用(以Android平台为例)

一、引入如下依赖:

[java] view
plain copy







compile 'io.reactivex:rxjava:1.1.3'

compile 'io.reactivex:rxandroid:1.1.0'

二、创建Observable对象

[java] view
plain copy







Observable observable = Observable.create(new Observable.OnSubscribe<String>() {

@Override

public void call(Subscriber<? super String> subscriber) {

subscriber.onNext("Hello");

subscriber.onNext("Hi");

subscriber.onNext("Aloha");

subscriber.onCompleted();

}

});

Observable observable = Observable.just("Hello", "Hi", "Aloha");

String[] words = {"Hello", "Hi", "Aloha"}; Observable observable = Observable.from(words);

注意:上面三种创建方式结果一样。

三、创建Subscribe对象,重写onNext、onCompleted、onError方法

[java] view
plain copy







Subscriber subscriber = new Subscriber() {

@Override

public void onCompleted() {

}

@Override

public void onError(Throwable e) {

}

@Override

public void onNext(Object o) {

}

@Override

public void onStart() {

super.onStart();

}

};

四、将SubScriber和Observable绑定

[java] view
plain copy







observable.subscribe(observer);

observable.subscribe(subscriber);

到此为止我们利用RxJava实现了一个简单的观察者模式。但是并没有使用到线程的动态切换功能,该部分才是RxJava跟普通观察者模式的最大区别;下面我们对该部分如何使用进行介绍。但是在正式介绍线程动态切换方法之前,我们先来了解一下Observable的map和flatmap方法,因为RxJava线程动态切换往往伴随着这两个方法的出现。

五、map&flatMap

map方法

[java] view
plain copy







Observable.just("images/logo.png") // 输入类型 String

.map(new Func1<String, Bitmap>() {

@Override

public Bitmap call(String filePath) { // 参数类型 String

return getBitmapFromPath(filePath); // 返回类型 Bitmap

}

})

.subscribe(new Action1<Bitmap>() {

@Override

public void call(Bitmap bitmap) { // 参数类型 Bitmap

showBitmap(bitmap);

}

});

FuncX:: 对有参数且有返回值的一类方法的包装
ActionX: 对只有参数没有返回值的一类方法的包装
可以发现map方法的参数的作用就是将上级的String数据转换成一个bitmap对象。正如它的名字一样map,完成了一个映射的功能,下面来看看flatMap对象。

flatMap方法

flatMap() 的原理:
1. 使用传入的事件对象创建一个 Observable 对象;
2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;
3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。

[java] view
plain copy







Student[] students = ...;

Subscriber<Course> subscriber = new Subscriber<Course>() {

@Override

public void onNext(Course course) {

Log.d(tag, course.getName());

}

...

};

Observable.from(students)

.flatMap(new Func1<Student, Observable<Course>>() {

@Override

public Observable<Course> call(Student student) {

return Observable.from(student.getCourses());

}

})

.subscribe(subscriber);

flatMap的工作原理图如下



对map和flatmap的介绍就到这里,下面对RxJava的线程动态切换如何使用进行介绍。

六、线程的动态切换

[java] view
plain copy







Observable.just(1, 2, 3, 4)

.subscribeOn(Schedulers.io()) //决定调用observable.subscribe(subscriber)方法时的执行线程

.observeOn(Schedulers.newThread()) //决定下面mapOperator方法的执行线程

.map(mapOperator)

.observeOn(Schedulers.io()) //决定下面mapOperator2方法的执行线程

.map(mapOperator2)

.observeOn(AndroidSchedulers.mainThread) //决定下面subscriber对象的onNext()、onCompleted()、onCompleted()和onError()方法执行线程。即UI线程

.subscribe(subscriber);

对RxJava如何使用就介绍到这里了,下面我们通过源码分析一下,RxJava的底层实现。


源码分析

正式分析开始之前,首先确定一下我们这次分析的目标对象和目标方法。

Observable对象创建的三个方法create、just、from;
observable.subscribe(observer)内部逻辑是什么,为何会最终执行到Subscriber对象的相关方法;
Observable中Observable.OnSubscribe对象的call方法何时被调用;
Subscriber 对象的onStart、unSubscribe、onNext()、onCompleted()、onCompleted()和onError()在何处,何种情况被调用;
Observable的map和flatMap方法内部实现机制,参数的传递;
Schedulers如何创建指定的线程,各个线程之间的区别是什么;
Observable的observeOn和subscribeOn方法的内部实现机制,如何实现线程的切换;

下面我们针对上面列出的顺序依次进行解答。


Observable.class

以下是Observable类中的Fields:

[java] view
plain copy







final OnSubscribe<T> onSubscribe;

static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

//RxJavaPlugins.getInstance()是一个单例模式,获取一个RxJavaPlugins对象,然后这个对象定义了Hook,然而这对Hook并没有做任何事情,至少目前是这样的。

//rxJavaPlugins.getObservableExecutionHook()近似等价于得到一个RxJavaObservableExecutionHook对象,该对象目前不干任何事,完全可以把它看成透明的,你传什么进去它回什么。

下面是Observable的构造器,该构造器并不能被客户直接使用,因为它使用了Protected关键字进行修饰。
Observable()@Observable.class

[java] view
plain copy







protected Observable(OnSubscribe<T> f) {

this.onSubscribe = f;

}

构造器的内容就是对onSubscriber初始化。
Observable对象创建的三个方法create、just、from具体内容如下
create()@Observable.class

[java] view
plain copy







public static <T> Observable<T> create(OnSubscribe<T> f) {

return new Observable<T>(hook.onCreate(f));

}

RxJavaObservableExecutionHook.onCreate方法内部直接直接将参数f返回,所以hook.onCreate(f)等价于f;
just()@Observable.class

[java] view
plain copy







public static <T> Observable<T> just(final T value) {

return ScalarSynchronousObservable.create(value); //note1

}

public static <T> Observable<T> just(T t1, T t2) {

return from((T[])new Object[] { t1, t2 }); //note2

}

1、final class ScalarSynchronousObservable<T> extends Observable<T>是一个Observable的子类,ScalarSynchronousObservable.create方法参考后面的ScalarSynchronousObservable.class部分
2、调用了Observable的from方法
from()@Observable.class

[java] view
plain copy







public static <T> Observable<T> from(T[] array) {

int n = array.length;

if (n == 0) {

return empty();//note1

} else

if (n == 1) {

return just(array[0]);//note2

}

return create(new OnSubscribeFromArray<T>(array));

}

1、方法返回一个(Observable<T>) EmptyHolder.INSTANCE对象

[java] view
plain copy







private static final class EmptyHolder {

final static Observable<Object> INSTANCE = create(new OnSubscribe<Object>() {

@Override

public void call(Subscriber<? super Object> subscriber) {

subscriber.onCompleted();

}

});

}

2、调用 just(final T value)创建一个ScalarSynchronousObservable对象
3、class OnSubscribeFromArray<T> implements OnSubscribe<T>,有一种ViewGroup的感觉,其call方法内部会将所有数据依次传递给SubScriber.onNext();随后调用create(OnSubscribe<T>
f)创建对象。OnSubscribeFromArray部分我们在后面OnSubscribeFromArray.class部分还会详细介绍。
到此为止我们分析了创建Observable对象的三个方法。
对于create(OnSubscribe<T> f) 方法,实际是等价于利用构造器Observable(OnSubscribe<T> f) 创建一个Observable对象,f赋值给Observable的onSubscribe域。
对于from和just方法底层实现基本一致

方法只含一个参数:调用ScalarSynchronousObservable.create(value)创建一个ScalarSynchronousObservable对象。
方法含多个参数:首先将多个参数构造成一个T[] array对象数组,随后利用该数组创建一个OnSubscribeFromArray<T>(array)的对象,最后调用Observable(OnSubscribe<T>
f)方法创建一个Observable对象。

综上,最终我们获得的要么是一个Observable对象,要么就是其子类ScalarSynchronousObservable对象。
observable.subscribe(observer)内部逻辑是什么,为何会最终执行到Subscriber对象的相关方法。
Subscriber 对象的onStart、unSubscribe、onNext()、onCompleted()、onCompleted()和onError()在何处,何种情况被调用;
subscribe()@Observable.class

[java] view
plain copy







public final Subscription subscribe(final Observer<? super T> observer) {

if (observer instanceof Subscriber) {

return subscribe((Subscriber<? super T>)observer); //note1

}

return subscribe(new Subscriber<T>() { //note2

@Override

public void onCompleted() {

observer.onCompleted();

}

@Override

public void onError(Throwable e) {

observer.onError(e);

}

@Override

public void onNext(T t) {

observer.onNext(t);

}

});

}

1、如果observer引用的对象是一个Subscriber对象,将observer对象强制转换为Subscriber对象
2、创建一个Subscriber对象,对象内部的onNext、onError、onCompleted方法调用Observer的同名方法
不论是note1、还是note2最终都会调用方法

[java] view
plain copy







public final Subscription subscribe(Subscriber<? super T> subscriber) {

return Observable.subscribe(subscriber, this);

}

继续往下看
Observable.subscribe(subscriber, this)@Observable.class

[java] view
plain copy







private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {

if (subscriber == null) {

throw new IllegalArgumentException("observer can not be null");

}

if (observable.onSubscribe == null) {

throw new IllegalStateException("onSubscribe function can not be null.");

}

subscriber.onStart();//note1

if (!(subscriber instanceof SafeSubscriber)) {

subscriber = new SafeSubscriber<T>(subscriber);

}

try {

hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);//note2

return hook.onSubscribeReturn(subscriber);//note3

} catch (Throwable e) {

Exceptions.throwIfFatal(e);

try {

subscriber.onError(hook.onSubscribeError(e)); //note4

} catch (Throwable e2) {

Exceptions.throwIfFatal(e2);

RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);

hook.onSubscribeError(r);

throw r;

}

return Subscriptions.unsubscribed();

}

}

1、调用Subscriber.start()方法;
2、当前版本中等价于observable.onSubscribe.call(subscriber);
3、等价于 return subscriber;
4、如果上述方法出现任何异常则调用 subscriber.onError方法
以上是对observable.subscribe方法的分析,我们知道了subscriber对象的onStart、onError方法是在observable.onSubscribe.call(subscriber)的前后执行。而onNext、onCompleted方法在call中出现,具体内容参考后面OnSubscribeFromArray.call方法和ScalarSynchronousObservable..onSubscribe.call方法。

往下我们该分析分析Observable的map方法内部实现机制;
Observable<R> map()@Observable.class

[java] view
plain copy







public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {

return lift(new OperatorMap<T, R>(func));

}

OperatorMap是一个实现了public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> { }接口的类,用于将一个Subscriber<R>类型对象转换成一个Subscriber<T>类型对象。随后将该对象传递给lift方法,该方法很关键!往下看

Tips:

FuncX是定义在rx.functions.*包下面的一个接口,是对一类有返回值方法的封装。Func0代表无参数的方法,Func1代表有一个参数的方法,FuncX以此类推。
ActionX是定义在rx.functions.*包下面的一个接口,是对一类无返回值方法的封装。Action0代表无参数的方法,Action1代表有一个参数的方法,ActionX以此类推。

Observable<R> lift()@Observable.class

[java] view
plain copy







public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {

return new Observable<R>(new OnSubscribe<R>() {

@Override

public void call(Subscriber<? super R> o) {

try {

Subscriber<? super T> st = hook.onLift(operator).call(o); //note1

try {

st.onStart(); //note2

onSubscribe.call(st); //note3

} catch (Throwable e) {

Exceptions.throwIfFatal(e);

st.onError(e); //note4

}

} catch (Throwable e) {

Exceptions.throwIfFatal(e);

o.onError(e); //note5

}

}

});

}

1、hook.onLift(operator).call(o)等价于operator.call(o);该行语句的结果就是将Subscriber<? super R> o转换换成Subscriber<? super T> 。
2、最后会调用刚刚得到的Subscriber<? super T>对象的onStart方法,可能有一些朋友会说o.onStart方法在哪里调用的呢?其实o.onStart在进入这里call方法之前就已经执行了。
3、这里需要特别注意,onSubscribe是lift方法所属的 Observable<T>对象的onSubscribe域,而不是 Observable<R>对象的onSubscribe域
4、调用Subscriber<? super T>.的onError
5、调用Subscriber<? super R>的onError

尝试解释一:
讲到这里估计大伙儿的脑子是晕头转向的,说实话这里一时半会是真的很难以理解,那我们对照着范例来梳理一遍。

[java] view
plain copy







Observable.just("images/logo.png") //note1

.map(new Func1<String, Bitmap>() {

@Override

public Bitmap call(String filePath) {

return getBitmapFromPath(filePath);

}

}) //note2

.subscribe(new Action1<Bitmap>() { //note3

@Override

public void call(Bitmap bitmap) {

showBitmap(bitmap);

}

});

为了下面分析的方便,在此进行如下约定,String 记号为T ;Bitmap记号为R
1、首先我们创建了一个Observable<T>对象,对应一个OnSubscribe<T>,其call方法参数Subscriber<T>,表明Subscriber.onNext方法接受参数类型为T
2、随后调用map方法,该方法内部有三个重要的方法

Func1<T,R>将T类型的数据转换为R类型数据;
OperatorMap<T, R>(func) 将Subscriber<R>转换成一个Subscriber<T>,前者Subscriber接受参数为R,后者接收参数为T
lift方法创建一个Observable<R>对象,对应一个OnSubscribe<R>,其call方法参数Subscriber<R>,表明Subscriber.onNext方法接受参数类型为R。

3、经过上面的第二步得到一个Observable<R>对象,调用该对象的subscribe(new Action1<R>() )方法。方法内部调用Observable<R>对象中的OnSubscriber<R>对象的call方法,此时的call方法内容如下(注:以下内容并非源码,只摘录了几个关键的点):
public void call(Subscriber<? super R> o) {
Subscriber<? super T> st = operator.call(o);// 在此处将Subscriber<R>转换成一个Subscriber<T>
st.onStart();
onSubscribe.call(st);//注意!这里的onSubscribe对象是属于Observable<T>中的域。对于多个map嵌套,直接可以把它理解为调用map方法的Observable对象中的onSubscribe域。
...
}

上面解释还没懂?再来一发
尝试解释二:

[java] view
plain copy







Observable.just("images/logo.png") // 对应Observable<String>

.map(new Func1<String, T1>() { }) // 对应Observable<T1>

.map(new Func1<T1, T2>() { }) // 对应Observable<T2>

.map(new Func1<T2, R>() { }) // 对应Observable<R>

.subscribe(new Action1<R>() { // 对应Observable<R>

@Override

public void call(R r) {

//

};

其实上面的一串方法等价于

[java] view
plain copy







Observable.just("images/logo.png") // 对应Observable<String>

.subscribe(new Action1<R>() { // 对应Observable<R>

@Override

public void call(R new Func1<T1, T2>().call (new Func1<T1, T2>().call(new Func1<String, T1>().call(String) ) ) ) {

};

flatMap()@Observable.class
前面介绍完了map方法,接着我们介绍flatMap方法的实现,回顾一下flatMap的原理
1. 使用传入的事件对象创建一个 Observable 对象;
2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;
3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。

[java] view
plain copy







public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {

if (getClass() == ScalarSynchronousObservable.class) {

return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func); //note1

}

return merge(map(func));

}

1、Observable只包含一个事件的情况,参考ScalarSynchronousObservable.class的源码,该部分比较简单,如果只想稍微了解下flatMap的工作原理可以略过下面的分析。该部分比Map还要让人抓狂。
2、
map方法已经介绍过,目的是将一个Observable<T>转变成一个Observable<R>; 其效果如下Observable<T>.onSubcribe.call(Subscriber<T>){ Subscriber<R>.onNext(Func(t)); }
这里对应的map效果为Observable<T>.onSubcribe.call(Subscriber<T>){ Subscriber<R>.onNext(Func(t)); } 但是这里Func(t)返回的对象类型为Observable<R>, 我们自定义的Subscriber默认情况下是不能对其进行处理的需要将Observable<R>数据进行解析得到其包含的事件。
根据ScalarSynchronousObservable<T>.scalarFlatMap方法的实现,这里的merge(map(func))效果做出如下假设:Observable<T>.onSubcribe.call(Subscriber<T>){ Observable<R> o = Func(t); o.unsafeSubscribe(Subscriber<R>); }

我们往下看merge方法是如何做的。
merge()@Observable.class

[java] view
plain copy







public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) { //note0

if (source.getClass() == ScalarSynchronousObservable.class) {

return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity()); //note1

}

return source.lift(OperatorMerge.<T>instance(false)); //note2

}

0、source是经过map(func)获得的一个Observable对象,该对象保存的事件类型为Observable
1、经过func对参数t处理后得到的Observable source只包含一个事件。UtilityFunctions.identity()方法返回的是一个输入什么就返回什么的Func1对象
2、经过func对参数t处理后得到的Observable source只包含多个事件。lift方法的参数是一个实现了Operator<T, Observable<? extends T>>接口的对象——该接口很简单就是将T转换为Observable<? extends T>对象。lift方法最终返回Observable<T>对象。该对象运行效果如下Observable<T>.onSubcribe.call(Subscriber<T>){
Subscriber<? super T> st = operator.call(o); st.onStart(); onSubscribe.call(st);...}有没有发现它跟map方法大体上是一样的,只是注意到的是这里的operator是一个实现了Operator<T, Observable<? extends T>>接口的对象,它将一个Subscriber<T>转换成一个Subscriber<Observable<?
extends T>>对象,因此重点肯定在这个接口的实现内部

OperatorMerge@OperatorMerge<T>.class

[java] view
plain copy







public final class OperatorMerge<T> implements Operator<T, Observable<? extends T>>

public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {

MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent); //note1

MergeProducer<T> producer = new MergeProducer<T>(subscriber);

subscriber.producer = producer;

child.add(subscriber);

child.setProducer(producer);

return subscriber;

}

1、此处完成了对Subscriber child的包装,创建一个MergeSubscriber对象,下面我们看看该对象的内部实现
MergeSubscriber.class@OperatorMerge<T>.class

[java] view
plain copy







static final class MergeSubscriber<T> extends Subscriber<Observable<? extends T>> //该Subscriber接受的参数是一个Observable

final Subscriber<? super T> child;

volatile Queue<Object> queue;

volatile InnerSubscriber<?>[] innerSubscribers;

public void onNext(Observable<? extends T> t) { //note1

if (t == null) {

return;

}

if (t instanceof ScalarSynchronousObservable) { //note2

tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());

} else {

InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++); //note3

addInner(inner);

t.unsafeSubscribe(inner); //note4

emit(); //note5

}

}

1、MergeSubscriber接收的参数类型为Observable
2、ScalarSynchronousObservable.get方法返回的是ScalarSynchronousObservable中存储的事件。然后调用tryEmit方法处理事件,正如名字所言,尝试提交t给child处理,如果当前条件不允许,比如有其它事件正在被处理则将该t先存入队列queue中。
3、创建一个子Subscriber对象,其父亲为当前MergeSubscriber,并将inner加入到MergeSubscriber的innerSubscribers;数组中,inner的onNext方法会调用父类的tryEmit方法。注意每个Observable对应一个InnerSubscriber
4、该方法执行的效果是,调用inner的onNext方法,参数为Observable<? extends T> t中所包含的事件。
5、该方法内部调用emitLoop方法,从queue中不断获取数据t并传给Subscriber child去执行。内部结构很像OperatorObserveOn的shedule方法
MergeSubscriber小结:
MergeSubscriber包含一个真实的Subscriber对象,一个存储事件的队列,一个InnerSubscriber的数组。
onNext方法接收一个Observable对象,1、创建一个InnerSubscriber对象,意味着一个Observable对应一个InnerSubscriber对象;2、将该对象添加到MergeSubscriber的InnerSubscriber的数组中,同时InnerSubscriber对象包含一个对MergeSubscriber对象的引用;3、调用Observable对象的unsafeSubscribe(InnerSubscriber对象)方法,使得InnerSubscriber对象去消费Observable对象中的事件。
InnerSubscriber对象消费事件的逻辑是调用MergeSubscriber的tryEmit方法去处理获取到的事件。
tryEmit处理事件的逻辑是如果Subscriber正在处理一个事件,则先将事件存入事件队列中;否则直接调用child.onNext(t)进行处理。

到此为止我们对map和flatmap的介绍都介绍完毕了,下面将对线程的动态切换进行介绍。
Observable的observeOn和subscribeOn方法的内部实现机制,如何实现线程的切换?
为了回答上面的问题首先看看observeOn方法是如何实现的

observeOn()@Observable.class

[java] view
plain copy







public final Observable<T> observeOn(Scheduler scheduler) {

return observeOn(scheduler, RxRingBuffer.SIZE);

}

public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {

return observeOn(scheduler, false, bufferSize);

}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { //note1

if (this instanceof ScalarSynchronousObservable) {

return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); //note2

}

return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize)); //note3

}

1、当调用observeOn(Scheduler scheduler)方法时默认调用的是observeOn(scheduler, false, RxRingBuffer.SIZE)
2、scalarScheduleOn方法参见后面的ScalarSynchronousObservable.class源码
3、OperatorObserveOn<T>实现了Operator<T, T>接口,实现了call方法。lift方法在前面已经介绍过了,不同之处是这里利用OperatorObserveOn对SubScriber进行包装处理。由一个SubScriber<T>对象转变成另一个SubScriber<T>,两者的参数不变,但是具体执行的线程可能发生了变化。
下面我们对OperatorObserveOn对象的call方法进行探究,跳转至OperatorObserveOn.class

subscribeOn()@Observable.class

[java] view
plain copy







public final Observable<T> subscribeOn(Scheduler scheduler) {

if (this instanceof ScalarSynchronousObservable) {

return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);//note1

}

return create(new OperatorSubscribeOn<T>(this, scheduler)); //note2

}

1、对于ScalarSynchronousObservable<T>).scalarScheduleOn()方法,参考ScalarSynchronousObservable.class源码部分
2、这里等价于return new Observable<T>(OperatorSubscribeOn<T>(this, scheduler)); 即利用当前Observable对象和scheduler对象构建一个OnSubscribe对象,随后利用OnSubscribe构建一个Observable对象。
往下我们看看OperatorSubscriberOn类是如何实现的。


OperatorSubscribeOn.class

OperatorSubscribeOn@OperatorSubscribeOn.class

[java] view
plain copy







public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

final Scheduler scheduler;

final Observable<T> source;

public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {

this.scheduler = scheduler;

this.source = source;

}

@Override

public void call(final Subscriber<? super T> subscriber) {

final Worker inner = scheduler.createWorker(); //inner 工作线程别名。

inner.schedule(new Action0() { //note1

@Override

public void call() {

final Thread t = Thread.currentThread();

Subscriber<T> s = new Subscriber<T>(subscriber) { //note2

@Override

public void onNext(T t) { subscriber.onNext(t); }

@Override

public void onError(Throwable e) { subscriber.onError(e); inner.unsubscribe(); }

@Override

public void setProducer(final Producer p) {

subscriber.setProducer(new Producer() {

@Override

public void request(final long n) {

if (t == Thread.currentThread()) {

p.request(n);

} else {

inner.schedule(new Action0() {

@Override

public void call() { p.request(n); }

});

} //end of else

} //end of request

}); //end of subscriber.setProducer

} //end of setProducer

};//end of new Subscriber<T>(subscriber)

source.unsafeSubscribe(s); //note3

}//end of inner call

}); // end of inner.schedule(new Action0()

}// end of outter call

}//end of class

1、该方法作用就是将参数Action的call方法传递给work的一个线程去执行。
2、对外部call接收到的Subscriber对象进行一次封装
3、该方法底层如下
unsafeSubscribe()@Observable.class

[java] view
plain copy







public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {

try {

subscriber.onStart(); //note1

onSubscribe.call(subscriber); //note2

return subscriber;

} catch (Throwable e) {

try {

subscriber.onError(e);

} catch (Throwable e2) {

...

}

return Subscriptions.unsubscribed();

}

}

1、首先调用Subscriber对象的onStart方法
2、随后调用Observable_outter<T>.onSubscribe.call(subscriber)方法,该方法内部会将执行subscriber的onNext、onCompleted方法

这里我们对subscribeOn方法进行一下小结:
subscribeOn方法用于指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程,或者叫做事件产生的线程。 当使用了多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用。
调用subscribeOn方法的Observable对象命名为(命名为Observable_inner,方法执行之后得到一个Observable对象(命名为Observable_outter),Observable_outter的OnSubscribe.call方法内部会创建一个call方法,该该方法会在一个指定的线程中执行,该方法内部首先将Observable_outter的OnSubscribe.call接受的参数Subscriber对象进行一次封装,随后调用Observable_inner对象的unsafeSubscribe()方法,该方法内部会执行封装过的Subscriber的onStart、OnSubscribe的call方法。


OperatorObserveOn.class

call()@OperatorObserveOn.class

[java] view
plain copy







@Override public Subscriber<? super T> call(Subscriber<? super T> child) {

if (scheduler instanceof ImmediateScheduler) { //note1

return child;

} else if (scheduler instanceof TrampolineScheduler) { //note2

return child;

} else {

ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize); //note3

parent.init();

return parent;

}

}

1、当前的scheduler对象为ImmediateScheduler表明该SubScriber在当前线程执行,因此不需要切换线程。
2、当前的scheduler对象为TrampolineScheduler表明该SubScriber在当前线程执行,只是并不立即执行,因此不需要切换线程。

3、利用SubScriber和Scheduler创建一个ObserveOnSubscriber对象,调用该对象的init方法。最后返回该包装后的ObserveOnSubscriber对象
往下分析下ObserveOnSubscriber对象,因为其继承自SubScriber因此我们看看它的onNext、onCompleted方法跟之前的SubScriber child有什么关系。随后分析一下如何保证是在另一个线程中执行。

ObserveOnSubscriber.class@OperatorObserveOn.class

class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0
Fields

[java] view
plain copy







final Subscriber<? super T> child;

final Scheduler.Worker recursiveScheduler;

final NotificationLite<T> on;

final Queue<Object> queue;

volatile boolean finished; //当前流的状态

long emitted;//当前被处理的事件数

final int limit; //处理的门限,达到该值需要重新调用request方法

final AtomicLong requested = new AtomicLong();

final AtomicLong counter = new AtomicLong();

ObserveOnSubscriber()@ObserveOnSubscriber.class

[java] view
plain copy







public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {

this.child = child; //note1

this.recursiveScheduler = scheduler.createWorker(); //note2

this.on = NotificationLite.instance(); //note3

this.limit = calculatedSize - (calculatedSize >> 2);

int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;

if (UnsafeAccess.isUnsafeAvailable()) { //note4

queue = new SpscArrayQueue<Object>(calculatedSize);

} else {

queue = new SpscAtomicArrayQueue<Object>(calculatedSize); //note5

}

request(calculatedSize); //note6

}

1、child为SubScriber<T>对象在ObserveOnSubscriber对象中对应的别名;
2、从Scheduler对象中获取一个work对象,随后就不用Scheduler对象了,只是对work进行操作;
3、获取一个NotificationLite对象的实例,用于对一个空对象的再次包装,使其不出现空指针异常;
4、检测当前系统是否包含"suc.misc.Unsafe"对象,如Android该方法返回值为false;所以执行else语句;但是java环境则一般执行if语句
5、创建的是一个Single-Producer-Single-Consumer queue大小由bufferSize决定,存储和获取队列中元素具有原子性
6、该方法的实现在Subscriber.class中,参见SubScriber.class源码,可以透露的是方法的作用就是设置该Subscriber接收的最大事件数
init()@ObserveOnSubscriber.class

[java] view
plain copy







void init() {

Subscriber<? super T> localChild = child;

localChild.setProducer(new Producer() { //note0

@Override

public void request(long n) {

if (n > 0L) {

BackpressureUtils.getAndAddRequest(requested, n); //note1

schedule(); //note2

}

}

});

localChild.add(recursiveScheduler); //note3

localChild.add(this); //note4

}

该方法主要完成向localChild注入一些参数,如果直接忽略这部分对我们对线程切换的机制影响并不大,不感兴趣的同学可以跳过。
0、调用Subscriber的setProducer方法,方法内部执行Producer.request(calculatedSize)方法
1、requested为之前创建的AtomicLong对象,requested的值正常情况为n,即需要向Subscriber传递的最大事件数
2、schedule方法
3、recursiveScheduler为之前通过scheduler.createWorker()得到的对象,将该对象添加到SubScriber的SubscriptionList中
4、将当前ObserveOnSubscriber对象添加到SubScriber的SubscriptionList中,对于为何不将init方法放置在构造器中,是因为这里要使用关键字this

schedule()@ObserveOnSubscriber.class

[java] view
plain copy







protected void schedule() {

if (counter.getAndIncrement() == 0) {//note1

recursiveScheduler.schedule(this); //note2

}

}

1、加1操作成功后,返回成功之前的被加数,被加数等于0则调用work的schedule方法传入参数this,即第一次执行该方法结果值为0,往后的方法大多数情况将为假,不过虽然为假,但是调用的次数会被记录到counter中,call方法会在退出循环前检测是否重新执行一次循环,还是退出循环,保证后面提交的事件能够被处理。
2、work的schedule方法内部根据不同的Schedule有不同的形式,在后面的内容中我们将对AndroidSchedulers.class和NewThreadScheduler.class进行分析。这里可以提前告知的是在work的schedule方法内部会调用ObserveOnSubscriber的call方法。
分析call方法之前,首先分析ObserveOnSubscriber对象和普通SubScriber的onNext、onCompleted、onError方法的区别。
onNext()@ObserveOnSubscriber.class

[java] view
plain copy







@Override public void onNext(final T t) {

if (isUnsubscribed() || finished) {

return;

}

if (!queue.offer(on.next(t))) { //note1

onError(new MissingBackpressureException());

return;

}

schedule();//note2

}

1、queue是之前创建的SpscAtomicArrayQueue<Object>(calculatedSize)对象,on是NotificationLite实例,on.next作用是当t非空则返回t,否则返回具有一个结束标志的对象。该行语句正常情况下是将t存入queue中,同时返回true。
2、调用schedule方法。可见ObserveOnSubscriber的onNext方法并不执行其包含的SubScriber child的onNext方法
onCompleted()@ObserveOnSubscriber.class

[java] view
plain copy







@Override public void onCompleted() {

if (isUnsubscribed() || finished) {

return;

}

finished = true; //note1

schedule(); //note2

}

1、设置结束标志
2、调用schedule方法。可见ObserveOnSubscriber的onCompleted方法并不执行其包含的SubScriber child的onCompleted方法
onError()@ObserveOnSubscriber.class

[java] view
plain copy







@Override public void onError(final Throwable e) {

if (isUnsubscribed() || finished) {

RxJavaPlugins.getInstance().getErrorHandler().handleError(e);

return;

}

error = e; //ntoe1

finished = true; //note1

schedule(); //note2

}

1、设置error标志和结束标志
2、调用schedule方法。可见ObserveOnSubscriber的onError方法并不执行其包含的SubScriber child的onError方法
因为work的schedule方法内部会调用ObserveOnSubscriber的call方法。所以这里有两个重要的地方,第一call方法执行的位置在work指定的线程中执行,第二call方法内部调用了SubScriber child的onNext方法。对于前者在后面的内容中我们将根据AndroidSchedulers.class和NewThreadScheduler.class进行具体分析,对于后者我们先来看其代码

call()@ObserveOnSubscriber.class

[java] view
plain copy







@Override public void call() {

long missed = 1L; //任务数门限

long currentEmission = emitted; //当前任务数

final Queue<Object> q = this.queue; //任务队列

final Subscriber<? super T> localChild = this.child; //子Subscriber

final NotificationLite<T> localOn = this.on;

for (;;) {

long requestAmount = requested.get(); //Subscriber所能接收的最大事件数

while (requestAmount != currentEmission) { //当前完成任务数没有达到门限值

boolean done = finished;

Object v = q.poll(); //note1

boolean empty = v == null;

if (checkTerminated(done, empty, localChild, q)) {//note3

return;

}

if (empty) {

break;

}

localChild.onNext(localOn.getValue(v)); //note2

currentEmission++;//当前任务数+1

if (currentEmission == limit) {

requestAmount = BackpressureUtils.produced(requested, currentEmission);//将requestted值减去当前执行结束的任务数,即得到Subscriber所能接收的最大事件数减少。

request(currentEmission); //调用Subscriber的request方法参数为当前执行的任务数

currentEmission = 0L; //当前任务数为0

}

}

if (requestAmount == currentEmission) {//Subscriber所能接收的最大事件数等于当前执行完的任务数

if (checkTerminated(finished, q.isEmpty(), localChild, q)) { //note3

return;

}

}

emitted = currentEmission; //执行到这里表明还有没有被消费的事件,记录当前消费到的事件位置

missed = counter.addAndGet(-missed); //note4

if (missed == 0L) {

break;

}

}

该方法会在某条线程中被执行!
1、获取得到待处理的值v,在调用ObserveOnSubscriber的onNext方法是会将该方法 的参数存储queue队列中。则call方法中会读取queue中数。
2、可以将它近似看成localChild.onNext(v);
3、该处调用的方法内部根据判断是否执行onCompleted()方法或者执行onError方法,该方法内部最外面的判断是是判断finished的值,而该值只有在外界调用了ObserveOnSubscriber..onCompleted或者ObserveOnSubscriber.onError方法时才会被设定为true。
4、将counte的值减去missed后不能与0,表明在执行call方法时,shedule方法在外别被再次调用,因此需要继续执行循环,检查是否有新的事件等待消费

Schedulers如何创建指定的线程,各个线程之间的区别是什么.


Scheduler.class

该抽象类定义了如下几个方法
createWorker()@Scheduler.class

[java] view
plain copy







public abstract Worker createWorker();//note1

public abstract static class Worker implements Subscription {

public abstract Subscription schedule(Action0 action);//note2

public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);

}

1、返回一个Worker该Worker对象内部管理一个线程池
2、外界一般会调用该方法将Action0对象传给Worker,方法内部会在上面的线程池中选取一条线程执行Action0的call()方法。
下面分别以Schedulers.newThread()创建的NewThreadScheduler和AndroidSchedulers.mainThread()创建的AndroidSchedulers为例进行说明。


NewThreadScheduler.class

createWorker()@NewThreadScheduler.class

[java] view
plain copy







private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler-";//note1

private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);//note2

public Worker createWorker() {

return new NewThreadWorker(THREAD_FACTORY); //note3

}

1、线程工厂创建线程的名字前缀
2、创建一个线程工厂,该对象的newThread方法会返回一个特定的线程
3、利用该线程工厂创建一个NewThreadWorker对象


NewThreadWorker.class

NewThreadWorker()@NewThreadWorker.class

[java] view
plain copy







public NewThreadWorker(ThreadFactory threadFactory) {

ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory); //note1

....

executor = exec;

}

1、创建一个预定执行的固定线程池
schedule()@NewThreadWorker.class

[java] view
plain copy







public Subscription schedule(final Action0 action) {

return schedule(action, 0, null);

}

public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {

if (isUnsubscribed) {//note2

return Subscriptions.unsubscribed();

}

return scheduleActual(action, delayTime, unit);

}

1、会在执行前判断是否绑定,否则不进行下面的操作,表明当我们调用Subscribe.unsubscribed方法时,它将不会再收到消息,同时对应的线程也不再向下执行

[java] view
plain copy







public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {

ScheduledAction run = new ScheduledAction(action); //note1

Future<?> f;

if (delayTime <= 0) {

f = executor.submit(run); //note2

} else {

f = executor.schedule(run, delayTime, unit);

}

return run;

}

1、ScheduledAction实现了Runnable接口,run方法内部调用action的call方法
2、将run对象提交给线程池执行


AndroidSchedulers.class

mainThread()@AndroidSchedulers.class

[java] view
plain copy







public static Scheduler mainThread() {

Scheduler scheduler = RxAndroidPlugins.getInstance().getSchedulersHook().getMainThreadScheduler(); //note1

return scheduler != null ? scheduler : MainThreadSchedulerHolder.MAIN_THREAD_SCHEDULER; //note2

}

1、等价于rxAndroidPlugins.getSchedulersHook().getMainThreadScheduler();等价于RxAndroidSchedulersHook.getDefaultInstance().getMainThreadScheduler()等价于null
2、return HandlerScheduler(new Handler(Looper.getMainLooper()));获取到了一个和UI线程的Looper绑定的Handler


HandlerScheduler.class

[java] view
plain copy







private final Handler handler;

HandlerScheduler(Handler handler) {

this.handler = handler;

}

createWorker()@HandlerScheduler.class

[java] view
plain copy







public Worker createWorker() {

return new HandlerWorker(handler);

}

static class HandlerWorker extends Worker {

private final Handler handler;

HandlerWorker(Handler handler) {

this.handler = handler;

}

@Override

public Subscription schedule(final Action0 action) {

return schedule(action, 0, TimeUnit.MILLISECONDS);

}

@Override

public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {

if (compositeSubscription.isUnsubscribed()) {//note0

return Subscriptions.unsubscribed();

}

action = RxAndroidPlugins.getInstance().getSchedulersHook().onSchedule(action); //note1

final ScheduledAction scheduledAction = new ScheduledAction(action); //note2

...

handler.postDelayed(scheduledAction, unit.toMillis(delayTime)); //note3

....

return scheduledAction;

}

}

0、会在执行前判断是否绑定,否则不进行如下的操作,表明当我们调用Subscribe.unsubscribed方法时,它将不会再收到消息,同时对应的线程也不再向下执行
1、很明显,前面一堆现在基本用不上,所以该行语句在目前来看是没有意义的,Action最终还是Action
2、我们只需要知道ScheduledAction对象实现了Runnable接口,在run方法内部调用了action的call方法
3、将ScheduledAction对象交给Handler,在合适的时间将被执行
到此为止我们对AndroidSchedulers.class和NewThreadScheduler.class两个类进行了分析,其它类型的继承Scheduler的类基本类似,在此不再详细介绍。大体上它们完成的任务就是将Action0对象交给自己的线程池去执行。

下面我们一次来看一下ScalarSynchronousObservable对象和OnSubscribeFromArray对象中的OnSubscriber对象,重点看下其各自的call方法


ScalarSynchronousObservable.class

Subscriber 对象的onStart、unSubscribe、onNext()、onCompleted()、onCompleted()和onError()在何处,何种情况被调用;
ScalarSynchronousObservable()@ScalarSynchronousObservable.class

[java] view
plain copy







final T t;

protected ScalarSynchronousObservable(final T t) {

super(new OnSubscribe<T>() {

@Override

public void call(Subscriber<? super T> s) {

s.setProducer(createProducer(s, t)); //note1

}

});

this.t = t;

}

1、ScalarSynchronousObservable对象的OnSubscriber对象的call方法,调用Subscriber.setProducer方法,该方法的具体内容参考Subscriber.class的源码。
可以提前预告的是Subscriber.setProducer(producer)方法完成的任务有:给Subscriber对象的Producer域赋值,调用producer.request方法。
Producer是一个接口,它只有一个request方法;一般实现该接口的类,都会包含一个Subscriber对象和一个待处理的数据,createProducer(s, t)方法中,s是一个Subscriber对象,t是一个待处理的参数。发现没,我们完全可以在Producer中先对t进行相应的处理随后,再将数据传送给s。

create(T t)@ScalarSynchronousObservable.class

[java] view
plain copy







public static <T> ScalarSynchronousObservable<T> create(T t) {

return new ScalarSynchronousObservable<T>(t);

}

createProducer()@ScalarSynchronousObservable.class

[java] view
plain copy







static <T> Producer createProducer(Subscriber<? super T> s, T v) {

if (STRONG_MODE) { //默认是false

return new SingleProducer<T>(s, v);

}

return new WeakSingleProducer<T>(s, v); 返回这个Producer

}

WeakSingleProducer.class@ScalarSynchronousObservable.class

[java] view
plain copy







static final class WeakSingleProducer<T> implements Producer{

final Subscriber<? super T> actual;

final T value;

boolean once;

public void request(long n) {

if (once) {

return;

}

if (n < 0L) {

throw new IllegalStateException("n >= required but it was " + n);

}

if (n != 0L) {

once = true;

Subscriber<? super T> a = actual;

if (a.isUnsubscribed()) {

return;

}

T v = value;

try {

a.onNext(v); //note1

} catch (Throwable e) {

Exceptions.throwOrReport(e, a, v);

return;

}

if (a.isUnsubscribed()) {

return;

}

a.onCompleted(); //note2

}

}

}

1、执行onNext方法,ScalarSynchronousObservable类对应只有一个值需要处理,因此这里也就只需要调用一次onNext方法。
2、执行onCompleted方法,该方法不一定被执行,因为不排除异常情况。

scalarFlatMap()@ScalarSynchronousObservable.class

[java] view
plain copy







public <R> Observable<R> scalarFlatMap(final Func1<? super T, ? extends Observable<? extends R>> func) {

return create(new OnSubscribe<R>() {

@Override public void call(final Subscriber<? super R> child) {

Observable<? extends R> o = func.call(t); //note1

if (o instanceof ScalarSynchronousObservable) {

child.setProducer(createProducer(child, ((ScalarSynchronousObservable<? extends R>)o).t)); //note2

} else {

o.unsafeSubscribe(child); //note3

}

}

});

}

1、将当前ScalarSynchronousObservable对象的唯一元素t交给func处理,得到一个Observable对象o
2、如果o是一个ScalarSynchronousObservable对象;createProducer方法前面已经介绍,默认情况是创建一个WeakSingleProducer对象;然后将WeakSingleProducer对象传入child.setProducer方法,最终会执行WeakSingleProducer对象的request方法,方法中再将o的事件交给Subscriber处理
3、该方法的效果就是调用child.onStart, o.onSubscribe.call(child)方法,即将事件传递给child消费


OnSubscribeFromArray.class

public final class OnSubscribeFromArray<T> implements OnSubscribe<T>
Fields
final T[] array;
OnSubscribeFromArray()@OnSubscribeFromArray.class
public OnSubscribeFromArray(T[] array) { this.array = array; }
Subscriber 对象的onStart、unSubscribe、onNext()、onCompleted()、onCompleted()和onError()在何处,何种情况被调用;
call()@OnSubscribeFromArray.class

[java] view
plain copy







@Override public void call(Subscriber<? super T> child) {

child.setProducer(new FromArrayProducer<T>(child, array));

}

child.setProducer()方法内部一般情况会调用producer.request(Long.MAX_VALUE)方法
FromArrayProducer<T>@OnSubscribeFromArray.class

[java] view
plain copy







static final class FromArrayProducer<T> extends AtomicLong implements Producer

final Subscriber<? super T> child;

final T[] array;

int index;

public FromArrayProducer(Subscriber<? super T> child, T[] array) {

this.child = child;

this.array = array;

}

@Override public void request(long n) {

if (n < 0) {

throw new IllegalArgumentException("n >= 0 required but it was " + n);

}

if (n == Long.MAX_VALUE) {

if (BackpressureUtils.getAndAddRequest(this, n) == 0) { //note1

fastPath();

}

} else if (n != 0) {

if (BackpressureUtils.getAndAddRequest(this, n) == 0) {

slowPath(n);

}

}

}

1、n一般情况等于Long.MAX_VALUE,原因参考Subscriber.class。返回对this的long域执行加n操作成功后,加成功之前的值。是CAS操作,set-and-swap操作。即如果是第一次调用FromArrayProducer.request方法则在该行语句BackpressureUtils.getAndAddRequest(this,
n) == 0为真
fastPath()@OnSubscribeFromArray.class

[java] view
plain copy







void fastPath() {

final Subscriber<? super T> child = this.child;

for (T t : array) {

if (child.isUnsubscribed()) {

return;

}

child.onNext(t); //note1

}

if (child.isUnsubscribed()) {

return;

}

child.onCompleted(); //note2

}

1、调用Subscriber的onNext方法,这里执行的是一个循环将构造Observable时的参数一次传给onNext方法。
2、调用Subscriber的onCompleted方法。
slowPath()@OnSubscribeFromArray.class

[java] view
plain copy







void slowPath(long r) {

final Subscriber<? super T> child = this.child;

final T[] array = this.array;

final int n = array.length;

long e = 0L;

int i = index; //若是第一次执行这里的index=0

for (;;) {

while (r != 0L && i != n) { //note1

if (child.isUnsubscribed()) {

return;

}

child.onNext(array[i]);

i++;

if (i == n) {

if (!child.isUnsubscribed()) {

child.onCompleted();

}

return;

}

r--;

e--;

} //note2

r = get() + e; //note3

if (r == 0L) { //note2

index = i; //记录当前执行到的任务数

r = addAndGet(e);

if (r == 0L) {

return;

}

e = 0L;

}

}

}

1、判断r剩下执行任务数不为0,当前执行完事件数不大于事件总数
2、 运行到这里证明当前还有事件没有被Subscriber处理
3、get获取到的值是前面request方法BackpressureUtils.getAndAddRequest(this, n) == 0设置的值,即该Subscriber接受的最大任务数,所以note3执行之后r为待处理的最大任务数
4、一般情况这里为真,即任务已经全部处理完毕,但是还有事件没有被处理完毕,index记录当前执行到的事件次数,并且将当前的AtomicLong的long域设置为0L,否则下次BackpressureUtils.getAndAddRequest(this, n) == 0将不会为真了。


Subscriber.class

public abstract class Subscriber<T> implements Observer<T>, Subscription
Fields

[java] view
plain copy







private final SubscriptionList subscriptions; //构造器中赋初值

private final Subscriber<?> subscriber; //构造器中赋初值

private Producer producer; //事件制造者

private long requested = NOT_SET; //最大接受任务数

private static final Long NOT_SET = Long.MIN_VALUE;

Subscriber()@Subscriber.class

[java] view
plain copy







protected Subscriber() { this(null, false); }

protected Subscriber(Subscriber<?> subscriber) { this(subscriber, true); }

protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {

this.subscriber = subscriber;

this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList(); //note1

}

1、class SubscriptionList implements Subscription;该对象内部包含private
LinkedList<Subscription> subscriptions和private volatile boolean unsubscribed;两个域
add()@Subscriber.class

[java] view
plain copy







public final void add(Subscription s) {

subscriptions.add(s);

}

将s添加到SubscriptionList的LinkedList<Subscription> 中,在添加前需要判断SubscriptionList的unsubscribed对象是否为假
onStart()@Subscriber.class

[java] view
plain copy







<pre name="code" class="java">public void onStart() {

// do nothing by default

}



unsubscribe()@Subscriber.class

[java] view
plain copy







@Override public final void unsubscribe() {

subscriptions.unsubscribe();

}

SubscriptionList的unsubscribed = true;subscriptions = null设值
isUnsubscribed()@Subscriber.class

[java] view
plain copy







@Override public final boolean isUnsubscribed() {

return subscriptions.isUnsubscribed();

}

返回SubscriptionList的subscriptions值

request()@Subscriber.class

[java] view
plain copy







protected final void request(long n) {

if (n < 0) {

throw new IllegalArgumentException("number requested cannot be negative: " + n);

}

Producer producerToRequestFrom = null;

synchronized (this) {

if (producer != null) {

producerToRequestFrom = producer;

} else {

addToRequested(n); //note1

return;

}

}

producerToRequestFrom.request(n); //note2

}

1、如果Producer为空,则先将n加到requested上面去,requested的大小是Subscribe需要处理的事件数
2、调用事件生产者的request方法,同时将n传递给Producer,告知它本Subscriber准备消费的事件数
addToRequested()@Subscriber.class

[java] view
plain copy







private void addToRequested(long n) {

if (requested == NOT_SET) {

requested = n;

} else {

final long total = requested + n;

if (total < 0) {

requested = Long.MAX_VALUE; //note1

} else {

requested = total;

}

}

}

1、如果requested+n大于Long.MAX_VALUE则表明,当前该Subscriber接收的任务数不受任何限定,由Producer自行决定向Subscribe提交多少事件
addToRequested()方法只会被request()调用,客户代码不可调用
setProducer()@Subscriber.class

[java] view
plain copy







public void setProducer(Producer p) {

long toRequest;

boolean passToSubscriber = false;

synchronized (this) {

toRequest = requested; //note1

producer = p;

if (subscriber != null) {//一般情况下该结果为假

if (toRequest == NOT_SET) {

passToSubscriber = true;

}

}

}

if (passToSubscriber) {//一般情况该行结果为假

subscriber.setProducer(producer);

} else {

if (toRequest == NOT_SET) { //note2

producer.request(Long.MAX_VALUE);

} else {

producer.request(toRequest);

}

}

1、得到当前Subscriber接收的最大任务数
2、如果之前没有调用过Subscriber的request方法,即没有对Subscriber对象的最大任务请求数做过设置,则该Subscriber默认接收所有来自Producer的事件


总结:

Java中的map、flatMap、observeOn、subscribeOn、filter方法底层实现都是使用lift、operator机制进行实现的。lift实现Observable<T>向Observable<R>的转变,operator实现Subscribe<R>向Subscribe<T>的转变。其中flatMap比较特殊使用了一次map和一次lift&operator。它们之间的区别主要是Operator的区别,observeOn对应OperatorObserveOn
、subscribeOn对应OperatorSubscribeOn 、flatMap对应OperatorMerge 、map对应OperatorMap、 filter对应OperatorFilter。
之前对lift和Operator的讲解很多同学可能也不是很理解,在此我们特意将这两个知识点单独拿出来捋一捋:
首先假设调用lift方法的对象为observable<T>。lift方法返回一个observable<R>对象,observable<R>对象的OnSubscribe.call方法执行如下的代码

利用operator将subscriber<R>转成subscriber<T> [或者将subscriber_inner<T>转成subscriber<T> ], Operator在此起到了连接subscriber<T>和subscriber<R>的作用,可以对来自subscriber<T>的t数据进行处理之后再传递给subscriber<R>
调用subscriber<T>对象的onstart方法,
调用observable<T>.onsubscribe.call方法,

call方法内部会调用subscriber<T>的onNext 、onCompleted方法。onNext方法内部先使用Operator进行一定的预处理,之后根据预处理的结果执行Subscribe<R>.onNext方法
以map为例:subscribe<T>.onnext(T t) 内部执行过程是{ subscribe<R>.onnext(Fun1.call(t))
}
以observeOn为例:subscribe<T>.onnext(T t) 内部执行过程是{queue.add(t); executor.submit(new Runnable(){ call() });
} 上面的call方法内容如下{ t=queue.get(); subscribe_inner<T>.onnext(t);}

番外篇1:对于RxJava和Volley如何使用呢?

难点在于:



volley只支持异步请求,何通过volley如何将处理的结果返回给RxJava?

解决方案:

Volley有一个类public class RequestFuture<T> implements Future<T>, Response.Listener<T>, Response.ErrorListener{}利用该类的get方法可以获得返回值。
RequestFuture工作原理:

构造RequestFuture:RequestFuture<JSONObject> future = RequestFuture.newFuture();
构造volley请求的时候,将上面的对象传入请求中;JSONObjectRequest(Request.Method.GET, Url,future,future,....);
随后调用RequestFuture.get方法,该方法会在当前线程阻塞;阻塞时间可以自己get的时候设置;return
future.get();
get方法内部调用wait(time),在等待时间内还没有结果这抛出超时异常;(wait(0)是无限期等待)
在get的wait过程中,如果volley请求到来了,则会调用RequestFuture的onResponse方法,设置该对象中的private T mResult;,同时调用notifyall(),唤醒等待中的线程。
注意,因为该方法是会阻塞的,因此千万不要在UI线程中调用get方法!!因此需要在一个新线程中进行阻塞,这样RXJava的优势就出来了~~~!撒花~

reference:http://stackoverflow.com/questions/32701331/rxjava-and-volley-requests

番外篇2:RxJava如何实现RxBus?

虽然博客已经很长了,但也只能算是对RxJava的简单了解。这篇文章http://nerds.weddingpartyapp.com/tech/2014/12/24/implementing-an-event-bus-with-rxjava-rxbus/使用RxJava实现了跟EventBus和Otto同样的事件分发功能。下面对其实现方式进行简单分析

简单使用:

RxBus _rxBus = new RxBus(); //获取RxBus对象,建议将RxBus对象的获取写成一个单例模式
_rxBus.toObserverable()//订阅事件代码, xx中定义事件处理方法

.subscribe(new Action1<Object>() {
@Override public void call(Object event) {
if(event instanceof ClassA) { .......}
}
});

_rxBus.send(new TapEvent()); //发送事件

源码分析:

以下是摘录自开源项目https://github.com/evanman/RxJava-Android-Samples中的一种RxBus实现。个人觉得应该将单例模式嵌套在RxBus类中,如EventBus那样使用getDefault方法获取一个单例,否则用户还需要自己重新写一个单例出来。

RxBus.class

[java] view
plain copy







import rx.Observable;

import rx.subjects.PublishSubject;

import rx.subjects.SerializedSubject;

import rx.subjects.Subject;

public class RxBus {

private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create());

public void send(Object o) { _bus.onNext(o); }

public Observable<Object> toObserverable() { return _bus; }

public boolean hasObservers() { return _bus.hasObservers(); }

}

SerializedSubject是RxJava开源框架中的一个类,具体内容如下:
SerializedSubject.class

[java] view
plain copy







public class SerializedSubject<T, R> extends Subject<T, R> {

private final SerializedObserver<T> observer;

private final Subject<T, R> actual;

public SerializedSubject(final Subject<T, R> actual) {

super(new OnSubscribe<R>() {

@Override

public void call(Subscriber<? super R> child) {

actual.unsafeSubscribe(child);

}

});

this.actual = actual;

this.observer = new SerializedObserver<T>(actual);

}

@Override public void onCompleted() { observer.onCompleted(); }

@Override public void onError(Throwable e) { observer.onError(e); }

@Override public void onNext(T t) { observer.onNext(t); }

@Override public boolean hasObservers() { return actual.hasObservers(); }

}

Subject<T, R>继承自Observable类;SerializedObserver构造器接收一个Subject<T, R>参数,利用该参数初始化SerializedSubject中的SerializedObserver<T>和Subject<T,
R>两个类型的域;RxBus创建一个SerializedObserver对象时,构造器接收的参数是PublishSubject.create(),即PublishSubject对象.
简单分析

对RxBus简单使用中的方法调用可以做出如下映射:

rxBus.toObserverable()==>serializedSubject
rxBus.toObserverable().subscribe(new Subscriber)==>serializedSubject.subscribe(new Subscriber)
rxBus.send(Object o)==>serializedSubject.onNext(o)==>serializedSubject.serializedObserver.onNext(0)
rxBus.hasObservers()==>serializedSubject.hasObservers()==>serializedSubject.subject.hasObservers()

针对第二步,实际会利用SerializedSubject的OnSubscribe域的call方法处理Subscriber<? super R> child,即actual.unsafeSubscribe(child)==>publishSubject.unsafeSubscribe(child);而publishSubject.unsafeSubscribe方法内部又会调用publishSubject的OnSubscribe域的call方法处理,publishSubject的OnSubscribe域实际是一个SubjectSubscriptionManager类型,SubjectSubscriptionManager类型的call方法内部先将当前传入的Subcribe对象转换成一个SubjectObserver对象,随后存入一个集合中。
针对第三步,将事件传给SerializedSubject的SerializedObserver域的onNext方法处理;而该方法内部会先将事件存入一个队列中,随后将队列中的数据交给publishSubject的onNext方法处理,publishSubject的onNext方法内部会将参数交给第二步的集合中的所有SubjectObserver对象的onNext方法处理。

与EventBus的对比

在第三步中,publishSubject的onNext方法将事件交给所有的监听者进行处理,因此事件处理方法中常常使用 if(obj instansof XX) 的语句判断当前接收到的obj是否是自己想要的事件。而EventBus会根据接收事件的类型交给对应的监听者进行处理,代码中不需要使用
if(obj instansof XX) 的语句,事件发送精准度提高;但是这是利用反射实现的,这对性能有一定的影响。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: