RxJava----操作符:辅助操作符
2016-05-03 19:52
691 查看
Observable Utility Operators(辅助操作符)
结果:
dealy是延迟发射,delaySubscription则是延迟收到。
结果:
结果:
结果:
结果:
当一个Observable通过OnError或者OnCompleted结束的时候,会反订阅所有的Subscriber。
结果:
结果:
结果:
结果:
结果:
元数据中包含了源 Observable 所发射的动作,是调用 onNext 还是 onComplete。注意上图中,源 Observable 结束的时候, materialize 还会发射一个 onComplete 数据,然后才发射一个结束事件。
结果:
Notification 类包含了一些判断每个数据发射类型的方法,如果出错了还可以获取错误信息 Throwable 对象。
结果:
注意:在调用
如果你无法确保自定义的操作符符合 Rx 的约定,例如从多个源异步获取数据,则可以使用 serialize 操作函数。 serialize 可以把一个不符合约定的 Observable 转换为一个符合约定的 Observable。
下面创建一个不符合约定的 Observable,并且订阅到该 Observable上:
结果:
先不管上面的 Observable 发射的数据,订阅结束的情况看起来符合 Rx 约定。 这是由于 subscribe 认为当前数据流结束的时候会主动结束这个 Subscription。但实际使用中我们可能并不想直接结束这个Subscription。还有一个函数为 unsafeSubscribe ,该函数不会自动取消订阅。
结果:
上面的示例最后就没有打印 Unsubscribed 字符串。
unsafeSubscribe 也不能很好的处理错误情况。所以该函数几乎没用。在文档中说:该函数应该仅仅在自定义操作函数中处理嵌套订阅的情况。 为了避免这种操作函数接受到不合法的数据流,我们可以在其上应用 serialize 操作函数:
结果:
尽管上面的代码中没有调用unsubscribe, 但是数据流事件依然符合约定。最后也收到了完成事件。
我们可以认为timeout()为一个Observable的限时的副本。
如果在指定的时间间隔内Observable不发射值的话,它监听的原始的Observable时就会触发onError()函数。
结果:
Rxjava将Timeout实现为很多不同功能的操作符,比如说超时后用一个备用的Observable继续发射数据等。
结果:
timestamp 把数据转换为 Timestamped 类型,里面包含了原始的数据和一个原始数据是何时发射的时间戳。
结果:
从结果可以看到,上面的数据大概每隔100毫秒发射一个。
如果你想知道前一个数据和当前数据发射直接的时间间隔,则可以使用 timeInterval 函数。
结果:
Using操作符创建一个在Observable生命周期内存活的资源,也可以这样理解:我们创建一个资源并使用它,用一个Observable来限制这个资源的使用时间,当这个Observable终止的时候,这个资源就会被销毁。
using 有三个参数,分别是:
1.创建这个一次性资源的函数
2.创建Observable的函数
3.释放资源的函数
当 Observable 被订阅的时候,resourceFactory 用来获取到需要的资源;observableFactory 用这个资源来发射数据;当 Observable 完成的时候,disposeAction 来释放资源。
结果:
项目源码 GitHub求赞,谢谢!
引用:
RxJava操作符(六)Utility-云少嘎嘎嘎-ChinaUnix博客
RxJava 教程第三部分:驯服数据流之自定义操作函数 - 云在千峰
delay
顾名思义,Delay操作符就是让发射数据的时机延后一段时间,这样所有的数据都会依次延后一段时间发射。log("start subscrib:" + System.currentTimeMillis()/1000); Observable<Long> observable = Observable.create(new Observable.OnSubscribe<Long>() { @Override public void call(Subscriber<? super Long> subscriber) { for (int i = 1; i <= 2; i++) { Long currentTime=System.currentTimeMillis()/1000; log("subscrib:" + currentTime); subscriber.onNext(currentTime); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }).subscribeOn(Schedulers.newThread()); observable.delay(2000, TimeUnit.MILLISECONDS).subscribe(new Action1<Long>() { @Override public void call(Long aLong) { log("delay:"+System.currentTimeMillis()/1000+"---"+(System.currentTimeMillis()/1000-aLong)); } });
结果:
start subscrib:1462519228 subscrib:1462519228 subscrib:1462519229 delay:1462519230---2 delay:1462519231---2
delaySubscription
不同之处在于Delay是延时数据的发射,而DelaySubscription是延时注册Subscriber。dealy是延迟发射,delaySubscription则是延迟收到。
log("start subscrib:" + System.currentTimeMillis()/1000); Observable<Long> observable = Observable.create(new Observable.OnSubscribe<Long>() { @Override public void call(Subscriber<? super Long> subscriber) { for (int i = 1; i <= 2; i++) { Long currentTime=System.currentTimeMillis()/1000; log("subscrib:" + currentTime); subscriber.onNext(currentTime); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }).subscribeOn(Schedulers.newThread()); observable.delaySubscription(2000, TimeUnit.MILLISECONDS).subscribe(new Action1<Long>() { @Override public void call(Long aLong) { log("delaySubscription:"+System.currentTimeMillis()/1000+"---"+(System.currentTimeMillis()/1000-aLong)); } });
结果:
start subscrib:1462519279 subscrib:1462519281 delaySubscription:1462519281---0 subscrib:1462519282 delaySubscription:1462519282---0
do
do操作符就是给Observable的生命周期的各个阶段加上一系列的回调监听,当Observable执行到这个阶段的时候,这些回调就会被触发。在Rxjava实现了很多的doxxx操作符。doOnEach
doOnEach可以给Observable加上这样的样一个回调:Observable每发射一个数据的时候就会触发这个回调,不仅包括onNext还包括onError和onCompleted。Observable observable=Observable.just(1,2,3); observable.doOnEach(new Action1<Notification>() { @Override public void call(Notification notification) { log("doOnEach send " + notification.getValue() + " type:" + notification.getKind()); } }).subscribe(new Action1() { @Override public void call(Object o) { log(o.toString()); } }); Subject<Integer, Integer> values = ReplaySubject.create(); values.doOnEach(new Action1<Notification<? super Integer>>() { @Override public void call(Notification<? super Integer> notification) { log("doOnEach send " + notification.getValue() + " type:" + notification.getKind()); } }).subscribe(new Action1() { @Override public void call(Object o) { log(o.toString()); } }); values.onNext(4); values.onNext(5); values.onNext(6); values.onError(new Exception("Oops"));
结果:
doOnEach send 1 type:OnNext 1 doOnEach send 2 type:OnNext 2 doOnEach send 3 type:OnNext 3 doOnEach send null type:OnCompleted doOnEach send 4 type:OnNext 4 doOnEach send 5 type:OnNext 5 doOnEach send 6 type:OnNext 6 doOnEach send null type:OnError
doOnNext
doOnNext则只有onNext的时候才会被触发。Subject<Integer, Integer> values = ReplaySubject.create(); values.doOnNext(new Action1<Integer>() { @Override public void call(Integer integer) { log("doOnNext send :"+integer.toString()); } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { log(integer.toString()); } }); values.onNext(4); values.onError(new Exception("Oops"));
结果:
doOnNext send :4 4
doOnSubscribe
doOnSubscribe会在Subscriber进行订阅的时候触发回调。Observable observable=Observable.just(1,2); observable.subscribe(new Action1() { @Override public void call(Object o) { log("first:"+o.toString()); } }); observable.subscribe(new Action1() { @Override public void call(Object o) { log("second:"+o.toString()); } });
结果:
I'm be subscribed! first:1 first:2 I'm be subscribed! second:1 second:2
doOnUnSubscribe
doOnUnSubscribe则会在Subscriber进行反订阅的时候触发回调。当一个Observable通过OnError或者OnCompleted结束的时候,会反订阅所有的Subscriber。
Observable observable = Observable.just(1, 2).doOnUnsubscribe(new Action0() { @Override public void call() { log("I'm be unSubscribed!"); } }); Subscription subscribe1 = observable.subscribe(); Subscription subscribe2 = observable.subscribe(); subscribe1.unsubscribe(); subscribe2.unsubscribe();
结果:
I'm be unSubscribed! I'm be unSubscribed!
doOnError
doOnError会在OnError发生的时候触发回调,并将Throwable对象作为参数传进回调函数里;try { Observable observable = Observable.error(new Throwable("呵呵哒")).doOnError(new Action1<Throwable>() { @Override public void call(Throwable throwable) { log(throwable.getMessage().toString()); } }); observable.subscribe(); }catch (Exception e){ log("catch the exception"); }
结果:
呵呵哒 catch the exception
doOnComplete
doOnComplete会在OnCompleted发生的时候触发回调。Observable observable = Observable.empty().doOnCompleted(new Action0() { @Override public void call() { log("Complete!"); } }); observable.subscribe();
结果:
Complete!
doOnTerminate
DoOnTerminate会在Observable结束前触发回调,无论是正常还是异常终止;Subject<Integer, Integer> values = ReplaySubject.create(); values.doOnTerminate(new Action0() { @Override public void call() { log("order to terminate"); } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { log(integer.toString()); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { log(throwable.getMessage().toString()); } }); values.onNext(4); values.onError(new Exception("Oops"));
结果:
4 order to terminate Oops
finallyDo
finallyDo会在Observable结束后触发回调,无论是正常还是异常终止。Observable observable = Observable.empty().finallyDo(new Action0() { @Override public void call() { log("already terminate"); } }); observable.subscribe(new Action1() { @Override public void call(Object o) { } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { } }, new Action0() { @Override public void call() { log("Complete!"); } });
结果:
Complete! already terminate
materialize
materialize操作符将OnNext/OnError/OnComplete都转化为一个Notification对象并按照原来的顺序发射出来。public final Observable<Notification<T>> materialize()
元数据中包含了源 Observable 所发射的动作,是调用 onNext 还是 onComplete。注意上图中,源 Observable 结束的时候, materialize 还会发射一个 onComplete 数据,然后才发射一个结束事件。
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS); values.take(3) .materialize() .subscribe(new Action1<Object>() { @Override public void call(Object o) { log(o.toString()); } });
结果:
meterialize:0--type:OnNext meterialize:1--type:OnNext meterialize:2--type:OnNext meterialize:null--type:OnCompleted
Notification 类包含了一些判断每个数据发射类型的方法,如果出错了还可以获取错误信息 Throwable 对象。
dematerialize
deMeterialize则是与materialize 执行相反的过程。Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS); values.take(3) .materialize() .dematerialize() .subscribe(new Action1<Object>() { @Override public void call(Object o) { log(o.toString()); } });
结果:
0 1 2
注意:在调用
dematerialize()之前必须先调用
materialize(),否则会报错。
serialize
强制Observable按次序发射数据并且功能是有效的如果你无法确保自定义的操作符符合 Rx 的约定,例如从多个源异步获取数据,则可以使用 serialize 操作函数。 serialize 可以把一个不符合约定的 Observable 转换为一个符合约定的 Observable。
下面创建一个不符合约定的 Observable,并且订阅到该 Observable上:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onCompleted(); subscriber.onNext(3); subscriber.onCompleted(); } }); observable.doOnUnsubscribe(new Action0() { @Override public void call() { log("Unsubscribed"); } }) .subscribe( new Action1<Integer>() { @Override public void call(Integer integer) { } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { } }, new Action0() { @Override public void call() { log("Complete!"); } });
结果:
1 2 Complete! Unsubscribed
先不管上面的 Observable 发射的数据,订阅结束的情况看起来符合 Rx 约定。 这是由于 subscribe 认为当前数据流结束的时候会主动结束这个 Subscription。但实际使用中我们可能并不想直接结束这个Subscription。还有一个函数为 unsafeSubscribe ,该函数不会自动取消订阅。
observable.doOnUnsubscribe(new Action0() { @Override public void call() { log("Unsubscribed"); } }) .unsafeSubscribe(new Subscriber<Integer>() { @Override public void onCompleted() { log("Complete!"); } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { } });
结果:
1 2 Complete! 3 Complete!
上面的示例最后就没有打印 Unsubscribed 字符串。
unsafeSubscribe 也不能很好的处理错误情况。所以该函数几乎没用。在文档中说:该函数应该仅仅在自定义操作函数中处理嵌套订阅的情况。 为了避免这种操作函数接受到不合法的数据流,我们可以在其上应用 serialize 操作函数:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onCompleted(); subscriber.onNext(3); subscriber.onCompleted(); } }) .cast(Integer.class) .serialize(); observable.doOnUnsubscribe(new Action0() { @Override public void call() { log("Unsubscribed"); } }) .unsafeSubscribe(new Subscriber<Integer>() { @Override public void onCompleted() { log("Complete!"); } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { } });
结果:
1 2 Complete!
尽管上面的代码中没有调用unsubscribe, 但是数据流事件依然符合约定。最后也收到了完成事件。
timeout
添加超时机制,如果过了指定的一段时间没有发射数据,就发射一个错误通知我们可以认为timeout()为一个Observable的限时的副本。
如果在指定的时间间隔内Observable不发射值的话,它监听的原始的Observable时就会触发onError()函数。
Observable<Long> values = Observable.interval(200, TimeUnit.MILLISECONDS); Subscription subscription = values .timeout(300,TimeUnit.MILLISECONDS) .subscribe(new Observer<Long>() { @Override public void onCompleted() { log("Complete!"); } @Override public void onError(Throwable e) { log(e.getMessage().toString()); } @Override public void onNext(Long aLong) { log(aLong+""); } });
结果:
0 1 2
...
Rxjava将Timeout实现为很多不同功能的操作符,比如说超时后用一个备用的Observable继续发射数据等。
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i <= 3; i++) { try { Thread.sleep(i * 100); } catch (InterruptedException e) { e.printStackTrace(); } subscriber.onNext(i); } subscriber.onCompleted(); } }).timeout(200, TimeUnit.MILLISECONDS, Observable.just(5, 6)).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { log(integer.toString()); } });
结果:
0 1 2
5
6
timestamp
给Observable发射的每个数据项添加一个时间戳timestamp 把数据转换为 Timestamped 类型,里面包含了原始的数据和一个原始数据是何时发射的时间戳。
public final Observable<Timestamped<T>> timestamp()
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS); values.take(3) .timestamp() .subscribe(new Action1<Timestamped>() { @Override public void call(Timestamped mTimestamped) { log(mTimestamped.toString()); } });
结果:
Timestamped(timestampMillis = 1461758360570, value = 0) Timestamped(timestampMillis = 1461758360670, value = 1) Timestamped(timestampMillis = 1461758360771, value = 2)
从结果可以看到,上面的数据大概每隔100毫秒发射一个。
timeInterval
将一个Observable转换为发射两个数据之间所耗费时间的Observable如果你想知道前一个数据和当前数据发射直接的时间间隔,则可以使用 timeInterval 函数。
public final Observable<TimeInterval<T>> timeInterval()
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS); values.take(3) .timeInterval() .subscribe(new Action1<TimeInterval>() { @Override public void call(TimeInterval mTimeInterval) { log(mTimeInterval.toString()); } });
结果:
TimeInterval [intervalInMilliseconds=101, value=0] TimeInterval [intervalInMilliseconds=99, value=1] TimeInterval [intervalInMilliseconds=100, value=2]
using
创建一个只在Observable的生命周期内存在的一次性资源Using操作符创建一个在Observable生命周期内存活的资源,也可以这样理解:我们创建一个资源并使用它,用一个Observable来限制这个资源的使用时间,当这个Observable终止的时候,这个资源就会被销毁。
public static final <T,Resource> Observable<T> using( Func0<Resource> resourceFactory, Func1<? super Resource,? extends Observable<? extends T>> observableFactory, Action1<? super Resource> disposeAction)
using 有三个参数,分别是:
1.创建这个一次性资源的函数
2.创建Observable的函数
3.释放资源的函数
当 Observable 被订阅的时候,resourceFactory 用来获取到需要的资源;observableFactory 用这个资源来发射数据;当 Observable 完成的时候,disposeAction 来释放资源。
Observable observable = Observable.using(new Func0<Animal>() { @Override public Animal call() { return new Animal(); } }, new Func1<Animal, Observable<?>>() { @Override public Observable<?> call(Animal animal) { return Observable.timer(3, TimeUnit.SECONDS);//三秒后发射一次就completed // return Observable.timer(4, 2, TimeUnit.SECONDS);//没有completed,不停的发射数据 // return Observable.range(1,3);//一次发射三个数据,马上结束 // return Observable.just(1,2,3);//一次发射三个数据,马上结束 } }, new Action1<Animal>() { @Override public void call(Animal animal) { animal.relase(); } }); Subscriber subscriber = new Subscriber() { @Override public void onCompleted() { log("subscriber---onCompleted"); } @Override public void onError(Throwable e) { log("subscriber---onError"); } @Override public void onNext(Object o) { log("subscriber---onNext"+o.toString());//o是发射的次数统计,可以用timer(4, 2, TimeUnit.SECONDS)测试 } }; observable.count().subscribe(subscriber);
结果:
create animal animal eat animal eat animal eat subscriber---onNext1 subscriber---onCompleted animal released
项目源码 GitHub求赞,谢谢!
引用:
RxJava操作符(六)Utility-云少嘎嘎嘎-ChinaUnix博客
RxJava 教程第三部分:驯服数据流之自定义操作函数 - 云在千峰
相关文章推荐
- 跟我一起看Retrofit 2.0的源码
- RxJava之subscribeOn解惑
- Rxjava
- http://gank.io/post/560e15be2dca930e00da1083
- PopupWindow返回参数至Activity两种方式:接口和Rxjava
- Rxjava要素(一)
- 文章标题
- hot and cold observable
- Retrofit+Rxjava
- Android用Retrofit搭建通用网络请求模块
- RxJava在Android中的简单用例
- 关于RxJava的学习(1)
- Rxjava分析—Subject
- RxJava概述
- Rxjava分析—Subject
- RxJava概述
- RxJava
- 深入解析RxJava源码(一)Observable对象的构建
- rxJava的使用
- RxJava学习笔记