您的位置:首页 > 编程语言 > Java开发

RxJava2.x是一个非常棒的流式编程,采用的观察者模式思想,事件的产生者产生事间之后发送给绑定的接受者,接受顺序与发送顺序一致.但是 是独立于RxJava1.x存在,本文讲解RxJava2.x的简

2017-07-20 15:12 567 查看
RxJava2.x是一个非常棒的流式编程,采用的观察者模式思想,事件的产生者产生事间之后发送给绑定的接受者,接受顺序与发送顺序一致.但是 是独立于RxJava1.x存在,本文讲解RxJava2.x的简单使用


RxJava2 封装主要变化

Transformer的变化:RxJava1.X为rx.Observable.Transformer接口, 继承自Func1<Observable<T>, Observable<R>>, RxJava2.X为io.reactivex.ObservableTransformer<Upstream, Downstream>,是一个独立的接口。
Flowable则是FlowableTransformer,如果你使用Flowable,以下ObservableTransformer替换FlowableTransformer即可。
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
简单使用:
//观察者模式,这里产生事件,事件产生后发送给接受者,但是一定要记得将事件的产生者和接收者捆绑在一起,否则会出现错误
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//这里调用的方法会在产生事件之后会发送给接收者,接收者对应方法会收到
e.onNext("hahaha");
e.onError(new Exception("wulala"));
e.onComplete();
}}).subscribe(new Observer<String>() {
//接受者,根据事件产生者产生的事件调用不同方法
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe: ");
}

@Override
public void onNext(String value) {
Log.e(TAG, "onNext: " + value);
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ", e);
}

@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
});
我们来用图解一下这其中发生了什么事:Paste_Image.png上游朝下游发送数据,经过subscribe使上下游产生关系,即达成订阅.解析1:ObservableEmitter,这是个啥东西?Emitter:顾名思义,即Rxjava的发射器,通过这个发射器,即可发送事件-----通过调用onNext,onError,onComplete方法发送不同事件.注意:虽然RxJava可以进行事件发送,但这并不意味着你可以随便发送,这其中需要遵循一些规则.onNext:你可以发送无数个onNext,发送的每个onNext接受者都会接收到.onError:当发送了onError事件之后,发送者onError之后的事件依旧会继续发送,但是接收者当接收到onError之后就会停止接收事件了.onComplete:当发送了onComplete事件之后,发送者的onComplete之后的事件依旧会继续发送,但是接收者接收到onComplete之后就停止接收事件了.onError事件和onComplete事件是互斥的,但是这并不代表你配置了多个onError和onComplete一定会崩溃,多个onComplete是可以正常运行的,但是只会接收到第一个,之后的就不会再接收到了,多个onError时,只会接收到第一个,第二个会直接造成程序崩溃.解析2:Disposable又是个啥东西,翻译之后百度告诉我这东西叫做一次性的,是用来控制发送者和接受者之间的纽带的,默认为false,表示发送者和接受者直接的通信阀门关闭,可以正常通信,在调用dispose()方法之后,阀门开启,会阻断发送者和接收者之间的通信,从而断开连接.重载方法:
subscribe();          //表示发送者随意发送数据,接受者什么都不管,什么都不接收.
subscribe(Consumer<? super T> onNext) {}     //只响应onNext()事件,其他的事件忽略.
subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}         //含义同上
subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}         //含义同上
subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}     //含义同上
解析3:默认情况下,发送者和接收者都运行在主线程,但是这显然是不符合实际需求的,我们在日常使用中,通常用的最多的就是在子线程进行各种耗时操作,然后发送到主线程进行,难道我们就没有办法继续用这个优秀的库了?想多了你,一个优秀的库如果连这都想不到,怎么能被称为优秀呢,RxJava中有线程调度器,通过线程调度器,我们可以很简单的实现这种效果,下面放代码.
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("hahaha");
e.onNext("hahaha");
e.onNext("hahaha");
Log.e(TAG,"运行在什么线程" + Thread.currentThread().getName());
e.onComplete();
}
}).subscribeOn(Schedulers.newThread())               //线程调度器,将发送者运行在子线程
.observeOn(AndroidSchedulers.mainThread())          //接受者运行在主线程
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe: ");
Log.e(TAG,"接收在什么线程" + Thread.currentThread().getName());
}

@Override
public void onNext(String value) {
Log.e(TAG, "onNext: " + value);
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ", e);
}

@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
});
注意事项: 变换线程方法与1.x一致subscribeOn(),只有在第一次调用的时候生效,之后不管调用多少次,只会以第一次为准.observeOn(),可以被调用多次,每次调用都会更改线程.RxJava线程池中的几个线程选项
- Schedulers.io()      io操作的线程, 通常io操作,如文件读写.
- Schedulers.computation()      计算线程,适合高计算,数据量高的操作.
- Schedulers.newThread()      创建一个新线程,适合子线程操作.
- AndroidSchedulers.mainThread()      Android的主线程,主线程


操作符之变换


Map:
首先是变换操作符- > Map,(此处引入以前看过的一篇文章的一句话:不知道Map已经统治世界了么?)那么在RxJava2中,Map究竟是个什么鬼.map是RxJava中最简单的一个变换操作符,它的作用是将上游发送过来的事件都去应用一个函数,让每一个事件都按照该函数去变化,下游接收到事件时,就变成了变化过后的事件,多说无益,上代码.
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "我是变换过后的" + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("XYK",s);
}
});
}
通过运行结果可以看到,我们在上游发送的数据类型为Integer,到了下游接收到的数据为String类型,中间通过map对其进行了转换,是不是感觉很强大?通过map我们可以将上游数据转化为任意类型发送到下游,就是这么6~圆形事件1,2,3,经过Map转化之后,变成了三角形事件1,2,3,但是有童鞋要问了,这有什么用呢,我们来举一个实际需求的例子,用Map来做一下:
读取一篇英文文章,将文章中的字符全部转换为大写.
我们先来用非RxJava2来做一下:
//模拟一篇文章
String article = "fkjdsalijfofldaJFOIEjfldanlJR2OnfldajilwafkndaIUPO32,LFKjlijuJFLMA";
char[] chars = article.toCharArray();

