您的位置:首页 > 编程语言 > Java开发

RxJava2_学习笔记

2018-03-27 15:25 169 查看

RxJava2学习笔记

参考资料:

入门:https://www.jianshu.com/p/d149043d103a

操作符详解:https://www.jianshu.com/p/0cd258eecf60

实战练习:https://www.jianshu.com/p/c935d0860186

gayhub官网:https://github.com/ReactiveX/RxJava

基础概念

什么是RxJava?

要想理解好RxJava,首先要理解清楚其中的几个关键概念。由于RxJava是利用观察者模式来实现一些列的操作,所以对于观察者模式中的观察者,被观察者,以及订阅、事件需要有一个了解。如果不理解观察者模式,不要紧,下面会详细介绍。

Observable:在观察者模式中称为“被观察者”;

Observer:观察者模式中的“观察者”,可接收Observable发送的数据;

subscribe:订阅,观察者与被观察者,通过subscribe()方法进行订阅;

Subscriber:也是一种观察者,在2.0中 它与Observer没什么实质的区别,不同的是 Subscriber要与Flowable(也是一种被观察者)联合使用,该部分内容是2.0新增的,后续文章再介绍。Obsesrver用于订阅Observable,而Subscriber用于订阅Flowable

观察者模式

观察者模式的概念很好理解,具体可以解释为:A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,需要在 B 变化的一瞬间做出反应。

在程序的观察者模式,观察者不需要时刻盯着被观察者(例如 A 不需要每过 2ms 就检查一次 B 的状态),而是采用注册(Register)或者称为订阅(Subscribe)的方式,告诉被观察者:我需要你的某某状态,你要在它变化的时候通知我。

RxJava中的观者模式

RxJava 有四个基本概念:Observable (被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在完成某些操作,获得一些结果后,回调触发事件,即发出事件来通知 Observer。

关于回调,如果理解则可以跳过这一段,如果不理解,在RxJava中可以简单的理解为:为了方便Observable和Observer交互,在Observable中,将Observer对象传入,在完成某些操作后调用Observer对象的方法,此时将触发Observer中具体实现的对应方法。

**注意:**Observer是个接口,Observable是个类。

与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() 之外,还定义了三个特殊的事件:onComplete() 和 onError(),onSubscribe()。

onComplete(): 事件队列完结时调用该方法。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。

onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。

onSubscribe():RxJava 2.0 中新增的,传递参数为Disposable ,Disposable 相当于RxJava1.x中的Subscription,用于解除订阅。

**注意:**onComplete() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

讲了这么多,大家会疑惑:这些都跟异步有什么关系?

其实这都是在为异步进行铺垫。当大家理解了观察者模式之后,就会很容易理解RxJava的异步实现方式。让Observable (被观察者)开启子线程执行耗操作,完成耗时操作后,触发回调,通知Observer (观察者)进行主线程UI更新。如此轻松便可以实现Android中的异步,且代码简洁明了,集中分布。RxJava中默认Observer (观察者)和Observer (观察者)都在同一线程执行任务。本文主要介绍RxJava中的一些基本使用,关于线程调度问题下篇文章再进行介绍。即本文中的所有操作都默认在同一线程进行。

好了,下面我们就开始了解RxJava的一些基本使用。

函数和操作符的用法

Observable(被观察者)创建的几种方式:

just()方式

Observable<String> observable = Observable.just("Hello");


使用just( ),将为你创建一个Observable并自动为你调用onNext( )发射数据。通过just( )方式 直接触发onNext(),just中传递的参数将直接在Observer的onNext()方法中接收到。

fromIterable()方式

List<String> list = new ArrayList<String>();
for(int i =0;i<10;i++){
list.add("Hello"+i);
}
Observable<String> observable = Observable.fromIterable((Iterable<String>) list);


使用fromIterable(),遍历集合,发送每个item。相当于多次回调onNext()方法,每次传入一个item。

注意:Collection接口是Iterable接口的子接口,所以所有Collection接口的实现类都可以作为Iterable对象直接传入fromIterable()方法。

defer()方式

Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() throws Exception {
return Observable.just("hello");
}
});


当观察者订阅时,才创建Observable,并且针对每个观察者创建都是一个新的Observable。以何种方式创建这个Observable对象,当满足回调条件后,就会进行相应的回调。

interval( )方式

Observable<String> observable = Observable.interval(2, TimeUnit.SECONDS);


创建一个按固定时间间隔发射整数序列的Observable,可用作定时器。即按照固定2秒一次调用onNext()方法。

range( )方式

Observable<Integer> observable = Observable.range(1,20);


创建一个发射特定整数序列的Observable,第一个参数为起始值,第二个为发送的个数,如果为0则不发送,负数则抛异常。上述表示发射1到20的数。即调用20次nNext()方法,依次传入1-20数字。

timer( )方式

Observable<Integer> observable = Observable.timer(2, TimeUnit.SECONDS);


创建一个Observable,它在一个给定的延迟后发射一个特殊的值,即表示延迟2秒后,调用onNext()方法。

repeat( )方式

Observable<Integer> observable = Observable.just(123).repeat();


创建一个Observable,该Observable的事件可以重复调用。

简便的函数式

以Consumer为例,我们可以实现简便式的观察者模式:

Observable.just("hello").subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});


其中Consumer中的accept()方法接收一个来自Observable的单个值。Consumer就是一个观察者。其他函数式接口可以类似应用。

**注意:**Observable (被观察者)只有在被Observer (观察者)订阅后才能执行其内部的相关逻辑.

操作符

基本使用:https://www.jianshu.com/p/0cd258eecf60 系列文章

实践:https://www.jianshu.com/p/c935d0860186 系列文章

以后用到再过来补一下,不是太熟悉所以不好给出实例。

Scheduler线程调度

Scheduler简介

在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。

在RxJava 中,Scheduler,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景。

Scheduler的API

●Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。

●Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。

●Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。

●Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

●Android 还有一个专用的AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。

subscribeOn(): 指定Observable(被观察者)所在的线程,或者叫做事件产生的线程。 observeOn(): 指定 Observer(观察者)所运行在的线程,或者叫做事件消费的线程。

基本使用

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.d("所在的线程:",Thread.currentThread().getName());
Log.d("发送的数据:", 1+"");
e.onNext(1);
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) /
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("所在的线程:",Thread.currentThread().getName());
Log.d("接收到的数据:", "integer:" + integer);
}
});


先就这么多吧,后面实践到了再补上。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息