RxJava2.x是一个非常棒的流式编程,采用的观察者模式思想,事件的产生者产生事间之后发送给绑定的接受者,接受顺序与发送顺序一致.但是 是独立于RxJava1.x存在,本文讲解RxJava2.x的简
2017-07-20 15:12
567 查看
RxJava2.x是一个非常棒的流式编程,采用的观察者模式思想,事件的产生者产生事间之后发送给绑定的接受者,接受顺序与发送顺序一致.但是 是独立于RxJava1.x存在,本文讲解RxJava2.x的简单使用
Transformer的变化:RxJava1.X为rx.Observable.Transformer接口, 继承自Func1<Observable<T>, Observable<R>>, RxJava2.X为io.reactivex.ObservableTransformer<Upstream, Downstream>,是一个独立的接口。
Flowable则是FlowableTransformer,如果你使用Flowable,以下ObservableTransformer替换FlowableTransformer即可。
Map:首先是变换操作符- > Map,(此处引入以前看过的一篇文章的一句话:不知道Map已经统治世界了么?)那么在RxJava2中,Map究竟是个什么鬼.map是RxJava中最简单的一个变换操作符,它的作用是将上游发送过来的事件都去应用一个函数,让每一个事件都按照该函数去变化,下游接收到事件时,就变成了变化过后的事件,多说无益,上代码.
FlatMapFlatMap,上来就看到map,这个操作符和刚才的map有什么区别呢,flatmap可以将上游发送过来的数据,变换为多个数据,然后合并为一个事件发送到下游,这么说是不是有点难懂?恩,还是直接上代码:
ConcatMap:ConcatMap和FlatMap一样,只不过一个是有序,一个是无序而已,我们直接把上边的代码做一个更改:
FilterFilter,顾名思义,过滤器,可以过滤掉一部分不符合要求的事件,当上游给我们发送的数据超多,而下游需要的只是一些特定的数据,如果全部接收上游发送的数据,很容易造成OOM,为了避免OOM的出现,我们则需要对上游数据进行过滤,具体操作如下:
SampleSample,样品,其功能也是,sample会每隔一段时间对上游数据进行取样,发送到下游,但是这样会导致丢失了大量事件,比较适合特定场合,如对一组数中进行抽样,代码如下:
take/takeListtake和takeList方法可以将上游事件中的前N项或者最后N项发送到下游,其他事件则进行过滤,代码如下:
distinctdistinct方法,可以将重复对象去除重复对象,这里我们要用到一个方法,repeat(),产生重复事件,这里重复事件,再去除有些多余,只作为一个例子来展示.
zip操作符:
RxJava 2x 不再支持
因为并不确定具体是什么类型的
在RxJava 0.x中关于介绍backpressure部分有一个小小的遗憾,那就是没有用一个单独的类,而是使用了
2.x 的
RxJava 2.0.0-RC2 介绍了一个新的类型
按照Reactive-Streams风格的
在Reactive-Streams规范中,
现在整个类都是抽象的。不过你可以继承类然后添加你自定义的
1.x 和 2.x 是跑在Java 6以上的虚拟机的,所以我们不能使用Java8的功能接口(functional interfaces),比如
interfaces)。一个值得注意的区别是所有的功能接口(functional interfaces)都定义了
为了减少组件数量,2.x中没有定义
我们按照java 8的命名风格定义了
把
Reactive-Streams规范有自己的Subscriber。这个接口是轻量级的,并且把请求管理和取消机制整合进了一个单独的接口
consumers。
注意,在
在RxJava 1.x中,接口
Reactive-Streams规范的操作符支持背压,特别是当它们不发送请求时,它们不会溢出。新的操作符
Flowable-based sources和operators是遵从Reactive-Streams 1.0.0规范的,除了一个规则§3.9和解释的规则§1.3:§3.9: While the Subscription is not cancelled, Subscription.request(long n) MUST signal onError with a java.lang.IllegalArgumentException if the argument is <= 0. The cause message MUST include a reference to this rule and/or quote the full rule.Rule §3.9 requires excessive overhead to handle (half-serializer on every operator dealing with request()) for a bug-case. RxJava 2 (and Reactor 3 in fact) reports the
ignores it otherwise. RxJava 2 passes the Test Compatibility Kit (TCK) by applying a custom
operator that routes the
an async-safe manner. All major Reactive-Streams libraries are free of such zero requests; Reactor 3 ignores it as we do and Akka-Stream uses a converter (to interact with other RS sources and consumers) which has (probably) a similar routing behavior as our
TCK operator.§1.3: onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled sequentially (no concurrent notifications).TCK 允许同步但限制
operator来延迟下游请求直到
2.x 中重新设计了
在2.x的API中仍然支持主要的默认scheduler:
RxJava 1.x的设计缺陷之一是暴露了
除了subscribing 各自的consumers(
以及functional-interface 基础consumers(例如
测试RxJava 2.x和1.x中一样,
为了支持我们内部测试,所有的基础类都有
对于给定的异步代码,流畅的阻塞终端事件是可能的:
那些在1.x中正在使用Mockito和
2.x中大部分操作符仍然被保留,实际上大部分行为和1.x一样。下面的列表中列出了每一个基础类的在1.x和2.x的区别通常来说,很多操作符提供了重载,允许指定运行上游的内部缓冲区的大小或者预先分配的数量。一些操作符重载已经被重命名为了后缀风格,比如
8编译时,javac往往不能区分功能接口类型。在1.x中被标记为
2.x中一些的操作符产生确切的一个值或者一个错误时,返回
为了保证最终的2.0API尽可能干净,我们删除了一些候选的方法和组件。
在1.x中,
RxJava2 封装主要变化
Transformer的变化:RxJava1.X为rx.Observable.Transformer接口, 继承自Func1<Observable<T>, Observable<R>>, RxJava2.X为io.reactivex.ObservableTransformer<Upstream, Downstream>,是一个独立的接口。Flowable则是FlowableTransformer,如果你使用Flowable,以下ObservableTransformer替换FlowableTransformer即可。
compile 'io.reactivex.rxjava2:rxjava:2.1.0' compile 'io.reactivex.rxjava2:rxandroid:2.0.1'简单使用:
//观察者模式,这里产生事件,事件产生后发送给接受者,但是一定要记得将事件的产生者和接收者捆绑在一起,否则会出现错误 Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { //这里调用的方法会在产生事件之后会发送给接收者,接收者对应方法会收到 e.onNext("hahaha"); e.onError(new Exception("wulala")); e.onComplete(); }}).subscribe(new Observer<String>() { //接受者,根据事件产生者产生的事件调用不同方法 @Override public void onSubscribe(Disposable d) { Log.e(TAG, "onSubscribe: "); } @Override public void onNext(String value) { Log.e(TAG, "onNext: " + value); } @Override public void onError(Throwable e) { Log.e(TAG, "onError: ", e); } @Override public void onComplete() { Log.e(TAG, "onComplete: "); } });我们来用图解一下这其中发生了什么事:Paste_Image.png上游朝下游发送数据,经过subscribe使上下游产生关系,即达成订阅.解析1:ObservableEmitter,这是个啥东西?Emitter:顾名思义,即Rxjava的发射器,通过这个发射器,即可发送事件-----通过调用onNext,onError,onComplete方法发送不同事件.注意:虽然RxJava可以进行事件发送,但这并不意味着你可以随便发送,这其中需要遵循一些规则.onNext:你可以发送无数个onNext,发送的每个onNext接受者都会接收到.onError:当发送了onError事件之后,发送者onError之后的事件依旧会继续发送,但是接收者当接收到onError之后就会停止接收事件了.onComplete:当发送了onComplete事件之后,发送者的onComplete之后的事件依旧会继续发送,但是接收者接收到onComplete之后就停止接收事件了.onError事件和onComplete事件是互斥的,但是这并不代表你配置了多个onError和onComplete一定会崩溃,多个onComplete是可以正常运行的,但是只会接收到第一个,之后的就不会再接收到了,多个onError时,只会接收到第一个,第二个会直接造成程序崩溃.解析2:Disposable又是个啥东西,翻译之后百度告诉我这东西叫做一次性的,是用来控制发送者和接受者之间的纽带的,默认为false,表示发送者和接受者直接的通信阀门关闭,可以正常通信,在调用dispose()方法之后,阀门开启,会阻断发送者和接收者之间的通信,从而断开连接.重载方法:
subscribe(); //表示发送者随意发送数据,接受者什么都不管,什么都不接收. subscribe(Consumer<? super T> onNext) {} //只响应onNext()事件,其他的事件忽略. subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} //含义同上 subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {} //含义同上 subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {} //含义同上解析3:默认情况下,发送者和接收者都运行在主线程,但是这显然是不符合实际需求的,我们在日常使用中,通常用的最多的就是在子线程进行各种耗时操作,然后发送到主线程进行,难道我们就没有办法继续用这个优秀的库了?想多了你,一个优秀的库如果连这都想不到,怎么能被称为优秀呢,RxJava中有线程调度器,通过线程调度器,我们可以很简单的实现这种效果,下面放代码.
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("hahaha"); e.onNext("hahaha"); e.onNext("hahaha"); Log.e(TAG,"运行在什么线程" + Thread.currentThread().getName()); e.onComplete(); } }).subscribeOn(Schedulers.newThread()) //线程调度器,将发送者运行在子线程 .observeOn(AndroidSchedulers.mainThread()) //接受者运行在主线程 .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.e(TAG, "onSubscribe: "); Log.e(TAG,"接收在什么线程" + Thread.currentThread().getName()); } @Override public void onNext(String value) { Log.e(TAG, "onNext: " + value); } @Override public void onError(Throwable e) { Log.e(TAG, "onError: ", e); } @Override public void onComplete() { Log.e(TAG, "onComplete: "); } });注意事项: 变换线程方法与1.x一致subscribeOn(),只有在第一次调用的时候生效,之后不管调用多少次,只会以第一次为准.observeOn(),可以被调用多次,每次调用都会更改线程.RxJava线程池中的几个线程选项
- Schedulers.io() io操作的线程, 通常io操作,如文件读写. - Schedulers.computation() 计算线程,适合高计算,数据量高的操作. - Schedulers.newThread() 创建一个新线程,适合子线程操作. - AndroidSchedulers.mainThread() Android的主线程,主线程
操作符之变换
Map:首先是变换操作符- > Map,(此处引入以前看过的一篇文章的一句话:不知道Map已经统治世界了么?)那么在RxJava2中,Map究竟是个什么鬼.map是RxJava中最简单的一个变换操作符,它的作用是将上游发送过来的事件都去应用一个函数,让每一个事件都按照该函数去变化,下游接收到事件时,就变成了变化过后的事件,多说无益,上代码.
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); } }).map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return "我是变换过后的" + integer; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e("XYK",s); } }); }通过运行结果可以看到,我们在上游发送的数据类型为Integer,到了下游接收到的数据为String类型,中间通过map对其进行了转换,是不是感觉很强大?通过map我们可以将上游数据转化为任意类型发送到下游,就是这么6~圆形事件1,2,3,经过Map转化之后,变成了三角形事件1,2,3,但是有童鞋要问了,这有什么用呢,我们来举一个实际需求的例子,用Map来做一下:
读取一篇英文文章,将文章中的字符全部转换为大写.我们先来用非RxJava2来做一下:
//模拟一篇文章 String article = "fkjdsalijfofldaJFOIEjfldanlJR2OnfldajilwafkndaIUPO32,LFKjlijuJFLMA"; char[] chars = article.toCharArray(); StringBuffer sb = new StringBuffer(); for (int i = 0; i < chars.length; i++) { Log.e(TAG,chars[i] + ""); if(chars[i] >= 'a' && chars[i] <= 'z'){ sb.append((chars[i] + "").toUpperCase()); }else{ sb.append(chars[i]); } } Log.e(TAG,sb.toString());好像看上去没什么问题,但是这逼格显然不够高,程序员的精髓不就是要敲出一段逼格超高的代码么,我们试试用RxJava2:
//模拟一篇文章 String article = "fkjdsalijfofldaJFOIEjfldanlJR2OnfldajilwafkndaIUPO32,LFKjlijuJFLMA"; final char[] chars = article.toCharArray(); Observable.create(new ObservableOnSubscribe<Character>() { @Override public void subscribe(ObservableEmitter<Character> e) throws Exception { for (int i = 0; i < chars.length; i++) { e.onNext(chars[i]); } } //delay 延时5秒发送 }).delay(5, TimeUnit.SECONDS) //事件类型转换 .map(new Function<Character, String>() { @Override public String apply(Character s) throws Exception { if (s >= 'a' && s <= 'z') { return s.toString().toUpperCase(); } else { return s.toString(); } } }) //线程调度 .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e(TAG, s); } });瞬间逼格就上去了,而且还做了线程调度等操作,是不是心头顿时感觉一串666飘过,这仅仅是最基础的转化操作符,接下来我们在看一个FlatMap:
FlatMapFlatMap,上来就看到map,这个操作符和刚才的map有什么区别呢,flatmap可以将上游发送过来的数据,变换为多个数据,然后合并为一个事件发送到下游,这么说是不是有点难懂?恩,还是直接上代码:
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); } }).flatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { List<String> list = new ArrayList<String>(); for (int i = 0; i < 5; i++) { list.add("我是变换过的" + integer); } return Observable.fromIterable(list); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e("XYK", s); } }); }通过运行结果可以看到,上游发送的数据在到达flatmap的时候,经过处理,将每个事件变成了5个,而后将5个合并为1个事件发送到下游,并且我们可以注意到,发送到下游的数据是无序的,那么这时候就要说了,我要接收的事件是有序的怎么办,这就是接下来要说的concatMap.
ConcatMap:ConcatMap和FlatMap一样,只不过一个是有序,一个是无序而已,我们直接把上边的代码做一个更改:
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); } }).concatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { List<String> list = new ArrayList<String>(); for (int i = 0; i < 5; i++) { list.add("我是变换过的" + integer); } return Observable.fromIterable(list); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e("XYK", s); } }); }
过滤操作符
FilterFilter,顾名思义,过滤器,可以过滤掉一部分不符合要求的事件,当上游给我们发送的数据超多,而下游需要的只是一些特定的数据,如果全部接收上游发送的数据,很容易造成OOM,为了避免OOM的出现,我们则需要对上游数据进行过滤,具体操作如下:
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { for (int i = 0; i < 10000; i++) { e.onNext(i); } } }).observeOn(Schedulers.io()) .subscribeOn(AndroidSchedulers.mainThread()) .filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer % 7 == 0; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e("XYK",integer + ""); } }); }在上面的代码中,我们朝下游发送了10000个数据,而我只需要其中可以被7整除的数据,利用filter,将其他的数据过滤出去,留下需要的数据.Filter方法使我们经常用到的一个过滤方法,基本已经可以满足大部分应用场所了,最常见的是过滤一些null对象,但是除此之外,还有一些其他的过滤方法,我们也来看下.
SampleSample,样品,其功能也是,sample会每隔一段时间对上游数据进行取样,发送到下游,但是这样会导致丢失了大量事件,比较适合特定场合,如对一组数中进行抽样,代码如下:
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { for (int i = 0; ; i++) { e.onNext(i); } } }).sample(1,TimeUnit.SECONDS) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e("XYK",integer + ""); } }); }在上边的代码中,使用sample之后,每隔1秒对上游数据采样一次,发送到下游,其他事件则被过滤.
take/takeListtake和takeList方法可以将上游事件中的前N项或者最后N项发送到下游,其他事件则进行过滤,代码如下:
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { for (int i = 0; ; i++) { e.onNext(i); } } }).take(3) //.takeList(3) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e("XYK",integer + ""); } }); }
distinctdistinct方法,可以将重复对象去除重复对象,这里我们要用到一个方法,repeat(),产生重复事件,这里重复事件,再去除有些多余,只作为一个例子来展示.
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { for (int i = 0;i < 50; i++) { e.onNext(i); } } }).take(3) //生成重复事件 .repeat(3) .distinct() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e("XYK",integer + ""); } });
组合操作符
zip操作符:Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {运行结果:Paste_Image.png我们可以看到,3条上游中分别有4个事件,3个事件,6个事件,经过zip操作符操作之后为什么就只变成了3个事件了呢?我们来打下Log,看看其他事件去哪了.添加Log之后的运行结果:Paste_Image.png根据运行结果可以看到,上游逐条发送到下游,下游在接收到最后一条上游发送过来的事件之后开始组合,而多余的数据也被发送了,但是并没有被进行组合,这样是不是就看明白了呢?但是这时候有问题了,组合完成之后,多余的数据依旧在发送,如果我们不停发呢?会产生什么后果?,我们来修改一下observable的代码:
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
}
});Observable<String> observable1 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("这是");
e.onNext("这个是");
e.onNext("这个则是");
}
});Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("个");
e.onNext("只");
e.onNext("条");
e.onNext("张");
e.onNext("本");
e.onNext("副");
}
});Observable.zip(observable, observable1, observable2, new Function3<Integer, String, String, String>() {
@Override
public String apply(Integer integer, String s, String s2) throws Exception {
return s + integer + s2;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("XYK",s);
}
});
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { for (int i = 0; ; i++) { Log.e("XYK",i + ""); e.onNext(i); } } });运行之后通过Monitors我们查看一下内存:(Sorry,这里没截下图来....可以自己试一下,不过想一下也知道,内存肯定会暴增嘛...)在内存持续暴增的情况下,可能用不了多久就会OOM,这种情况下我们应该怎么办呢?还记不记得上篇文章写了啥,过滤啊,我们把不需要的过滤掉不就好了
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { for (int i = 0; ; i++) { e.onNext(i); } } }).filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer % 100 == 0; } });当然,除此之外还有很多办法,可以根据实际情况来进行组合应用.RxJava中的zip操作符作用是将多条上游发送的事件进行结合到一起,发送到下游,并且按照顺序来进行结合,如多条上游中发送的事件数量不一致,则以最少的那条中的事件为准,下游接收到的事件数量和其相等.
Rxjava的2.x与1.x的区别(官翻)
Nulls
RxJava 2x 不再支持 null值,如果传入一个
null会抛出
NullPointerException
Observable.just(null);Single.just(null);Observable.fromCallable(() -> null)这意味着
.subscribe(System.out::println, Throwable::printStackTrace);Observable.just(1).map(v -> null)
.subscribe(System.out::println, Throwable::printStackTrace);
Observable<Void>不再发射任何值,而只是正常结束或者抛出异常。API 设计者可以定义
Observable<Object>这样的观察者,
因为并不确定具体是什么类型的
Object。例如,如果你需要一个 signaller-like ,你可以定义一个共享的枚举类型,它是一个单独的实例
onNext‘d:
enum Irrelevant { INSTANCE; }Observable<Object> source = Observable.create((ObservableEmitter<Object> emitter) -> {
System.out.println("Side-effect 1");
emitter.onNext(Irrelevant.INSTANCE);System.out.println("Side-effect 2");
emitter.onNext(Irrelevant.INSTANCE);System.out.println("Side-effect 3");
emitter.onNext(Irrelevant.INSTANCE);
});source.subscribe(e -> { /* Ignored. */ }, Throwable::printStackTrace);
Observable 和 Flowable
在RxJava 0.x中关于介绍backpressure部分有一个小小的遗憾,那就是没有用一个单独的类,而是使用了Observable。 主要的背压问题是有很多很火的代码,像UI events,不能合理的背压,导致了无法意料的
MissingBackpressureException。我们试图在 2.x 中纠正这个问题。因此我们把
io.reactivex.Observable设计成非背压的,并增加一个新的
io.reactivex.Flowable去支持背压。好消息是操作符的名字几乎没有改动。坏消息是当你执行’organize imports’时必须要格外的小心,它可能无意的给你选择一个非背压的
io.reactivex.Observable。
Single
2.x 的Single类可以发射一个单独
onSuccess或
onError消息。它现在按照Reactive-Streams规范被重新设计,
SingleObserver改成了如下的接口。
interface SingleObserver<T> { void onSubscribe(Disposable d); void onSuccess(T value); void onError(Throwable error); }并遵循协议
onSubscribe (onSuccess | onError)?.
Completable
Completable大部分和以前的一样。因为它在1.x的时候就是按照Reactive-Streams的规范进行设计的。命名上有些变化,
rx.Completable.CompletableSubscriber变成了
io.reactivex.CompletableObserver和
onSubscribe(Disposable):
interface CompletableObserver<T> { void onSubscribe(Disposable d); void onComplete(); void onError(Throwable error); }并且仍然遵循协议
onSubscribe (onComplete | onError)?.
Maybe
RxJava 2.0.0-RC2 介绍了一个新的类型 Maybe。从概念上来说,它是
Single和
Completable的结合体。它可以发射0个或1个通知或错误的信号。
Maybe类结合了
MaybeSource,
MaybeObserver<T>作为信号接收接口,同样遵循协议
onSubscribe (onSuccess | onError | onComplete)?。因为最多有一个元素被发射,
Maybe没有背压的概念。这意味着调用
onSubscribe(Disposable)请求可能还会触发其他
onXXX方法。和
Flowable不同,如果那有一个单独的值要发射,那么只有
onSuccess被调用,
onComplete不被调用。这个新的类,实际上和其他
Flowable的子类操作符一样可以发射0个或1个序列。
Maybe.just(1) .map(v -> v + 1) .filter(v -> v == 1) .defaultIfEmpty(2) .test() .assertResult(2);
Base reactive interfaces
基础reactive接口
按照Reactive-Streams风格的Flowable实现了
Publisher<T>接口,其他基础类也实现了类似的基础接口
interface ObservableSource<T> {因此,很多操作符需要从用户接收
void subscribe(Observer<? super T> observer);
}interface SingleSource<T> {
void subscribe(SingleObserver<? super T> observer);
}interface CompletableSource {
void subscribe(CompletableObserver observer);
}interface MaybeSource<T> {
void subscribe(MaybeObserver<? super T> observer);
}
Publisher和
XSource的一些基础的类型。
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);通过
Publisher作为输入,你可以组合其他的遵从Reactive-Streams规范的库,而不需要包裹或把它们转换成
Flowable。如果一个操作符必须要提供一个基础类,那么用户将会收到一个完整的基础类。
Flowable<Flowable<Integer>> windows = source.window(5);source.compose((Flowable<T> flowable) ->
flowable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()));
Subjects 和 Processors
在Reactive-Streams规范中,Subject类似于行为,即消费者和提供者的事件在同一时间发生。随着
Observable/
Flowable的分离,支持背压的类都是遵从Reactive-Streams规范的
FlowableProcessor<T>的子类。一个关于
Subject重要的变化是它们不再支持
T -> R这样的转换。在 2.x 中,
io.reactivex.subjects.AsyncSubject,
io.reactivex.subjects.BehaviorSubject,
io.reactivex.subjects.PublishSubject,
io.reactivex.subjects.ReplaySubject和
io.reactivex.subjects.UnicastSubject不支持背压。
io.reactivex.processors.AsyncProcessor,
io.reactivex.processors.BehaviorProcessor,
io.reactivex.processors.PublishProcessor,
io.reactivex.processors.ReplayProcessor和
io.reactivex.processors.UnicastProcessor支持背压。
BehaviorProcessor和
PublishProcessor不能协同请求下级的订阅者,如果下游不能保存,则会发射一个
MissingBackpressureException异常。其他
XProcessor类支持对下游订阅者背压,但是当被订阅源时,它们会无限制的消费。
其他类
rx.observables.ConnectableObservable现在是
io.reactivex.observables.ConnectableObservable<T>和
io.reactivex.flowables.ConnectableFlowable<T>。
GroupedObservable
rx.observables.GroupedObservable现在是
io.reactivex.observables.GroupedObservable<T>和
io.reactivex.flowables.GroupedFlowable<T>.在1.x中,你可以用
GroupedObservable.from()创建一个实例。在2.x中,所有实例都直接继承了
GroupedObservable,因此这个工厂方法不再可用;
现在整个类都是抽象的。不过你可以继承类然后添加你自定义的
subscribeActual行为来达到1.x中相似的功能。
class MyGroup<K, V> extends GroupedObservable<K, V> {
final K key;final Subject<V> subject;public MyGroup(K key) {
this.key = key;
this.subject = PublishSubject.create();
}@Override
public T getKey() {
return key;
}@Override
protected void subscribeActual(Observer<? super T> observer) {
subject.subscribe(observer);
}
}
功能接口
1.x 和 2.x 是跑在Java 6以上的虚拟机的,所以我们不能使用Java8的功能接口(functional interfaces),比如Java.util.function.Function。但我们可以按照这个例子来定义自己的功能接口(functional
interfaces)。一个值得注意的区别是所有的功能接口(functional interfaces)都定义了
throws Exception。这对于consumers 和 mappers 来说是一个巨大的便利,你不需要用
try-catch捕获异常。
Flowable.just("file.txt") .map(name -> Files.readLines(name)) .subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);如果文件不存在或者不可读,结尾的consumer会直接输出
IOException。你可以直接调用
Files.readLines(name)而不需要捕获异常。
Actions
为了减少组件数量,2.x中没有定义Action3-
Action9和
ActionN。保留的action接口按照Java 8 functional风格命名。 无参数的
Action0被操作符
io.reactivex.functions.Action和
Scheduler代替。
Action1被重命名为
Consumer。
Action2被重命名为
BiConsumer。
ActionN被
Consumer<Object[]>代替。
Functions
我们按照java 8的命名风格定义了io.reactivex.functions.Function和
io.reactivex.functions.BiFunction,
把
Func3-
Func9分别改成了
Function3-
Function9。
FuncN被
Function<Object[], R>代替。此外,操作符不再使用
Func1<T, Boolean>但原始返回类型为
Predicate<T>。
io.reactivex.functions.Functions类提供了常见的转换功能
Function<Object[], R>
Subscriber
Reactive-Streams规范有自己的Subscriber。这个接口是轻量级的,并且把请求管理和取消机制整合进了一个单独的接口org.reactivestreams.Subscription,而不是分别用
rx.Producer和
rx.Subscription。这就可以用比1.x中
rx.Subscriber更少的内部状态来创建一个stream
consumers。
Flowable.range(1, 10).subscribe(new Subscriber<Integer>() {由于命名冲突,把
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}@Override
public void onNext(Integer t) {
System.out.println(t);
}@Override
public void onError(Throwable t) {
t.printStackTrace();
}@Override
public void onComplete() {
System.out.println("Done");
}
});
rx包改成
org.reactivestreams。此外
org.reactivestreams.Subscriber不能从外面添加、取消或请求。为了弥补这一空缺,我们为
Flowable定义了抽象类
DefaultSubscriber,
ResourceSubscriber和
DisposableSubscriber分别提供了类似于
rx.Subscriber的资源跟踪支持,并且可以从外面取消
dispose():
ResourceSubscriber<Integer> subscriber = new ResourceSubscriber<Integer>() {注意,由于Reactive-Streams的兼容性,方法
@Override
public void onStart() {
request(Long.MAX_VALUE);
}@Override
public void onNext(Integer t) {
System.out.println(t);
&n
33332
bsp;}@Override
public void onError(Throwable t) {
t.printStackTrace();
}@Override
public void onComplete() {
System.out.println("Done");
}
};Flowable.range(1, 10).delay(1, TimeUnit.SECONDS).subscribe(subscriber);subscriber.dispose();
onCompleted被重命名为
onComplete。因为1.x中,
Observable.subscribe(Subscriber)返回
Subscription,用户经常添加
Subscription到
CompositeSubscription中,例如:
CompositeSubscription composite = new CompositeSubscription();composite.add(Observable.range(1, 5).subscribe(new TestSubscriber<Integer>()));由于Reactive-Streams规范,Publisher.subscribe
无返回值。为了弥补这一点,我们增加了E subscribeWith(E subscriber)方法。因为在2.x中
ResourceSubscriber直接实现了
Disposable,所以代码可以这样写。[/code]
CompositeDisposable composite2 = new CompositeDisposable();composite2.add(Flowable.range(1, 5).subscribeWith(subscriber));
在onSubscribe/onStart中调用request
注意,在Subscriber.onSubscribe或
ResourceSubscriber.onStart中调用
request(n)将会立即调用
onNext,实例代码如下:
Flowable.range(1, 3).subscribe(new Subscriber<Integer>() {@OverrideThis will print:将会打印:
public void onSubscribe(Subscription s) {
System.out.println("OnSubscribe start");
s.request(Long.MAX_VALUE);
System.out.println("OnSubscribe end");
}@Override
public void onNext(Integer v) {
System.out.println(v);
}@Override
public void onError(Throwable e) {
e.printStackTrace();
}@Override
public void onComplete() {
System.out.println("Done");
}
});
OnSubscribe start 1 2 3 Done OnSubscribe end当你在
onSubscribe/
onStart中做了一些初始化的工作,而这些工作是在
request后面时,会出现一些问题,在
onNext执行时,你的初始化工作的那部分代码还没有执行。为了避免这种情况,请确保你调用
request时,已经把所有初始化工作做完了。这个行为不同于1.x中的
request要经过延迟的逻辑直到上游的
Producer到达时。在2.x中,总是
Subscription先传递下来,90%的情况下没有延迟请求的必要。
Subscription
在RxJava 1.x中,接口rx.Subscription负责流和资源的生命周期管理,即退订和释放资源,例如scheduled tasks。Reactive-Streams规范用这个名称指定source和consumer之间的关系:
org.reactivestreams.Subscription允许从上游请求一个正数,并支持取消。为了避免名字冲突,1.x的
rx.Subscription被改成了
io.reactivex.Disposable。因为Reactive-Streams的基础接口
org.reactivestreams.Publisher定义
subscribe()为无返回值,
Flowable.subscribe(Subscriber)不再返回任何
Subscription。其他的基础类型也遵循这种规律。在2.x中其他的
subscribe的重载方法返回
Disposable。原始的
Subscription容器类型已经被重命名和修改。
CompositeSubscription改成
CompositeDisposable,
SerialSubscription和
MultipleAssignmentSubscription被合并到了
SerialDisposable。
set()方法取消了旧值,而
replace()方法没有。
RefCountSubscription已被删除。
背压
Reactive-Streams规范的操作符支持背压,特别是当它们不发送请求时,它们不会溢出。新的操作符Flowable被设计成适合下游请求,然而这个不意味着
MissingBackpressureException不会出现。这个异常仍然存在。但这一次,
onNext会抛出这个异常。作为替代,在2.x中
Observable完全不支持背压,但可以被替换。
Reactive-Streams compliance
Flowable-based sources和operators是遵从Reactive-Streams 1.0.0规范的,除了一个规则§3.9和解释的规则§1.3:§3.9: While the Subscription is not cancelled, Subscription.request(long n) MUST signal onError with a java.lang.IllegalArgumentException if the argument is <= 0. The cause message MUST include a reference to this rule and/or quote the full rule.Rule §3.9 requires excessive overhead to handle (half-serializer on every operator dealing with request()) for a bug-case. RxJava 2 (and Reactor 3 in fact) reports the IllegalArgumentExceptionto
RxJavaPlugins.onErrorand
ignores it otherwise. RxJava 2 passes the Test Compatibility Kit (TCK) by applying a custom
operator that routes the
IllegalArgumentExceptioninto the
Subscriber.onErrorin
an async-safe manner. All major Reactive-Streams libraries are free of such zero requests; Reactor 3 ignores it as we do and Akka-Stream uses a converter (to interact with other RS sources and consumers) which has (probably) a similar routing behavior as our
TCK operator.§1.3: onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled sequentially (no concurrent notifications).TCK 允许同步但限制
onSubscribe和
onNext之间往返。也就是说在
onSubscribe中,调用
request(1)后将会调用
onNext,在
onNext返回后
request(1)才会返回。虽然大部分操作符都是这样的,但操作符
observeOn会异步的调用
onNext,因此
onSubscribe会和
onNext同时被调用。这就是由TCK来检测,我们使用another
operator来延迟下游请求直到
onSubscribe返回。再次声明,这种异步行为不是RxJava 2的一个问题,因为在Reactor 3中操作符是线程安全的执行
onSubscribe。Akka-Stream的转换类似于延迟请求。因为这两个影响inter-library的行为,我们考虑在以后给
Flowable增加了一个标准的操作符,把这两种行为改到一个单独的方法。
Runtime hooks
2.x 中重新设计了RxJavaPlugins类,现在支持运行时改变回调。测试需要重写schedulers,生命周期方法可以通过回调函数。
RxJavaObservableHook和友类现在都取消了,
RxJavaHooks功能被加入到了
RxJavaPlugins。
Schedulers
在2.x的API中仍然支持主要的默认scheduler: computation,
io,
newThread和
trampoline,可以通过
io.reactivex.schedulers.Schedulers这个实用的工具类来调度。2.x中不存在
immediate调度器。 它被频繁的误用,并没有正常的实现
Scheduler规范;它包含用于延迟动作的阻塞睡眠,并且不支持递归调度。你可以使用
Schedulers.trampoline()来代替它。
Schedulers.test()已经被移除,这样避免了默认调度器休息的概念差异。那些返回一个”global”的调度器实例是鉴于
test()总是返回一个新的
TestScheduler实例。现在我们鼓励测试人员使用这样简单的代码
new TestScheduler()。
io.reactivex.Scheduler抽象类现在支持直接调度任务,不需要先创建然后通过
Worker调度。
public abstract class Scheduler {public Disposable scheduleDirect(Runnable task) { ... }public Disposable scheduleDirect(Runnable task, long delay, TimeUnit unit) { ... }public Disposable scheduleDirectPeriodically(Runnable task, long initialDelay,主要的目的是为了避免跟踪
long period, TimeUnit unit) { ... }public long now(TimeUnit unit) { ... }// ... rest is the same: lifecycle methods, worker creation
}
Worker的开销。方法有一个默认的实现,你可以直接复用
createWorker,但如果有需要,你也可以重写它来实现更强大的功能。这些方法返回了当前时间调度器的概念,
now()被改成接受一个用于指定单位量的
TimeUnit的方法。
进入reactive的世界
RxJava 1.x的设计缺陷之一是暴露了rx.Observable.create()方法,该方法虽然很强大,但导致了你很少使用内置典型的操作符。不幸的是,有太多的代码依赖于这个库,所以我们不能删除或重命名它。2.x是一个新的开始,我们不会再犯这个错误了。每一个基础类
Flowable,
Observable,
Single,
Maybe和
Completable都有安全的
create操作符去支持背压和取消。
Flowable.create((FlowableEmitter<Integer> emitter) -> { emitter.onNext(1); emitter.onNext(2); emitter.onComplete(); }, BackpressureStrategy.BUFFER);实际上,1.x中
fromEmitter已经被重命名为
Flowable.create。其他基础类型也有类似的
create方法。
离开reactive的世界
除了subscribing 各自的consumers(Subscriber,
Observer,
SingleObserver,
MaybeObserver和
CompletableObserver)
以及functional-interface 基础consumers(例如
subscribe(Consumer<T>, Consumer<Throwable>, Action)),以前在1.x中独立的
BlockingObservable已经集成了主要的基础类型。现在你可以直接调用
blockingX来阻塞等待结果:
List<Integer> list = Flowable.range(1, 100).toList().blockingGet(); // toList() returns SingleInteger i = Flowable.range(100, 100).blockingLast();在2.x中另外一个关于
rx.Subscriber和
org.reactivestreams.Subscriber重要的区别是,你的
Subscriber和
Observer不允许抛出任何致命的异常。这意味着下面这样的代码不再是合法的:
Subscriber<Integer> subscriber = new Subscriber<Integer>() {这样的规则同样适用于
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}public void onNext(Integer t) {
if (t == 1) {
throw new IllegalArgumentException();
}
}public void onError(Throwable e) {
if (e instanceof IllegalArgumentException) {
throw new UnsupportedOperationException();
}
}public void onComplete() {
throw new NoSuchElementException();
}
};Flowable.just(1).subscribe(subscriber);
Observer,
SingleObserver,
MaybeObserver和
CompletableObserver。由于很多现有基于1.x的代码做了类似的事情,我们设计了
safeSubscribe方法来帮助你处理这样的代码。当然你也可以使用
subscribe(Consumer<T>, Consumer<Throwable>, Action)方法来提供一个回调。
Flowable.just(1) .subscribe( subscriber::onNext, subscriber::onError, subscriber::onComplete, subscriber::onSubscribe );
Testing
测试RxJava 2.x和1.x中一样,Flowable可以用
io.reactivex.subscribers.TestSubscriber测试,而非背压的
Observable,
Single,
Maybe和
Completable可以用
io.reactivex.observers.TestObserver测试。
test() “operator”
为了支持我们内部测试,所有的基础类都有 test()方法,返回
TestSubscriber或
TestObserver:
TestSubscriber<Integer> ts = Flowable.range(1, 5).test();TestObserver<Integer> to = Observable.range(1, 5).test();TestObserver<Integer> tso = Single.just(1).test();TestObserver<Integer> tmo = Maybe.just(1).test();TestObserver<Integer> tco = Completable.complete().test();第二个便利之处在于,大部分
TestSubscriber/
TestObserver方法返回自身实例,这让我们可以链式调用各种
assertX方法。第三个便利是,你可以流畅的测试你的代码而不需要去创建或者引入
TestSubscriber/
TestObserver实例。
Flowable.range(1, 5) .test() .assertResult(1, 2, 3, 4, 5) ;
值得注意的新的断言方法
assertResult(T... items): 断言在
onComplete中将会按指定顺序收到给定的值,并且没有错误。
assertFailure(Class<? extends Throwable> clazz, T... items): 断言将会收到指定的异常。
assertFailureAndMessage(Class<? extends Throwable> clazz, String message, T... items): 和
assertFailure一样,但还会验证
getMessage()中包含的值。
awaitDone(long time, TimeUnit unit)等待一个终结事件,如果超时了,将会取消该事件。
assertOf(Consumer<TestSubscriber<T>> consumer)组成一些断言到流式链中。其中一个好处是,把
Flowable改为
Observable,所以测试代码不需要改变,内部的已经把
TestSubscriber改成了
TestObserver。
提前取消和请求
TestObserver中的
test()方法有一个
test(boolean cancel)重载,它能在订阅前取消
TestSubscriber/
TestObserver:
PublishSubject<Integer> pp = PublishSubject.create();// nobody subscribed yet
assertFalse(pp.hasSubscribers());pp.test(true);// nobody remained subscribed
assertFalse(pp.hasSubscribers());
TestSubscriber有
test(long initialRequest)和
test(long initialRequest, boolean cancel)重载,用于指定初始请求数量以及
TestSubscriber是否应该立即被取消。如果
initialRequest被给定,
TestSubscriber实例通常需要被捕获以便访问
request()方法:
PublishProcessor<Integer> pp = PublishProcessor.create();TestSubscriber<Integer> ts = pp.test(0L);ts.request(1);pp.onNext(1);
pp.onNext(2);ts.assertFailure(MissingBackpressureException.class, 1);
测试异步代码
对于给定的异步代码,流畅的阻塞终端事件是可能的:Flowable.just(1) .subscribeOn(Schedulers.single()) .test() .awaitDone(5, TimeUnit.SECONDS) .assertResult(1);
Mockito & TestSubscriber
那些在1.x中正在使用Mockito和Observer的用户需要去使用
Subscriber.onSubscribe方法去提出初始的请求,否则序列化将会挂起或者失败:
@SuppressWarnings("unchecked")
public static <T> Subscriber<T> mockSubscriber() {
Subscriber<T> w = mock(Subscriber.class);Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock a) throws Throwable {
Subscription s = a.getArgumentAt(0, Subscription.class);
s.request(Long.MAX_VALUE);
return null;
}
}).when(w).onSubscribe((Subscription)any());return w;}
操作符的差别
2.x中大部分操作符仍然被保留,实际上大部分行为和1.x一样。下面的列表中列出了每一个基础类的在1.x和2.x的区别通常来说,很多操作符提供了重载,允许指定运行上游的内部缓冲区的大小或者预先分配的数量。一些操作符重载已经被重命名为了后缀风格,比如 fromArray,
fromIterable。这么做的原因是,当用Java
8编译时,javac往往不能区分功能接口类型。在1.x中被标记为
@Beta或
@Experimental的操作符已经成为正式操作符了。
1.x Observable 到 2.x Flowable
工厂方法:
1.x | 2.x |
---|---|
amb | 添加 amb(ObservableSource...)重载, 2-9 参数被删除 |
RxRingBuffer.SIZE | bufferSize() |
combineLatest | 增加条目重载, 增加 带bufferSize参数的重载, combineLatest(List)被删除 |
concat | 增加带 prefetch参数的重载, 5-9 重载被删除 , 使用 concatArray代替 |
N/A | 增加 concatArray和 concatArrayDelayError |
N/A | 增加 concatArrayEager和 concatArrayEagerDelayError |
concatDelayError | 增加带延时的重载 |
concatEagerDelayError | 增加带延时的重载 |
create(SyncOnSubscribe) | 被 generate+ 重载代替 |
create(AsnycOnSubscribe) | 不存在 |
create(OnSubscribe) | 使用安全的 create(FlowableOnSubscribe, BackpressureStrategy), 支持 unsafeCreate() |
from | 拆分成 fromArray, fromIterable, fromFuture |
N/A | 增加 fromPublisher |
fromAsync | 重命名为 create() |
N/A | 增加 intervalRange() |
limit | 被删除, 使用 take |
merge | 增加带 prefetch的重载 |
mergeDelayError | 增加带 prefetch的重载 |
sequenceEqual | 增加带 bufferSize的重载 |
switchOnNext | 增加带 prefetch的重载 |
switchOnNextDelayError | 增加带 prefetch的重载 |
timer | 被废弃 |
zip | 增加带 bufferSize和 delayErrors的重载, 拆分成了 zipArray和 zipIterable |
实例方法:
1.x | 2.x |
---|---|
all | RC3 返回 Single<Boolean> |
any | RC3 返回 Single<Boolean> |
asObservable | 重命名为 hide(), 隐藏所有的身份 |
buffer | 重载自定义的 Collection提供者 |
cache(int) | 被废弃 |
collect | RC3 返回 Single<U> |
collect(U, Action2<U, T>) | 改成 collectInto和 RC3 返回 Single<U> |
concatMap | 增加带 prefetch的重载 |
concatMapDelayError | 增加带 prefetch的重载, 支持延时 |
concatMapEager | 增加带 prefetch的重载 |
concatMapEagerDelayError | 增加带 prefetch的重载, 支持延时 |
count | RC3 返回 Single<Long> |
countLong | 被删除, 使用 count |
distinct | 重载自定义的 Collection提供者. |
doOnCompleted | 重命名为 doOnComplete |
doOnUnsubscribe | 重命名为 Flowable.doOnCancel和 doOnDispose, additional info |
N/A | 增加 doOnLifecylce来处理 onSubscribe, request和 cancel |
elementAt(int) | RC3 不再发射 NoSuchElementException 如果源比索引更小 |
elementAt(Func1, int) | 被删除, 使用 filter(predicate).elementAt(int)代替 |
elementAtOrDefault(int, T) | 重命名为 elementAt(int, T)和 RC3 返回 Single<T> |
elementAtOrDefault(Func1, int, T) | 被删除, 使用 filter(predicate).elementAt(int, T)代替 |
first() | RC3 重命名为 firstElement返回 Maybe<T> |
first(Func1) | 被删除, 使用 filter(predicate).first()代替 |
firstOrDefault(T) | 重命名为 first(T)RC3 返回 Single<T> |
firstOrDefault(Func1, T) | 被删除, 使用 filter(predicate).first(T)代替 |
flatMap | 增加带 prefetch的重载 |
N/A | 增加 forEachWhile(Predicate<T>, [Consumer<Throwable>, [Action]])用于有条件停止 consumption |
groupBy | 增加带 bufferSize和 delayError的重载, 支持 支持内部自定义map,RC1中没有 |
ignoreElements | RC3 返回 Completable |
isEmpty | RC3 返回 Single<Boolean> |
last() | RC3 重命名为 lastElement返回 Maybe<T> |
last(Func1) | 被删除, 使用 filter(predicate).last()代替 |
lastOrDefault(T) | 重命名为 last(T)RC3 返回 Single<T> |
lastOrDefault(Func1, T) | 被删除, 使用 filter(predicate).last(T)代替 |
nest | 被删除, 使用 just代替 |
publish(Func1) | 增加带 prefetch的重载 |
reduce(Func2) | RC3 返回 Maybe<T> |
N/A | 增加 reduceWith(Callable, BiFunction)为了减少自定义Subscriber, 返回 Single<T> |
N/A | 增加 repeatUntil(BooleanSupplier) |
repeatWhen(Func1, Scheduler) | 删除了重载, 使用 subscribeOn(Scheduler).repeatWhen(Function)代替 |
retry | 增加 retry(Predicate), retry(int, Predicate) |
N/A | 增加 retryUntil(BooleanSupplier) |
retryWhen(Func1, Scheduler) | 删除了重载, 使用 subscribeOn(Scheduler).retryWhen(Function)代替 |
N/A | 增加 sampleWith(Callable, BiFunction)去扫描自定义的Subscriber方式 |
single() | RC3 重命名为 singleElement返回 Maybe<T> |
single(Func1) | 被删除,使用 filter(predicate).single()代替 |
singleOrDefault(T) | 重命名为 single(T)RC3 返回 Single<T> |
singleOrDefault(Func1, T) | 被删除,使用 filter(predicate).single(T)代替 |
skipLast | 增加带 bufferSize和 delayError的重载 |
startWith | 2-9 参数的被删除了, 使用 startWithArray代替 |
N/A | 增加 startWithArray来减少二义性 |
N/A | 增加 subscribeWith返回输入的订阅对象 |
switchMap | 增加带 prefetch的重载 |
switchMapDelayError | 增加带 prefetch的重载 |
takeLastBuffer | 被删除 |
N/A | 增加 test() |
timeout(Func0<Observable>, ...) | 方法签名改成了 timeout(Publisher, ...)删除了方法, 如果有需要,使用 defer(Callable<Publisher>>) |
toBlocking().y | 内联 blockingY()操作符, 除了 toFuture |
toCompletable | RC3 被删除, 使用 ignoreElements代替 |
toList | RC3 返回 Single<List<T>> |
toMap | RC3 返回 Single<Map<K, V>> |
toMultimap | RC3 返回 Single<Map<K, Collection<V>>> |
N/A | 增加 toFuture |
N/A | 增加 toObservable |
toSingle | RC3 被删除, 使用 single(T)代替 |
toSortedList | RC3 增加 Single<List<T>> |
withLatestFrom | 5-9 个参数的重载被删除 |
zipWith | 增加带 prefetch和 delayErrors的重载 |
不同的返回类型
2.x中一些的操作符产生确切的一个值或者一个错误时,返回Single。
操作符 | 旧返回值 | 新返回值 | 备注 |
---|---|---|---|
all(Predicate) | Observable<Boolean> | Single<Boolean> | 如果所有的元素都匹配,则发射true |
any(Predicate) | Observable<Boolean> | Single<Boolean> | 如果所有的元素都匹配,则发射true |
count() | Observable<Long> | Single<Long> | 计算序列中元素的数量 |
elementAt(int) | Observable<T> | Maybe<T> | Emits 给定位置处的元素或完成的元素 |
elementAt(int, T) | Observable<T> | Single<T> | 发射指定位置的元素或默认元素 |
first(T) | Observable<T> | Single<T> | 发射第一个元素或者抛出NoSuchElementException |
firstElement() | Observable<T> | Maybe<T> | 发射第一个元素或者结束 |
ignoreElements() | Observable<T> | Completable | 忽略所有非终端事件 |
isEmpty() | Observable<Boolean> | Single<Boolean> | 如果源为空,则发射true |
last(T) | Observable<T> | Single<T> | 发射最后一个元素或默认值 |
lastElement() | Observable<T> | Maybe<T> | 发射最后一个元素或结束 |
reduce(BiFunction) | Observable<T> | Maybe<T> | 发射减少的值或者结束 |
reduce(Callable, BiFunction) | Observable<U> | Single<U> | 发射减少的值或者初始的值 |
reduceWith(U, BiFunction) | Observable<U> | Single<U> | 发射减少的值或者初始的值 |
single(T) | Observable<T> | Single<T> | 发射唯一的元素或默认值 |
singleElement() | Observable<T> | Maybe<T> | 发射唯一的元素或结束 |
toList() | Observable<List<T>> | Single<List<T>> | 将所有元素放到 List |
toMap() | Observable<Map<K, V>> | Single<Map<K, V>> | 将所有元素放到 Map |
toMultimap() | Observable<Map<K, Collection<V>>> | Single<Map<K, Collection<V>>> | 将所有元素包装到Collection后放到 Map |
toSortedList() | Observable<List<T>> | Single<List<T>> | 将所有元素放到 List并排序 |
移除
为了保证最终的2.0API尽可能干净,我们删除了一些候选的方法和组件。删除时的版本 | 组件 | 备注 |
---|---|---|
RC3 | Flowable.toCompletable() | 使用 Flowable.ignoreElements()代替 |
RC3 | Flowable.toSingle() | 使用 Flowable.single(T)代替 |
RC3 | Flowable.toMaybe() | 使用 Flowable.singleElement()代替 |
RC3 | Observable.toCompletable() | 使用 Observable.ignoreElements()代替 |
RC3 | Observable.toSingle() | 使用 Observable.single(T)代替 |
RC3 | Observable.toMaybe() | 使用 Observable.singleElement()代替 |
其他改变
doOnCancel/doOnDispose/unsubscribeOn
在1.x中,doOnUnsubscribe总是执行终端事件,因为
SafeSubscriber调用了
unsubscribe。这实际上是没有必要的。Reactive-Streams规范中,一个终端事件到达
Subscriber,上游的
Subscription会取消,因此调用
cancel()是一个空操作。由于同样的原因
unsubscribeOn也没被在终端路径上调用,但只有实际在链上调用
cancel时,才会调用
unsubscribeOn。因此,下面的序列不会被调用
doOnCancel:
Flowable.just(1, 2, 3) .doOnCancel(() -> System.out.println("Cancelled!")) .subscribe(System.out::println);然而,下面将会调用
take操作符在传送过程中取消
onNext
Flowable.just(1, 2, 3) .doOnCancel(() -> System.out.println("Cancelled!")) .take(2) .subscribe(System.out::println);如果你需要在终端或者取消时执行清理,考虑使用
using操作符代替。
相关文章推荐
- Cocos2D-x游戏开发之二十三:CCNotificationCenter观察者模式(2)-不同层之间事件的发送和接受
- C#(模板模式)将父窗体继承之后重写一个按钮的事件,为什么每次都要运行两次才结束?
- 一个插排引发的设计思想 (一) 观察者模式
- 采用事件驱动编程以及GUI组件开发一个贷款计算器
- JavaScript实现父子dom同时绑定两个点击事件,一个用捕获,一个用冒泡时执行顺序的方法
- [Socket网络编程]由于套接字没有连接并且(当使用一个 sendto 调用发送数据报套接字时)没有提供地址,发送或接收数据的请求没有被接受。
- java语言讲解singleton的编程思想---深入浅出单实例Singleton设计模式
- 们只是「电脑玩物」 首页 实用技巧 免费资源 超好玩的游戏 软件推荐 IT技术 资讯 编程 其它 MVVM设计模式和WPF中的实现(四) 事件绑定 07net01.com 发布于 2015-10-08
- JavaScript-父子dom同时绑定两个点击事件,一个用捕获,一个用冒泡时执行顺序
- 有关js改变class或remove掉class之后,原class绑定事件依旧存在的问题
- 大小端字节序存在的意义,为什么不用一个标准呢? 在网络编程和一些服务器中采用的是大端的字节序,而一般的主机采用的是小端的字节序,为什么要分成两种?不能合成一个吗?
- 举例讲解Python设计模式编程中的访问者与观察者模式
- 举例讲解Python设计模式编程中的访问者与观察者模式
- 客户端C和服务器S之间建立了一个TCP连接,TCP最大段长度为1KB,客户端C当前的拥塞窗口是16KB,向服务器S连续发送2个最大段之后,成功收到服务器S发送的第一段的确认段,确认段中通告的接受窗口大
- 随笔一个dom节点绑定事件
- JS给某一个节点的子节点全部绑定事件
- TT和LG编程设计模式之观察者
- 基于键值的观察者模式编程
- 记一个IE浏览器兼容模式与IE文档模式不一致的问题
- 实现自己的LUA绑定器-一个模板编程挑战