StringBuffer sb = new StringBuffer();
for (int i = 0; i < chars.length; i++) {
Log.e(TAG,chars[i] + "");
if(chars[i] >= 'a' && chars[i] <= 'z'){
sb.append((chars[i] + "").toUpperCase());
}else{
sb.append(chars[i]);
}
}

Log.e(TAG,sb.toString());
好像看上去没什么问题,但是这逼格显然不够高,程序员的精髓不就是要敲出一段逼格超高的代码么,我们试试用RxJava2:
//模拟一篇文章
String article = "fkjdsalijfofldaJFOIEjfldanlJR2OnfldajilwafkndaIUPO32,LFKjlijuJFLMA";

final char[] chars = article.toCharArray();
Observable.create(new ObservableOnSubscribe<Character>() {
@Override
public void subscribe(ObservableEmitter<Character> e) throws Exception {
for (int i = 0; i < chars.length; i++) {
e.onNext(chars[i]);
}
}
//delay  延时5秒发送
}).delay(5, TimeUnit.SECONDS)
//事件类型转换
.map(new Function<Character, String>() {
@Override
public String apply(Character s) throws Exception {
if (s >= 'a' && s <= 'z') {
return s.toString().toUpperCase();
} else {
return s.toString();
}
}
})
//线程调度
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, s);
}
});
瞬间逼格就上去了,而且还做了线程调度等操作,是不是心头顿时感觉一串666飘过,这仅仅是最基础的转化操作符,接下来我们在看一个FlatMap:

FlatMap
FlatMap,上来就看到map,这个操作符和刚才的map有什么区别呢,flatmap可以将上游发送过来的数据,变换为多个数据,然后合并为一个事件发送到下游,这么说是不是有点难懂?恩,还是直接上代码:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<String>();
for (int i = 0; i < 5; i++) {
list.add("我是变换过的" + integer);
}
return Observable.fromIterable(list);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("XYK", s);
}
});
}
通过运行结果可以看到,上游发送的数据在到达flatmap的时候,经过处理,将每个事件变成了5个,而后将5个合并为1个事件发送到下游,并且我们可以注意到,发送到下游的数据是无序的,那么这时候就要说了,我要接收的事件是有序的怎么办,这就是接下来要说的concatMap.

ConcatMap:
ConcatMap和FlatMap一样,只不过一个是有序,一个是无序而已,我们直接把上边的代码做一个更改:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<String>();
for (int i = 0; i < 5; i++) {
list.add("我是变换过的" + integer);
}
return Observable.fromIterable(list);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("XYK", s);
}
});
}


过滤操作符


Filter
Filter,顾名思义,过滤器,可以过滤掉一部分不符合要求的事件,当上游给我们发送的数据超多,而下游需要的只是一些特定的数据,如果全部接收上游发送的数据,很容易造成OOM,为了避免OOM的出现,我们则需要对上游数据进行过滤,具体操作如下:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 10000; i++) {
e.onNext(i);
}
}
}).observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer % 7 == 0;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("XYK",integer + "");
}
});
}
在上面的代码中,我们朝下游发送了10000个数据,而我只需要其中可以被7整除的数据,利用filter,将其他的数据过滤出去,留下需要的数据.Filter方法使我们经常用到的一个过滤方法,基本已经可以满足大部分应用场所了,最常见的是过滤一些null对象,但是除此之外,还有一些其他的过滤方法,我们也来看下.

Sample
Sample,样品,其功能也是,sample会每隔一段时间对上游数据进行取样,发送到下游,但是这样会导致丢失了大量事件,比较适合特定场合,如对一组数中进行抽样,代码如下:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; ; i++) {
e.onNext(i);
}
}
}).sample(1,TimeUnit.SECONDS)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("XYK",integer + "");
}
});
}
在上边的代码中,使用sample之后,每隔1秒对上游数据采样一次,发送到下游,其他事件则被过滤.
take/takeList
take和takeList方法可以将上游事件中的前N项或者最后N项发送到下游,其他事件则进行过滤,代码如下:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; ; i++) {
e.onNext(i);
}
}
}).take(3)
//.takeList(3)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("XYK",integer + "");
}
});
}

distinct
distinct方法,可以将重复对象去除重复对象,这里我们要用到一个方法,repeat(),产生重复事件,这里重复事件,再去除有些多余,只作为一个例子来展示.
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0;i < 50; i++) {
e.onNext(i);
}
}
}).take(3)
//生成重复事件
.repeat(3)
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("XYK",integer + "");
}
});


组合操作符

