RxJava 各种操作符(1)
2015-10-27 15:17
696 查看
RxJava 各种操作符(1) - Creating Observables
Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, and TimerCreate
create操作符是所有创建型操作符的“根”,也就是说其他创建型操作符最后都是通过create操作符来创建Observable的.![](http://www.2cto.com/uploadfile/Collfiles/20150604/20150604082425175.png)
Observable.create(new Observable.OnSubscribe() { @Override public void call (Subscriber observer){ try { if (!observer.isUnsubscribed()) { for (int i = 1; i < 5; i++) { observer.onNext(i); } observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } } }).subscribe(new Subscriber() { @Override public void onNext (Integer item){ System.out.println("Next: " + item); } @Override public void onError (Throwable error){ System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted () { System.out.println("Sequence complete."); } });
运行结果如下: Next: 1 Next: 2 Next: 3 Next: 4 Sequence complete.
在使用create操作符时,最好要在回调的call函数中增加isUnsubscribed的判断,以便在subscriber在取消订阅时不会再执行call函数中相关代码逻辑,从而避免导致一些意想不到的错误出现;
from操作符
from操作符是把其他类型的对象和数据类型转化成Observable![](http://reactivex.io/documentation/operators/images/from.c.png)
Integer[] items = { 0, 1, 2, 3, 4, 5 }; Observable myObservable = Observable.from(items); myObservable.subscribe( new Action1<Integer>() { @Override public void call(Integer item) { System.out.println(item); } }, new Action1<Throwable>() { @Override public void call(Throwable error) { System.out.println("Error encountered: " + error.getMessage()); } }, new Action0() { @Override public void call() { System.out.println("Sequence complete"); } } );
运行结果如下: 0 1 2 3 4 5 Sequence complete
just操作符
just操作符也是把其他类型的对象和数据类型转化成Observable,它和from操作符很像,只是方法的参数有所差别![](http://www.2cto.com/uploadfile/Collfiles/20150604/20150604082428177.png)
Observable.just(1, 2, 3) .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
运行结果如下: Next: 1 Next: 2 Next: 3 Sequence complete.
defer操作符
defer操作符是直到有订阅者订阅时,才通过Observable的工厂方法创建Observable并执行,defer操作符能够保证Observable的状态是最新的.![](http://www.2cto.com/uploadfile/Collfiles/20150604/20150604082429178.png)
// 下面通过比较defer操作符和just操作符的运行结果作比较: i=10; Observable justObservable = Observable.just(i); i=12; Observable deferObservable = Observable.defer(new Func0<Observable<Object>>() { @Override public Observable<Object> call() { return Observable.just(i); } }); i=15; justObservable.subscribe(new Subscriber() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Object o) { System.out.println("just result:" + o.toString()); } }); deferObservable.subscribe(new Subscriber() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Object o) { System.out.println("defer result:" + o.toString()); } }); }
其中i是类的成员变量,运行结果如下: just result:10 defer result:15
可以看到,just操作符是在创建Observable就进行了赋值操作,而defer是在订阅者订阅时才创建Observable,此时才进行真正的赋值操作
timer操作符
timer操作符是创建一串连续的数字,产生这些数字的时间间隔是一定的;一种是隔一段时间产生一个数字,然后就结束,可以理解为延迟产生数字
![](http://www.2cto.com/uploadfile/Collfiles/20150604/20150604082431179.png)
一种是每隔一段时间就产生一个数字,没有结束符,也就是是可以产生无限个连续的数字
![](http://www.2cto.com/uploadfile/Collfiles/20150604/20150604082431180.png)
timer操作符默认情况下是运行在一个新线程上的,当然你可以通过传入参数来修改其运行的线程.
//每隔两秒产生一个数字 Observable.timer(2, 2, TimeUnit.SECONDS).subscribe(new Subscriber<Long>() { @Override public void onCompleted() { System.out.println("Sequence complete."); } @Override public void onError(Throwable e) { System.out.println("error:" + e.getMessage()); } @Override public void onNext(Long aLong) { System.out.println("Next:" + aLong.toString()); } });
运行结果如下: Next:0 Next:1 Next:2 Next:3 ……
interval操作符
interval操作符是每隔一段时间就产生一个数字,这些数字从0开始,一次递增1直至无穷大;interval操作符的实现效果跟上面的timer操作符的第二种情形一样。![](http://www.2cto.com/uploadfile/Collfiles/20150604/20150604082433181.png)
interval操作符默认情况下是运行在一个新线程上的,当然你可以通过传入参数来修改其运行的线程。
调用例子就不列出了,基本跟上面timer的调用例子一样。
range操作符
range操作符是创建一组在从n开始,个数为m的连续数字,比如range(3,10),就是创建3、4、5…12的一组数字![](http://www.2cto.com/uploadfile/Collfiles/20150604/20150604082433182.png)
//产生从3开始,个数为10个的连续数字 Observable.range(3,10).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("Sequence complete."); } @Override public void onError(Throwable e) { System.out.println("error:" + e.getMessage()); } @Override public void onNext(Integer i) { System.out.println("Next:" + i.toString()); } });
运行结果如下: Next:3 Next:4 Next:5 Next:6 …. Next:12 Sequence complete.
repeat/repeatWhen操作符
repeat操作符是对某一个Observable,重复产生多次结果![](http://www.2cto.com/uploadfile/Collfiles/20150604/20150604082434183.png)
repeatWhen操作符是对某一个Observable,有条件地重新订阅从而产生多次结果
![](http://www.2cto.com/uploadfile/Collfiles/20150604/20150604082436184.png)
repeat和repeatWhen操作符默认情况下是运行在一个新线程上的,当然你可以通过传入参数来修改其运行的线程。
repeat调用例子如下: //连续产生两组(3,4,5)的数字 Observable.range(3,3).repeat(2).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("Sequence complete."); } @Override public void onError(Throwable e) { System.out.println("error:" + e.getMessage()); } @Override public void onNext(Integer i) { System.out.println("Next:" + i.toString()); } });
运行结果如下: Next:3 Next:4 Next:5 Next:3 Next:4 Next:5 Sequence complete.
//repeatWhen调用例子如下: Observable.just(1,2,3).repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() { @Override public Observable<?> call(Observable<? extends Void> observable) { //重复3次 return observable.zipWith(Observable.range(1, 3), new Func2<Void, Integer, Integer>() { @Override public Integer call(Void aVoid, Integer integer) { return integer; } }).flatMap(new Func1<Integer, Observable<?>>() { @Override public Observable<?> call(Integer integer) { System.out.println("delay repeat the " + integer + " count"); //1秒钟重复一次 return Observable.timer(1, TimeUnit.SECONDS); } }); } }).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("Sequence complete."); } @Override public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); } @Override public void onNext(Integer value) { System.out.println("Next:" + value); } });
运行结果如下: Next:1 Next:2 Next:3 repeat the 1 count Next:1 Next:2 Next:3 repeat the 2 count Next:1 Next:2 Next:3 repeat the 3 count Next:1 Next:2 Next:3 Sequence complete.
相关文章推荐
- blazeds实现java到flex类映射
- Java:正则表达式的详解
- Maven配置spring(只配置spring)
- eclipse中使用maven---添加本地jar
- eclipse安装git
- Eclipse+maven开发环境搭建
- JAVA基础之选择排序
- org.springframework.util.StringUtils的使用
- Eclipse 安装Svn 插件
- java调用Command命令
- eclipse 笔记
- Flex+BlazeDs+Java的教程及Demo
- 安卓开发中用eclipse提交代码到github遇到的rejected-non-fast-forward问题
- Java连接MySQL数据库
- 通过Ajax两种方式讲解Struts2接收数组表单的方法
- Java常见内存溢出异常分析(OutOfMemoryError)
- eclipse导入已有的工程时,在maven dependencies里有项目所需jar,但是启动项目时,报错不能找到jar
- Java IO教程
- spring 下载地址
- JAVA泛型——基本使用