您的位置:首页 > 编程语言 > Java开发

从源码出发了解RxJava的使用(上)

2016-07-30 15:05 537 查看
序言

认识RxJava

普通用法-1

普通用法-2

序言

  这是我的第一篇博客,也是我第一次尝试通过阅读源码来了解一个开源框架的使用。阅读源码的过程是艰辛和乏味的,你要通过不断的切换类和方法才能辨别作者的意图,很容易出现上下逻辑断层的情况。经过了两个多星期的努力,终于完成了自己的目标。我研究RxJava源码的目的很简单,只是想知道它是怎么样实现这些操作的,比如它是怎样实现线程的来回切换的。至于它的整体架构是怎样的,我就没有进行了解。

  在这里我给出之前学习RxJava写得非常好的一篇文章“给Android开发者的RxJava详解”,正是有了这篇文章,才使我有了保障能对RxJava进行源码解析,非常感谢这篇文章的作者,同时也希望他能写出更多的佳作。

认识RxJava

  RxJava在GitHub上的自我介绍:“a library for composing asynchronous and event-based programs by using observable sequences for the Java VM.“(一个在Java VM上使用可观测的序列来组成异步的、基于事件的程序的库)。RxJava的本质就是一个实现异步操作的库,它的优势就是简洁,随着程序逻辑变得越来越复杂,它依然能够保持简洁(网上很多这样的例子,就不列举啦~)。

  RxJava有四个基本概念:Observable(可观察者,即被观察者)、Observer/Subscriber(观察者)、subscribe(订阅)、事件。Observable和Observer/Subscriber通过subscribe方法实现订阅关系,从而Observable可以在需要的时候发出事件来通知Observer。Android控件的观察者模式是这样的:

button.setOnClickListener(new OnClickListener(){
@Override
public void onClick(View v){

}
});


Button –> 被观察者,OnClickListener –> 观察者,setOnClickListener –> 订阅,onClick –> 事件

Observable –> 被观察者,Observer、Subscriber –> 观察者,subscribe –> 订阅,onNext、onCompleted、onError –> 事件

  下面将进入正题,我就不会再重复介绍RxJava中的一些方法了(因为我之前学习过的那篇文章已经做了很好的介绍),而是从源码的角度解析它们是怎样实现这个操作的。我首先会给出一个例子,然后运行输出结果,然后再对重要的方法进行代码解析,最后给出总结。

普通用法-1

Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.i("My", "call");
subscriber.onNext("Hello");
subscriber.onNext("World");
subscriber.onCompleted();
}
});
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.i("My", "onCompleted");
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(String s) {
Log.i("My", "onNext " + s);
}
};
observable.subscribe(subscriber);


  运行结果:

  


Observable.create(new Observable.OnSubscribe<String>() {


public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}

protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}


observable.subscribe(subscriber);


public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// new Subscriber so onStart it
subscriber.onStart();
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// if an unhandled error occurs executing the onSubscribe we will propagate it
subscriber.onError(hook.onSubscribeError(e));
}
}


hook.onSubscribeStart(observable, observable.onSubscribe)


public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
// pass through by default
return onSubscribe;
}


return hook.onSubscribeReturn(subscriber);


public <T> Subscription onSubscribeReturn(Subscription subscription) {
// pass through by default
return subscription;
}


  总结:

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
subscriber.onStart();
observable.onSubscribe.call(subscriber);
return subscription;
}


普通用法-2

String[] array = new String[]{"Hello", "World"};
// Observable<String> observable = Observable.just(array[0], array[1]);
Observable<String> observable = Observable.from(array);
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.i("My", "onCompleted");
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(String s) {
Log.i("My", "onNext " + s);
}
};
observable.subscribe(subscriber);


  运行结果:

  


Observable.just(array[0], array[1]);


public static <T> Observable<T> just(T t1, T t2) {
return from((T[])new Object[] { t1, t2 });
}


Observable.from(array);


public static <T> Observable<T> from(T[] array) {
int n = array.length;
if (n == 0) {
return empty();
} else if (n == 1) {
return just(array[0]);
}
return create(new OnSubscribeFromArray<T>(array));
}


return just(array[0]);


public static <T> Observable<T> just(final T value) {
return ScalarSynchronousObservable.create(value);
}


new OnSubscribeFromArray<T>(array)


public final class OnSubscribeFromArray<T> implements OnSubscribe<T> {

final T[] array;

public OnSubscribeFromArray(T[] array) {
this.array = array;
}

@Override
public void call(Subscriber<? super T> child) {
child.setProducer(new FromArrayProducer<T>(child, array));
}

static final class FromArrayProducer<T> extends AtomicLong implements Producer {

final Subscriber<? super T> child;
final T[] array;
int index;

public FromArrayProducer(Subscriber<? super T> child, T[] array) {
this.child = child;
this.array = array;
}

@Override
public void request(long n) {
if (n == Long.MAX_VALUE)
fastPath();
else if (n != 0)
slowPath(n);
}

void fastPath() {
final Subscriber<? super T> child = this.child;
for (T t : array)
child.onNext(t);
child.onCompleted();
}

void slowPath(long r) {
final Subscriber<? super T> child = this.child;
final T[] array = this.array;
final int n = array.length;
int i = index;
while (r != 0L && i != n) {
child.onNext(array[i]);
i++;
if (i == n) {
child.onCompleted();
return;
}
}
}
}
}


child.setProducer(new FromArrayProducer<T>(child, array));


public void setProducer(Producer p) {
long toRequest;
synchronized (this) {
toRequest = requested;
producer = p;
}
// we execute the request with whatever has been requested (or Long.MAX_VALUE)
if (toRequest == NOT_SET)
producer.request(Long.MAX_VALUE);
else
producer.request(toRequest);
}


  总结:

1. just(T...)内部调用from(T[])
2. public static <T> Observable<T> from(T[] array) {
return create(new OnSubscribeFromArray<T>(array));
}
1)OnSubscribeFromArray对象的call方法
public void call(Subscriber<? super T> child) {
child.setProducer(new FromArrayProducer<T>(child, array));
}
2)Subscriber对象的setProducer方法
public void setProducer(Producer p) {
producer.request(Long.MAX_VALUE);
}
3)FromArrayProducer对象的request方法
public void request(long n) {
final Subscriber<? super T> child = this.child;
for (T t : array)
child.onNext(t);
child.onCompleted();
}


  由于篇幅太长,就把“从源码出发了解RxJava的使用”分成了上、中、下三篇,中篇会分析RxJava的变换(map、flatMap)和过滤(filter),下篇会分析RxJava的线程控制。其中对flatMap可谓是不留余力的进行分析,欲知详情,请看“从源码出发了解RxJava的使用(中)”。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  开源框架 RxJava