zip操作符:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
}
});Observable<String> observable1 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("这是");
e.onNext("这个是");
e.onNext("这个则是");
}
});Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("个");
e.onNext("只");
e.onNext("条");
e.onNext("张");
e.onNext("本");
e.onNext("副");
}
});Observable.zip(observable, observable1, observable2, new Function3<Integer, String, String, String>() {
@Override
public String apply(Integer integer, String s, String s2) throws Exception {
return s + integer + s2;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("XYK",s);
}
});
运行结果:Paste_Image.png我们可以看到,3条上游中分别有4个事件,3个事件,6个事件,经过zip操作符操作之后为什么就只变成了3个事件了呢?我们来打下Log,看看其他事件去哪了.添加Log之后的运行结果:Paste_Image.png根据运行结果可以看到,上游逐条发送到下游,下游在接收到最后一条上游发送过来的事件之后开始组合,而多余的数据也被发送了,但是并没有被进行组合,这样是不是就看明白了呢?但是这时候有问题了,组合完成之后,多余的数据依旧在发送,如果我们不停发呢?会产生什么后果?,我们来修改一下observable的代码:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; ; i++) {
Log.e("XYK",i + "");
e.onNext(i);
}
}
});
运行之后通过Monitors我们查看一下内存:(Sorry,这里没截下图来....可以自己试一下,不过想一下也知道,内存肯定会暴增嘛...)在内存持续暴增的情况下,可能用不了多久就会OOM,这种情况下我们应该怎么办呢?还记不记得上篇文章写了啥,过滤啊,我们把不需要的过滤掉不就好了
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; ; i++) {
e.onNext(i);
}
}
}).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer % 100 == 0;
}
});
当然,除此之外还有很多办法,可以根据实际情况来进行组合应用.RxJava中的zip操作符作用是将多条上游发送的事件进行结合到一起,发送到下游,并且按照顺序来进行结合,如多条上游中发送的事件数量不一致,则以最少的那条中的事件为准,下游接收到的事件数量和其相等.


Rxjava的2.x与1.x的区别(官翻)


Nulls

RxJava 2x 不再支持
null
值,如果传入一个
null
会抛出
NullPointerException

Observable.just(null);Single.just(null);Observable.fromCallable(() -> null)
.subscribe(System.out::println, Throwable::printStackTrace);Observable.just(1).map(v -> null)
.subscribe(System.out::println, Throwable::printStackTrace);
这意味着
Observable<Void>
不再发射任何值,而只是正常结束或者抛出异常。API 设计者可以定义
Observable<Object>
这样的观察者,
因为并不确定具体是什么类型的
Object
。例如,如果你需要一个 signaller-like ,你可以定义一个共享的枚举类型,它是一个单独的实例
onNext
‘d:
enum Irrelevant { INSTANCE; }Observable<Object> source = Observable.create((ObservableEmitter<Object> emitter) -> {
System.out.println("Side-effect 1");
emitter.onNext(Irrelevant.INSTANCE);System.out.println("Side-effect 2");
emitter.onNext(Irrelevant.INSTANCE);System.out.println("Side-effect 3");
emitter.onNext(Irrelevant.INSTANCE);
});source.subscribe(e -> { /* Ignored. */ }, Throwable::printStackTrace);


Observable 和 Flowable

在RxJava 0.x中关于介绍backpressure部分有一个小小的遗憾,那就是没有用一个单独的类,而是使用了
Observable
。 主要的背压问题是有很多很火的代码,像UI events,不能合理的背压,导致了无法意料的
MissingBackpressureException
。我们试图在 2.x 中纠正这个问题。因此我们把
io.reactivex.Observable
设计成非背压的,并增加一个新的
io.reactivex.Flowable
去支持背压。好消息是操作符的名字几乎没有改动。坏消息是当你执行’organize imports’时必须要格外的小心,它可能无意的给你选择一个非背压的
io.reactivex.Observable


Single

2.x 的
Single
类可以发射一个单独
onSuccess
onError
消息。它现在按照Reactive-Streams规范被重新设计,
SingleObserver
改成了如下的接口。
interface SingleObserver<T> {
void onSubscribe(Disposable d);
void onSuccess(T value);
void onError(Throwable error);
}
并遵循协议
onSubscribe (onSuccess | onError)?
.


Completable

Completable
大部分和以前的一样。因为它在1.x的时候就是按照Reactive-Streams的规范进行设计的。命名上有些变化,
rx.Completable.CompletableSubscriber
变成了
io.reactivex.CompletableObserver
onSubscribe(Disposable)
:
interface CompletableObserver<T> {
void onSubscribe(Disposable d);
void onComplete();
void onError(Throwable error);
}
并且仍然遵循协议
onSubscribe (onComplete | onError)?
.


Maybe

RxJava 2.0.0-RC2 介绍了一个新的类型
Maybe
。从概念上来说,它是
Single
Completable
的结合体。它可以发射0个或1个通知或错误的信号。
Maybe
类结合了
MaybeSource
,
MaybeObserver<T>
作为信号接收接口,同样遵循协议
onSubscribe
(onSuccess | onError | onComplete)?
。因为最多有一个元素被发射,
Maybe
没有背压的概念。这意味着调用
onSubscribe(Disposable)
请求可能还会触发其他
onXXX
方法。和
Flowable
不同,如果那有一个单独的值要发射,那么只有
onSuccess
被调用,
onComplete
不被调用。这个新的类,实际上和其他
Flowable
的子类操作符一样可以发射0个或1个序列。
Maybe.just(1)
.map(v -> v + 1)
.filter(v -> v == 1)
.defaultIfEmpty(2)
.test()
.assertResult(2);


Base reactive interfaces


基础reactive接口

按照Reactive-Streams风格的
Flowable
实现了
Publisher<T>
接口,其他基础类也实现了类似的基础接口
interface ObservableSource<T> {
void subscribe(Observer<? super T> observer);
}interface SingleSource<T> {
void subscribe(SingleObserver<? super T> observer);
}interface CompletableSource {
void subscribe(CompletableObserver observer);
}interface MaybeSource<T> {
void subscribe(MaybeObserver<? super T> observer);
}
因此,很多操作符需要从用户接收
Publisher
XSource
的一些基础的类型。
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);
通过
Publisher
作为输入,你可以组合其他的遵从Reactive-Streams规范的库,而不需要包裹或把它们转换成
Flowable
。如果一个操作符必须要提供一个基础类,那么用户将会收到一个完整的基础类。
Flowable<Flowable<Integer>> windows = source.window(5);source.compose((Flowable<T> flowable) ->
flowable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()));


