从源码出发了解RxJava的使用(上)
2016-07-30 15:05
537 查看
序言
认识RxJava
普通用法-1
普通用法-2
在这里我给出之前学习RxJava写得非常好的一篇文章“给Android开发者的RxJava详解”,正是有了这篇文章,才使我有了保障能对RxJava进行源码解析,非常感谢这篇文章的作者,同时也希望他能写出更多的佳作。
RxJava有四个基本概念:Observable(可观察者,即被观察者)、Observer/Subscriber(观察者)、subscribe(订阅)、事件。Observable和Observer/Subscriber通过subscribe方法实现订阅关系,从而Observable可以在需要的时候发出事件来通知Observer。Android控件的观察者模式是这样的:
Button –> 被观察者,OnClickListener –> 观察者,setOnClickListener –> 订阅,onClick –> 事件
Observable –> 被观察者,Observer、Subscriber –> 观察者,subscribe –> 订阅,onNext、onCompleted、onError –> 事件
下面将进入正题,我就不会再重复介绍RxJava中的一些方法了(因为我之前学习过的那篇文章已经做了很好的介绍),而是从源码的角度解析它们是怎样实现这个操作的。我首先会给出一个例子,然后运行输出结果,然后再对重要的方法进行代码解析,最后给出总结。
运行结果:
总结:
运行结果:
总结:
由于篇幅太长,就把“从源码出发了解RxJava的使用”分成了上、中、下三篇,中篇会分析RxJava的变换(map、flatMap)和过滤(filter),下篇会分析RxJava的线程控制。其中对flatMap可谓是不留余力的进行分析,欲知详情,请看“从源码出发了解RxJava的使用(中)”。
认识RxJava
普通用法-1
普通用法-2
序言
这是我的第一篇博客,也是我第一次尝试通过阅读源码来了解一个开源框架的使用。阅读源码的过程是艰辛和乏味的,你要通过不断的切换类和方法才能辨别作者的意图,很容易出现上下逻辑断层的情况。经过了两个多星期的努力,终于完成了自己的目标。我研究RxJava源码的目的很简单,只是想知道它是怎么样实现这些操作的,比如它是怎样实现线程的来回切换的。至于它的整体架构是怎样的,我就没有进行了解。在这里我给出之前学习RxJava写得非常好的一篇文章“给Android开发者的RxJava详解”,正是有了这篇文章,才使我有了保障能对RxJava进行源码解析,非常感谢这篇文章的作者,同时也希望他能写出更多的佳作。
认识RxJava
RxJava在GitHub上的自我介绍:“a library for composing asynchronous and event-based programs by using observable sequences for the Java VM.“(一个在Java VM上使用可观测的序列来组成异步的、基于事件的程序的库)。RxJava的本质就是一个实现异步操作的库,它的优势就是简洁,随着程序逻辑变得越来越复杂,它依然能够保持简洁(网上很多这样的例子,就不列举啦~)。RxJava有四个基本概念:Observable(可观察者,即被观察者)、Observer/Subscriber(观察者)、subscribe(订阅)、事件。Observable和Observer/Subscriber通过subscribe方法实现订阅关系,从而Observable可以在需要的时候发出事件来通知Observer。Android控件的观察者模式是这样的:
button.setOnClickListener(new OnClickListener(){ @Override public void onClick(View v){ } });
Button –> 被观察者,OnClickListener –> 观察者,setOnClickListener –> 订阅,onClick –> 事件
Observable –> 被观察者,Observer、Subscriber –> 观察者,subscribe –> 订阅,onNext、onCompleted、onError –> 事件
下面将进入正题,我就不会再重复介绍RxJava中的一些方法了(因为我之前学习过的那篇文章已经做了很好的介绍),而是从源码的角度解析它们是怎样实现这个操作的。我首先会给出一个例子,然后运行输出结果,然后再对重要的方法进行代码解析,最后给出总结。
普通用法-1
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { Log.i("My", "call"); subscriber.onNext("Hello"); subscriber.onNext("World"); subscriber.onCompleted(); } }); Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onCompleted() { Log.i("My", "onCompleted"); } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { Log.i("My", "onNext " + s); } }; observable.subscribe(subscriber);
运行结果:
Observable.create(new Observable.OnSubscribe<String>() {
public static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(hook.onCreate(f)); } protected Observable(OnSubscribe<T> f) { this.onSubscribe = f; }
observable.subscribe(subscriber);
public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); } static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { // new Subscriber so onStart it subscriber.onStart(); try { // allow the hook to intercept and/or decorate hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { // if an unhandled error occurs executing the onSubscribe we will propagate it subscriber.onError(hook.onSubscribeError(e)); } }
hook.onSubscribeStart(observable, observable.onSubscribe)
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) { // pass through by default return onSubscribe; }
return hook.onSubscribeReturn(subscriber);
public <T> Subscription onSubscribeReturn(Subscription subscription) { // pass through by default return subscription; }
总结:
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { subscriber.onStart(); observable.onSubscribe.call(subscriber); return subscription; }
普通用法-2
String[] array = new String[]{"Hello", "World"}; // Observable<String> observable = Observable.just(array[0], array[1]); Observable<String> observable = Observable.from(array); Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onCompleted() { Log.i("My", "onCompleted"); } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { Log.i("My", "onNext " + s); } }; observable.subscribe(subscriber);
运行结果:
Observable.just(array[0], array[1]);
public static <T> Observable<T> just(T t1, T t2) { return from((T[])new Object[] { t1, t2 }); }
Observable.from(array);
public static <T> Observable<T> from(T[] array) { int n = array.length; if (n == 0) { return empty(); } else if (n == 1) { return just(array[0]); } return create(new OnSubscribeFromArray<T>(array)); }
return just(array[0]);
public static <T> Observable<T> just(final T value) { return ScalarSynchronousObservable.create(value); }
new OnSubscribeFromArray<T>(array)
public final class OnSubscribeFromArray<T> implements OnSubscribe<T> { final T[] array; public OnSubscribeFromArray(T[] array) { this.array = array; } @Override public void call(Subscriber<? super T> child) { child.setProducer(new FromArrayProducer<T>(child, array)); } static final class FromArrayProducer<T> extends AtomicLong implements Producer { final Subscriber<? super T> child; final T[] array; int index; public FromArrayProducer(Subscriber<? super T> child, T[] array) { this.child = child; this.array = array; } @Override public void request(long n) { if (n == Long.MAX_VALUE) fastPath(); else if (n != 0) slowPath(n); } void fastPath() { final Subscriber<? super T> child = this.child; for (T t : array) child.onNext(t); child.onCompleted(); } void slowPath(long r) { final Subscriber<? super T> child = this.child; final T[] array = this.array; final int n = array.length; int i = index; while (r != 0L && i != n) { child.onNext(array[i]); i++; if (i == n) { child.onCompleted(); return; } } } } }
child.setProducer(new FromArrayProducer<T>(child, array));
public void setProducer(Producer p) { long toRequest; synchronized (this) { toRequest = requested; producer = p; } // we execute the request with whatever has been requested (or Long.MAX_VALUE) if (toRequest == NOT_SET) producer.request(Long.MAX_VALUE); else producer.request(toRequest); }
总结:
1. just(T...)内部调用from(T[]) 2. public static <T> Observable<T> from(T[] array) { return create(new OnSubscribeFromArray<T>(array)); } 1)OnSubscribeFromArray对象的call方法 public void call(Subscriber<? super T> child) { child.setProducer(new FromArrayProducer<T>(child, array)); } 2)Subscriber对象的setProducer方法 public void setProducer(Producer p) { producer.request(Long.MAX_VALUE); } 3)FromArrayProducer对象的request方法 public void request(long n) { final Subscriber<? super T> child = this.child; for (T t : array) child.onNext(t); child.onCompleted(); }
由于篇幅太长,就把“从源码出发了解RxJava的使用”分成了上、中、下三篇,中篇会分析RxJava的变换(map、flatMap)和过滤(filter),下篇会分析RxJava的线程控制。其中对flatMap可谓是不留余力的进行分析,欲知详情,请看“从源码出发了解RxJava的使用(中)”。
相关文章推荐
- PHP 开源框架22个简单简介
- RxJava两步打造华丽的Android引导页
- JavaScript跨平台的开源框架NativeScript
- Android中通过RxJava进行响应式程序设计的入门指南
- Java扩展库RxJava的基本结构与适用场景小结
- Java的RxJava库操作符的用法及实例讲解
- 浅析RxJava处理复杂表单验证问题的方法
- Python六大开源框架对比
- 分享15个最受欢迎的Python开源框架
- 基于RxJava实现酷炫启动页
- RxJava入门指南及其在Android开发中的使用示例
- 六款值得推荐的android(安卓)开源框架简介
- 2月27日FreeEast每日构建版、FreeEast稳定版本、FreeEastWeb同时更新
- 掌握主流框架的源码实现,理解框架背后的思想(续) 3ff0
- 跟我一起看Retrofit 2.0的源码
- RxJava之subscribeOn解惑
- elasticsearch2.3安装以及集群部署
- jeecg最新版本的环境搭建详解
- 关于网络请求的开源框架总结 (转)