Subjects 和 Processors

在Reactive-Streams规范中,
Subject
类似于行为,即消费者和提供者的事件在同一时间发生。随着
Observable
/
Flowable
的分离,支持背压的类都是遵从Reactive-Streams规范的
FlowableProcessor<T>
的子类。一个关于
Subject
重要的变化是它们不再支持
T
-> R
这样的转换。在 2.x 中,
io.reactivex.subjects.AsyncSubject
,
io.reactivex.subjects.BehaviorSubject
,
io.reactivex.subjects.PublishSubject
,
io.reactivex.subjects.ReplaySubject
io.reactivex.subjects.UnicastSubject
不支持背压。
io.reactivex.processors.AsyncProcessor
,
io.reactivex.processors.BehaviorProcessor
,
io.reactivex.processors.PublishProcessor
,
io.reactivex.processors.ReplayProcessor
io.reactivex.processors.UnicastProcessor
支持背压。
BehaviorProcessor
PublishProcessor
不能协同请求下级的订阅者,如果下游不能保存,则会发射一个
MissingBackpressureException
异常。其他
XProcessor
类支持对下游订阅者背压,但是当被订阅源时,它们会无限制的消费。


其他类

rx.observables.ConnectableObservable
现在是
io.reactivex.observables.ConnectableObservable<T>
io.reactivex.flowables.ConnectableFlowable<T>


GroupedObservable

rx.observables.GroupedObservable
现在是
io.reactivex.observables.GroupedObservable<T>
io.reactivex.flowables.GroupedFlowable<T>
.在1.x中,你可以用
GroupedObservable.from()
创建一个实例。在2.x中,所有实例都直接继承了
GroupedObservable
,因此这个工厂方法不再可用;
现在整个类都是抽象的。不过你可以继承类然后添加你自定义的
subscribeActual
行为来达到1.x中相似的功能。
class MyGroup<K, V> extends GroupedObservable<K, V> {
final K key;final Subject<V> subject;public MyGroup(K key) {
this.key = key;
this.subject = PublishSubject.create();
}@Override
public T getKey() {
return key;
}@Override
protected void subscribeActual(Observer<? super T> observer) {
subject.subscribe(observer);
}
}


功能接口

1.x 和 2.x 是跑在Java 6以上的虚拟机的,所以我们不能使用Java8的功能接口(functional interfaces),比如
Java.util.function.Function
。但我们可以按照这个例子来定义自己的功能接口(functional
interfaces)。一个值得注意的区别是所有的功能接口(functional interfaces)都定义了
throws Exception
。这对于consumers 和 mappers 来说是一个巨大的便利,你不需要用
try-catch
捕获异常。
Flowable.just("file.txt")
.map(name -> Files.readLines(name))
.subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);
如果文件不存在或者不可读,结尾的consumer会直接输出
IOException
。你可以直接调用
Files.readLines(name)
而不需要捕获异常。


Actions

为了减少组件数量,2.x中没有定义
Action3
-
Action9
ActionN
。保留的action接口按照Java 8 functional风格命名。 无参数的
Action0
被操作符
io.reactivex.functions.Action
Scheduler
代替。
Action1
被重命名为
Consumer
Action2
被重命名为
BiConsumer
ActionN
Consumer<Object[]>
代替。


Functions

我们按照java 8的命名风格定义了
io.reactivex.functions.Function
io.reactivex.functions.BiFunction
,
Func3
-
Func9
分别改成了
Function3
-
Function9
FuncN
Function<Object[],
R>
代替。此外,操作符不再使用
Func1<T, Boolean>
但原始返回类型为
Predicate<T>
io.reactivex.functions.Functions
类提供了常见的转换功能
Function<Object[],
R>


Subscriber

Reactive-Streams规范有自己的Subscriber。这个接口是轻量级的,并且把请求管理和取消机制整合进了一个单独的接口
org.reactivestreams.Subscription
,而不是分别用
rx.Producer
rx.Subscription
。这就可以用比1.x中
rx.Subscriber
更少的内部状态来创建一个stream
consumers。
Flowable.range(1, 10).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}@Override
public void onNext(Integer t) {
System.out.println(t);
}@Override
public void onError(Throwable t) {
t.printStackTrace();
}@Override
public void onComplete() {
System.out.println("Done");
}
});
由于命名冲突,把
rx
包改成
org.reactivestreams
。此外
org.reactivestreams.Subscriber
不能从外面添加、取消或请求。为了弥补这一空缺,我们为
Flowable
定义了抽象类
DefaultSubscriber
,
ResourceSubscriber
DisposableSubscriber
分别提供了类似于
rx.Subscriber
的资源跟踪支持,并且可以从外面取消
dispose()
:
ResourceSubscriber<Integer> subscriber = new ResourceSubscriber<Integer>() {
@Override
public void onStart() {
request(Long.MAX_VALUE);
}@Override
public void onNext(Integer t) {
System.out.println(t);
&n
33332
bsp;}@Override
public void onError(Throwable t) {
t.printStackTrace();
}@Override
public void onComplete() {
System.out.println("Done");
}
};Flowable.range(1, 10).delay(1, TimeUnit.SECONDS).subscribe(subscriber);subscriber.dispose();
注意,由于Reactive-Streams的兼容性,方法
onCompleted
被重命名为
onComplete
。因为1.x中,
Observable.subscribe(Subscriber)
返回
Subscription
,用户经常添加
Subscription
CompositeSubscription
中,例如:
CompositeSubscription composite = new CompositeSubscription();composite.add(Observable.range(1, 5).subscribe(new TestSubscriber<Integer>()));由于Reactive-Streams规范,Publisher.subscribe
无返回值。为了弥补这一点,我们增加了
E subscribeWith(E subscriber)
方法。因为在2.x中
ResourceSubscriber
直接实现了
Disposable
,所以代码可以这样写。
[/code]
CompositeDisposable composite2 = new CompositeDisposable();composite2.add(Flowable.range(1, 5).subscribeWith(subscriber));


在onSubscribe/onStart中调用request

注意,在
Subscriber.onSubscribe
ResourceSubscriber.onStart
中调用
request(n)
将会立即调用
onNext
,实例代码如下:
Flowable.range(1, 3).subscribe(new Subscriber<Integer>() {@Override
public void onSubscribe(Subscription s) {
System.out.println("OnSubscribe start");
s.request(Long.MAX_VALUE);
System.out.println("OnSubscribe end");
}@Override
public void onNext(Integer v) {
System.out.println(v);
}@Override
public void onError(Throwable e) {
e.printStackTrace();
}@Override
public void onComplete() {
System.out.println("Done");
}
});
This will print:将会打印:
OnSubscribe start
1
2
3
Done
OnSubscribe end
当你在
onSubscribe
/
onStart
中做了一些初始化的工作,而这些工作是在
request
后面时,会出现一些问题,在
onNext
执行时,你的初始化工作的那部分代码还没有执行。为了避免这种情况,请确保你调用
request
时,已经把所有初始化工作做完了。这个行为不同于1.x中的
request
要经过延迟的逻辑直到上游的
Producer
到达时。在2.x中,总是
Subscription
先传递下来,90%的情况下没有延迟请求的必要。


Subscription

在RxJava 1.x中,接口
rx.Subscription
负责流和资源的生命周期管理,即退订和释放资源,例如scheduled tasks。Reactive-Streams规范用这个名称指定source和consumer之间的关系:
org.reactivestreams.Subscription
允许从上游请求一个正数,并支持取消。为了避免名字冲突,1.x的
rx.Subscription
被改成了
io.reactivex.Disposable
。因为Reactive-Streams的基础接口
org.reactivestreams.Publisher
定义
subscribe()
为无返回值,
Flowable.subscribe(Subscriber)
不再返回任何
Subscription
。其他的基础类型也遵循这种规律。在2.x中其他的
subscribe
的重载方法返回
Disposable
。原始的
Subscription
容器类型已经被重命名和修改。
CompositeSubscription
改成
CompositeDisposable

SerialSubscription
MultipleAssignmentSubscription
被合并到了
SerialDisposable
set()
方法取消了旧值,而
replace()
方法没有。
RefCountSubscription
已被删除。


背压

Reactive-Streams规范的操作符支持背压,特别是当它们不发送请求时,它们不会溢出。新的操作符
Flowable
被设计成适合下游请求,然而这个不意味着
MissingBackpressureException
不会出现。这个异常仍然存在。但这一次,
onNext
会抛出这个异常。作为替代,在2.x中
Observable
完全不支持背压,但可以被替换。


Reactive-Streams compliance

Flowable-based sources和operators是遵从Reactive-Streams 1.0.0规范的,除了一个规则§3.9和解释的规则§1.3:§3.9: While the Subscription is not cancelled, Subscription.request(long n) MUST signal onError with a java.lang.IllegalArgumentException if the argument is <= 0. The cause message MUST include a reference to this rule and/or quote the full rule.Rule §3.9 requires excessive overhead to handle (half-serializer on every operator dealing with request()) for a bug-case. RxJava 2 (and Reactor 3 in fact) reports the
IllegalArgumentException
to
RxJavaPlugins.onError
and
ignores it otherwise. RxJava 2 passes the Test Compatibility Kit (TCK) by applying a custom
operator that routes the
IllegalArgumentException
into the
Subscriber.onError
in
an async-safe manner. All major Reactive-Streams libraries are free of such zero requests; Reactor 3 ignores it as we do and Akka-Stream uses a converter (to interact with other RS sources and consumers) which has (probably) a similar routing behavior as our
TCK operator.§1.3: onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled sequentially (no concurrent notifications).TCK 允许同步但限制
onSubscribe
onNext
之间往返。也就是说在
onSubscribe
中,调用
request(1)
后将会调用
onNext
,在
onNext
返回后
request(1)
才会返回。虽然大部分操作符都是这样的,但操作符
observeOn
会异步的调用
onNext
,因此
onSubscribe
会和
onNext
同时被调用。这就是由TCK来检测,我们使用another
operator来延迟下游请求直到
onSubscribe
返回。再次声明,这种异步行为不是RxJava 2的一个问题,因为在Reactor 3中操作符是线程安全的执行
onSubscribe
。Akka-Stream的转换类似于延迟请求。因为这两个影响inter-library的行为,我们考虑在以后给
Flowable
增加了一个标准的操作符,把这两种行为改到一个单独的方法。


Runtime hooks

2.x 中重新设计了
RxJavaPlugins
类,现在支持运行时改变回调。测试需要重写schedulers,生命周期方法可以通过回调函数。
RxJavaObservableHook
和友类现在都取消了,
RxJavaHooks
功能被加入到了
RxJavaPlugins


Schedulers

在2.x的API中仍然支持主要的默认scheduler:
computation
,
io
,
newThread
trampoline
,可以通过
io.reactivex.schedulers.Schedulers
这个实用的工具类来调度。2.x中不存在
immediate
调度器。 它被频繁的误用,并没有正常的实现
Scheduler
规范;它包含用于延迟动作的阻塞睡眠,并且不支持递归调度。你可以使用
Schedulers.trampoline()
来代替它。
Schedulers.test()
已经被移除,这样避免了默认调度器休息的概念差异。那些返回一个”global”的调度器实例是鉴于
test()
总是返回一个新的
TestScheduler
实例。现在我们鼓励测试人员使用这样简单的代码
new
TestScheduler()
io.reactivex.Scheduler
抽象类现在支持直接调度任务,不需要先创建然后通过
Worker
调度。
public abstract class Scheduler {public Disposable scheduleDirect(Runnable task) { ... }public Disposable scheduleDirect(Runnable task, long delay, TimeUnit unit) { ... }public Disposable scheduleDirectPeriodically(Runnable task, long initialDelay,
long period, TimeUnit unit) { ... }public long now(TimeUnit unit) { ... }// ... rest is the same: lifecycle methods, worker creation
}
主要的目的是为了避免跟踪
Worker
的开销。方法有一个默认的实现,你可以直接复用
createWorker
,但如果有需要,你也可以重写它来实现更强大的功能。这些方法返回了当前时间调度器的概念,
now()
被改成接受一个用于指定单位量的
TimeUnit
的方法。


进入reactive的世界

RxJava 1.x的设计缺陷之一是暴露了
rx.Observable.create()
方法,该方法虽然很强大,但导致了你很少使用内置典型的操作符。不幸的是,有太多的代码依赖于这个库,所以我们不能删除或重命名它。2.x是一个新的开始,我们不会再犯这个错误了。每一个基础类
Flowable
,
Observable
,
Single
,
Maybe
Completable
都有安全的
create
操作符去支持背压和取消。
Flowable.create((FlowableEmitter<Integer> emitter) -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}, BackpressureStrategy.BUFFER);
实际上,1.x中
fromEmitter
已经被重命名为
Flowable.create
。其他基础类型也有类似的
create
方法。


离开reactive的世界

除了subscribing 各自的consumers(
Subscriber
,
Observer
,
SingleObserver
,
MaybeObserver
CompletableObserver
)
以及functional-interface 基础consumers(例如
subscribe(Consumer<T>, Consumer<Throwable>, Action)
),以前在1.x中独立的
BlockingObservable
已经集成了主要的基础类型。现在你可以直接调用
blockingX
来阻塞等待结果:
List<Integer> list = Flowable.range(1, 100).toList().blockingGet(); // toList() returns SingleInteger i = Flowable.range(100, 100).blockingLast();
在2.x中另外一个关于
rx.Subscriber
org.reactivestreams.Subscriber
重要的区别是,你的
Subscriber
Observer
不允许抛出任何致命的异常。这意味着下面这样的代码不再是合法的:
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}public void onNext(Integer t) {
if (t == 1) {
throw new IllegalArgumentException();
}
}public void onError(Throwable e) {
if (e instanceof IllegalArgumentException) {
throw new UnsupportedOperationException();
}
}public void onComplete() {
throw new NoSuchElementException();
}
};Flowable.just(1).subscribe(subscriber);
这样的规则同样适用于
Observer
,
SingleObserver
,
MaybeObserver
CompletableObserver
。由于很多现有基于1.x的代码做了类似的事情,我们设计了
safeSubscribe
方法来帮助你处理这样的代码。当然你也可以使用
subscribe(Consumer<T>, Consumer<Throwable>, Action)
方法来提供一个回调。
Flowable.just(1)
.subscribe(
subscriber::onNext,
subscriber::onError,
subscriber::onComplete,
subscriber::onSubscribe
);


Testing

测试RxJava 2.x和1.x中一样,
Flowable
可以用
io.reactivex.subscribers.TestSubscriber
测试,而非背压的
Observable
,
Single
,
Maybe
Completable
可以用
io.reactivex.observers.TestObserver
测试。


test() “operator”

为了支持我们内部测试,所有的基础类都有
test()
方法,返回
TestSubscriber
TestObserver
:
TestSubscriber<Integer> ts = Flowable.range(1, 5).test();TestObserver<Integer> to = Observable.range(1, 5).test();TestObserver<Integer> tso = Single.just(1).test();TestObserver<Integer> tmo = Maybe.just(1).test();TestObserver<Integer> tco = Completable.complete().test();
第二个便利之处在于,大部分
TestSubscriber
/
TestObserver
方法返回自身实例,这让我们可以链式调用各种
assertX
方法。第三个便利是,你可以流畅的测试你的代码而不需要去创建或者引入
TestSubscriber
/
TestObserver
实例。
Flowable.range(1, 5)
.test()
.assertResult(1, 2, 3, 4, 5)
;


值得注意的新的断言方法

assertResult(T... items)
: 断言在
onComplete
中将会按指定顺序收到给定的值,并且没有错误。
assertFailure(Class<? extends Throwable> clazz, T... items)
: 断言将会收到指定的异常。
assertFailureAndMessage(Class<? extends Throwable> clazz, String message, T... items)
: 和
assertFailure
一样,但还会验证
getMessage()
中包含的值。
awaitDone(long time, TimeUnit unit)
等待一个终结事件,如果超时了,将会取消该事件。
assertOf(Consumer<TestSubscriber<T>> consumer)
组成一些断言到流式链中。其中一个好处是,把
Flowable
改为
Observable
,所以测试代码不需要改变,内部的已经把
TestSubscriber
改成了
TestObserver


提前取消和请求

TestObserver
中的
test()
方法有一个
test(boolean
cancel)
重载,它能在订阅前取消
TestSubscriber
/
TestObserver
:
PublishSubject<Integer> pp = PublishSubject.create();// nobody subscribed yet
assertFalse(pp.hasSubscribers());pp.test(true);// nobody remained subscribed
assertFalse(pp.hasSubscribers());
TestSubscriber
test(long
initialRequest)
test(long initialRequest, boolean cancel)
重载,用于指定初始请求数量以及
TestSubscriber
是否应该立即被取消。如果
initialRequest
被给定,
TestSubscriber
实例通常需要被捕获以便访问
request()
方法:
PublishProcessor<Integer> pp = PublishProcessor.create();TestSubscriber<Integer> ts = pp.test(0L);ts.request(1);pp.onNext(1);
pp.onNext(2);ts.assertFailure(MissingBackpressureException.class, 1);


测试异步代码

对于给定的异步代码,流畅的阻塞终端事件是可能的:
Flowable.just(1)
.subscribeOn(Schedulers.single())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);


Mockito & TestSubscriber

那些在1.x中正在使用Mockito和
Observer
的用户需要去使用
Subscriber.onSubscribe
方法去提出初始的请求,否则序列化将会挂起或者失败:
@SuppressWarnings("unchecked")
public static <T> Subscriber<T> mockSubscriber() {
Subscriber<T> w = mock(Subscriber.class);Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock a) throws Throwable {
Subscription s = a.getArgumentAt(0, Subscription.class);
s.request(Long.MAX_VALUE);
return null;
}
}).when(w).onSubscribe((Subscription)any());return w;}


操作符的差别

2.x中大部分操作符仍然被保留,实际上大部分行为和1.x一样。下面的列表中列出了每一个基础类的在1.x和2.x的区别通常来说,很多操作符提供了重载,允许指定运行上游的内部缓冲区的大小或者预先分配的数量。一些操作符重载已经被重命名为了后缀风格,比如
fromArray
,
fromIterable
。这么做的原因是,当用Java
8编译时,javac往往不能区分功能接口类型。在1.x中被标记为
@Beta
@Experimental
的操作符已经成为正式操作符了。


1.x Observable 到 2.x Flowable


工厂方法:

1.x2.x
amb
添加
amb(ObservableSource...)
重载, 2-9 参数被删除
RxRingBuffer.SIZE
bufferSize()
combineLatest
增加条目重载, 增加 带
bufferSize
参数的重载,
combineLatest(List)
被删除
concat
增加带
prefetch
参数的重载, 5-9 重载被删除 , 使用
concatArray
代替
N/A增加
concatArray
concatArrayDelayError
N/A增加
concatArrayEager
concatArrayEagerDelayError
concatDelayError
增加带延时的重载
concatEagerDelayError
增加带延时的重载
create(SyncOnSubscribe)
generate
+ 重载代替
create(AsnycOnSubscribe)
不存在
create(OnSubscribe)
使用安全的
create(FlowableOnSubscribe, BackpressureStrategy)
, 支持
unsafeCreate()
from
拆分成
fromArray
,
fromIterable
,
fromFuture
N/A增加
fromPublisher
fromAsync
重命名为
create()
N/A增加
intervalRange()
limit
被删除, 使用
take
merge
增加带
prefetch
的重载
mergeDelayError
增加带
prefetch
的重载
sequenceEqual
增加带
bufferSize
的重载
switchOnNext
增加带
prefetch
的重载
switchOnNextDelayError
增加带
prefetch
的重载
timer
被废弃
zip
增加带
bufferSize
delayErrors
的重载,
拆分成了
zipArray
zipIterable


实例方法:

1.x2.x
all
RC3 返回
Single<Boolean>
any
RC3 返回
Single<Boolean>
asObservable
重命名为
hide()
, 隐藏所有的身份
buffer
重载自定义的
Collection
提供者
cache(int)
被废弃
collect
RC3 返回
Single<U>
collect(U, Action2<U, T>)
改成
collectInto
和 RC3 返回
Single<U>
concatMap
增加带
prefetch
的重载
concatMapDelayError
增加带
prefetch
的重载, 支持延时
concatMapEager
增加带
prefetch
的重载
concatMapEagerDelayError
增加带
prefetch
的重载, 支持延时
count
RC3 返回
Single<Long>
countLong
被删除, 使用
count
distinct
重载自定义的
Collection
提供者.
doOnCompleted
重命名为
doOnComplete
doOnUnsubscribe
重命名为
Flowable.doOnCancel
doOnDispose
, additional
info
N/A增加
doOnLifecylce
来处理
onSubscribe
,
request
cancel
elementAt(int)
RC3 不再发射 NoSuchElementException 如果源比索引更小
elementAt(Func1, int)
被删除, 使用
filter(predicate).elementAt(int)
代替
elementAtOrDefault(int, T)
重命名为
elementAt(int, T)
和 RC3 返回
Single<T>
elementAtOrDefault(Func1, int, T)
被删除, 使用
filter(predicate).elementAt(int, T)
代替
first()
RC3 重命名为
firstElement
返回
Maybe<T>
first(Func1)
被删除, 使用
filter(predicate).first()
代替
firstOrDefault(T)
重命名为
first(T)
RC3 返回
Single<T>
firstOrDefault(Func1, T)
被删除, 使用
filter(predicate).first(T)
代替
flatMap
增加带
prefetch
的重载
N/A增加
forEachWhile(Predicate<T>, [Consumer<Throwable>, [Action]])
用于有条件停止 consumption
groupBy
增加带
bufferSize
delayError
的重载,
支持 支持内部自定义map,RC1中没有
ignoreElements
RC3 返回
Completable
isEmpty
RC3 返回
Single<Boolean>
last()
RC3 重命名为
lastElement
返回
Maybe<T>
last(Func1)
被删除, 使用
filter(predicate).last()
代替
lastOrDefault(T)
重命名为
last(T)
RC3 返回
Single<T>
lastOrDefault(Func1, T)
被删除, 使用
filter(predicate).last(T)
代替
nest
被删除, 使用
just
代替
publish(Func1)
增加带
prefetch
的重载
reduce(Func2)
RC3 返回
Maybe<T>
N/A增加
reduceWith(Callable, BiFunction)
为了减少自定义Subscriber, 返回
Single<T>
N/A增加
repeatUntil(BooleanSupplier)
repeatWhen(Func1, Scheduler)
删除了重载, 使用
subscribeOn(Scheduler).repeatWhen(Function)
代替
retry
增加
retry(Predicate)
,
retry(int,
Predicate)
N/A增加
retryUntil(BooleanSupplier)
retryWhen(Func1, Scheduler)
删除了重载, 使用
subscribeOn(Scheduler).retryWhen(Function)
代替
N/A增加
sampleWith(Callable, BiFunction)
去扫描自定义的Subscriber方式
single()
RC3 重命名为
singleElement
返回
Maybe<T>
single(Func1)
被删除,使用
filter(predicate).single()
代替
singleOrDefault(T)
重命名为
single(T)
RC3 返回
Single<T>
singleOrDefault(Func1, T)
被删除,使用
filter(predicate).single(T)
代替
skipLast
增加带
bufferSize
delayError
的重载
startWith
2-9 参数的被删除了, 使用
startWithArray
代替
N/A增加
startWithArray
来减少二义性
N/A增加
subscribeWith
返回输入的订阅对象
switchMap
增加带
prefetch
的重载
switchMapDelayError
增加带
prefetch
的重载
takeLastBuffer
被删除
N/A增加
test()
timeout(Func0<Observable>, ...)
方法签名改成了
timeout(Publisher, ...)
删除了方法, 如果有需要,使用
defer(Callable<Publisher>>)
toBlocking().y
内联
blockingY()
操作符, 除了
toFuture
toCompletable
RC3 被删除, 使用
ignoreElements
代替
toList
RC3 返回
Single<List<T>>
toMap
RC3 返回
Single<Map<K, V>>
toMultimap
RC3 返回
Single<Map<K, Collection<V>>>
N/A增加
toFuture
N/A增加
toObservable
toSingle
RC3 被删除, 使用
single(T)
代替
toSortedList
RC3 增加
Single<List<T>>
withLatestFrom
5-9 个参数的重载被删除
zipWith
增加带
prefetch
delayErrors
的重载


不同的返回类型

2.x中一些的操作符产生确切的一个值或者一个错误时,返回
Single

操作符旧返回值新返回值备注
all(Predicate)
Observable<Boolean>
Single<Boolean>
如果所有的元素都匹配,则发射true
any(Predicate)
Observable<Boolean>
Single<Boolean>
如果所有的元素都匹配,则发射true
count()
Observable<Long>
Single<Long>
计算序列中元素的数量
elementAt(int)
Observable<T>
Maybe<T>
Emits 给定位置处的元素或完成的元素
elementAt(int, T)
Observable<T>
Single<T>
发射指定位置的元素或默认元素
first(T)
Observable<T>
Single<T>
发射第一个元素或者抛出
NoSuchElementException
firstElement()
Observable<T>
Maybe<T>
发射第一个元素或者结束
ignoreElements()
Observable<T>
Completable
忽略所有非终端事件
isEmpty()
Observable<Boolean>
Single<Boolean>
如果源为空,则发射true
last(T)
Observable<T>
Single<T>
发射最后一个元素或默认值
lastElement()
Observable<T>
Maybe<T>
发射最后一个元素或结束
reduce(BiFunction)
Observable<T>
Maybe<T>
发射减少的值或者结束
reduce(Callable, BiFunction)
Observable<U>
Single<U>
发射减少的值或者初始的值
reduceWith(U, BiFunction)
Observable<U>
Single<U>
发射减少的值或者初始的值
single(T)
Observable<T>
Single<T>
发射唯一的元素或默认值
singleElement()
Observable<T>
Maybe<T>
发射唯一的元素或结束
toList()
Observable<List<T>>
Single<List<T>>
将所有元素放到
List
toMap()
Observable<Map<K, V>>
Single<Map<K, V>>
将所有元素放到
Map
toMultimap()
Observable<Map<K, Collection<V>>>
Single<Map<K, Collection<V>>>
将所有元素包装到Collection后放到
Map
toSortedList()
Observable<List<T>>
Single<List<T>>
将所有元素放到
List
并排序


移除

为了保证最终的2.0API尽可能干净,我们删除了一些候选的方法和组件。
删除时的版本组件备注
RC3
Flowable.toCompletable()
使用
Flowable.ignoreElements()
代替
RC3
Flowable.toSingle()
使用
Flowable.single(T)
代替
RC3
Flowable.toMaybe()
使用
Flowable.singleElement()
代替
RC3
Observable.toCompletable()
使用
Observable.ignoreElements()
代替
RC3
Observable.toSingle()
使用
Observable.single(T)
代替
RC3
Observable.toMaybe()
使用
Observable.singleElement()
代替


其他改变


doOnCancel/doOnDispose/unsubscribeOn

在1.x中,
doOnUnsubscribe
总是执行终端事件,因为
SafeSubscriber
调用了
unsubscribe
。这实际上是没有必要的。Reactive-Streams规范中,一个终端事件到达
Subscriber
,上游的
Subscription
会取消,因此调用
cancel()
是一个空操作。由于同样的原因
unsubscribeOn
也没被在终端路径上调用,但只有实际在链上调用
cancel
时,才会调用
unsubscribeOn
。因此,下面的序列不会被调用
doOnCancel
:
Flowable.just(1, 2, 3)
.doOnCancel(() -> System.out.println("Cancelled!"))
.subscribe(System.out::println);
然而,下面将会调用
take
操作符在传送过程中取消
onNext

Flowable.just(1, 2, 3)
.doOnCancel(() -> System.out.println("Cancelled!"))
.take(2)
.subscribe(System.out::println);
如果你需要在终端或者取消时执行清理,考虑使用
using
操作符代替。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