Android——深入浅出RxJava 和RxAndroid(操作符)
2017-12-19 11:31
429 查看
接下来说操作符的分类:
Creating Observables(创建型操作符)、
这里用代码演示:
just操作符无非是create的简便写法,
Form操作符是针对数组或者数组列表来操作:
当然也可以是数组列表,结果是一样格式的:
官方介绍defer是在subscribe调用时才创建observable,
将会打印出 “defer操作符”;,如果将defer换成just试试。
这样将打印出null,这是因为在subscribe之前就已经创建好observable对象了,更改值是不管用的。
Empty操作符是创建一个空的observable对象,直接调用oncompleted方法。
Never操作符是不进行发射、回调的方法。
Throw是发送一个错误,执行onerror方法。
Interval操作符就是我们讲的定时器,多长时间操作一次,是有周期性的。
使用interval做周期性操作。当有“每隔xx秒后执行yy操作”类似的需求的时候,想到使用interval
例如:每隔2秒输出日志“helloworld”。
range顾名思义是范围内输出,第一个参数为起始值,第二个为发送的个数,如果为0则不发送,负数则抛异常:,
输出1-5。
repeat是重复操作符。上一个代码中重复执行,将打印1,2,3,4,5,两次。
timer同样是定时发射器,而这个是没有周期性的
使用timer做定时操作。当有“x秒后执行y操作”类似的需求的时候,想到使用timer
例如:2秒后输出日志“hello world”,然后结束。
Transforming Observable(转换型操作符)、
Map操作符是将一个对象转化成另一个对象。
将打印出“123我是转化的String”;
FlatMap操作符是一对多的转化对象,例如可以打印一组学生的名字,用from,但是如果再打印每个学生所选的课程呢?这个时候就用flatMap。
buffer操作符是将数据指定个数捆绑,捆绑后发送。
结果:
Scan操作符是一个每次将前面的数据进行累计,再按照自定义规则进行运算,依次输出每次计算后的结果给订阅者。
结果:
window操作符会在时间间隔内缓存结果,类似于buffer缓存一个list集合,区别在于window将这个结果集合封装成了observable。
Filtering Observable(过滤型操作符)、
Debounce(超过指定时间才发送),每一个数据只有超出指定时间后才会被发送出去,
打印结果是之前0-4等待时间都没超过5秒,打印结果是5,6,7,8,9;
Distinct(去掉重复)操作符,会自动过滤掉重复的元素。
打印出1,2,3;
ElementAt操作符取出指定位置数据,
打印结果3。
Filter过滤操作符。
打印结果3,5,8;
带有first和last都不演示了。。很简单。
IgnoreElements忽略数据,直接调用onCompleted或者onError。
打印结果 onCompleted。
Sample操作符是在指定时间内采集数据并发送。
打印结果:3,7;
Skip操作符,跳过前几项,输出剩余。
打印结果:3,4,5
take操作符取前几项数据发送。
打印结果:1,2,3
Combining Observables(组合型操作符)、
Zip操作符是用来合并两个Observable发射的数据项,根据Function2函数生成一个新的值并发射出去,当其中一个Observable发送数据结束或者出现异常后(两边个数不够,对应不上),另一个observable将停止发射。
结果是next:6,next :8,next :10,next :12,因为第一个observable没有数据了所以不能与9结合,终止另一个数据的发射。
merge操作符就是将两个observable按顺序发送数据项进行排列组合,简单说就是将两个observable对象合并成一个observable对象来处理。
结果就是merge:1,merge:2…….merge:9。
StartWith操作符简单点说就是在数据项前面添加新的observable数据项。
结果就是StartWith:5,6,7,8,9然后才是1234。
CombineLatest操作符是用于将第一个observable对象最近发射的数据与第二个observable对象进行函数的规则组合。看代码。
结果:
这里意思就是第一个observable对象的4是和第二个observable对象离得最近,所以是4和第二个observable对象发射的数据进行组合,产生新的数据。
join操作符把类似于combineLatest操作符,也是两个Observable产生的结果进行合并,合并的结果组成一个新的Observable,但是join操作符可以控制每个Observable产生结果的生命周期,在每个结果的生命周期内,可以与另一个Observable产生的结果按照一定的规则进行合并。
switchOnNext操作符是把一组Observable转换成一个Observable,转换规则为:对于这组Observable中的每一个Observable所产生的结果,如果在同一个时间内存在两个或多个Observable提交的结果,只取最后一个Observable提交的结果给订阅者。
Error Handling Operators(处理错误)。
常用场景:
使用throttleFirst防止按钮重复点击
ps:debounce也能达到同样的效果
使用schedulePeriodically做轮询请求
解决嵌套回调(callback hell)问题
响应式的界面
比如勾选了某个checkbox,自动更新对应的preference
使用merge合并两个数据源。
例如一组数据来自网络,一组数据来自文件,需要合并两组数据一起展示。
Creating Observables(创建型操作符)、
Create、Just、Form、Defer、Empty/Never/Throw、 Interval、Range、Repeat、Start、Timer,
这里用代码演示:
public static void create() { Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Create操作符"); subscriber.onCompleted(); } }).subscribe(new Subscriber<String>() { @Override public void onCompleted() { System.out.println("onCompleted():"); } @Override public void onError(Throwable e) { System.out.println("onCompleted():"); } @Override public void onNext(String s) { System.out.println("onNext():"+s); } }); }
just操作符无非是create的简便写法,
public static void just() { Observable.just("just操作符").subscribe(new Subscriber<String>() { @Override public void onCompleted() { System.out.println("onCompleted():"); } @Override public void onError(Throwable e) { System.out.println("onError():"); } @Override public void onNext(String s) { System.out.println("onNext():"+s); } }); }
Form操作符是针对数组或者数组列表来操作:
public static void form() { Observable.from(new Integer[]{1,2,3,4,5,6}).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("onCompleted():"); } @Override public void onError(Throwable e) { System.out.println("onError():"); } @Override public void onNext(Integer s) { System.out.println("onNext():"+s); } }); }
当然也可以是数组列表,结果是一样格式的:
List datas=new ArrayList(); for (int i = 0; i <10 ; i++) { datas.add(i); } Observable.from(datas).subscribe(new Subscriber<Integer>() {....}
官方介绍defer是在subscribe调用时才创建observable,
private static String string; public static void defer() { Observable observable=Observable.defer(new Func0<Observable<String>>() { @Override public Observable call() { return Observable.just(string); } }); string="defer操作符"; observable.subscribe(new Subscriber<String>() { @Override public void onCompleted() { System.out.println("onCompleted():"); } @Override public void onError(Throwable e) { System.out.println("onError():"); } @Override public void onNext(String s) { System.out.println("onNext():"+s); } }); }
将会打印出 “defer操作符”;,如果将defer换成just试试。
private static String string; public static void defer() { // Observable observable=Observable.defer(new Func0<Observable<String>>() { // @Override // public Observable call() { // return Observable.just(string); // } // }); Observable observable=Observable.just(string); string="defer操作符"; observable.subscribe(new Subscriber<String>() { @Override public void onCompleted() { System.out.println("onCompleted():"); } @Override public void onError(Throwable e) { System.out.println("onError():"); } @Override public void onNext(String s) { System.out.println("onNext():"+s); } }); } }
这样将打印出null,这是因为在subscribe之前就已经创建好observable对象了,更改值是不管用的。
Empty操作符是创建一个空的observable对象,直接调用oncompleted方法。
Never操作符是不进行发射、回调的方法。
Throw是发送一个错误,执行onerror方法。
Interval操作符就是我们讲的定时器,多长时间操作一次,是有周期性的。
使用interval做周期性操作。当有“每隔xx秒后执行yy操作”类似的需求的时候,想到使用interval
例如:每隔2秒输出日志“helloworld”。
public static void interval() { Observable.interval(1, TimeUnit.SECONDS, Schedulers.trampoline()).subscribe(new Subscriber<Long>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Long aLong) { System.out.println("onNext():"+aLong); } }); } //Schedulers.trampoline()这句话不添加,我的是没有效果的。
range顾名思义是范围内输出,第一个参数为起始值,第二个为发送的个数,如果为0则不发送,负数则抛异常:,
public static void range() { Observable.range(1,5).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { System.out.println("onNext():"+integer); } }); }
输出1-5。
repeat是重复操作符。上一个代码中重复执行,将打印1,2,3,4,5,两次。
public static void range() { Observable.range(1,5).repeat(2).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { System.out.println("onNext():"+integer); } }); }
timer同样是定时发射器,而这个是没有周期性的
使用timer做定时操作。当有“x秒后执行y操作”类似的需求的时候,想到使用timer
例如:2秒后输出日志“hello world”,然后结束。
Observable.timer(2, TimeUnit.SECONDS) .subscribe(new Observer<Long>() { @Override public void onCompleted() { log.d ("completed"); } @Override public void onError(Throwable e) { log.e("error"); } @Override public void onNext(Long number) { log.d ("hello world"); } });
Transforming Observable(转换型操作符)、
Map、FlatMap、GroupBy、Buffer、Scan、Window。
Map操作符是将一个对象转化成另一个对象。
public static void map() { Observable.just(123).map(new Func1<Integer, String>() { @Override public String call(Integer integer) { return integer+"我是转化的String"; } }).subscribe(new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println(s); } }); }
将打印出“123我是转化的String”;
FlatMap操作符是一对多的转化对象,例如可以打印一组学生的名字,用from,但是如果再打印每个学生所选的课程呢?这个时候就用flatMap。
Observable.from(students) .flatMap(new Func1<Student, Observable<Course>>() { @Override public Observable<Course> call(Student student) { return Observable.from(student.getCourses()); } }) .subscribe(subscriber);
buffer操作符是将数据指定个数捆绑,捆绑后发送。
Observable.range(0, 5).buffer(2).subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { System.out.println(integers); } });
结果:
Scan操作符是一个每次将前面的数据进行累计,再按照自定义规则进行运算,依次输出每次计算后的结果给订阅者。
public static void scan() { Observable.range(0,5).scan(new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer sum, Integer arg0) { return sum+arg0; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("sum:"+integer); } }); }
结果:
window操作符会在时间间隔内缓存结果,类似于buffer缓存一个list集合,区别在于window将这个结果集合封装成了observable。
window(long timespan, TimeUnit unit) 第一个是缓存的间隔时间,第二个参数是时间单位 Observable.interval(1,TimeUnit.SECONDS).take(10).window(3,TimeUnit.SECONDS).subscribe(new Observer<Observable<Long>>() { @Override public void onCompleted() { LogUtils.d("------>onCompleted()"); } @Override public void onError(Throwable e) { LogUtils.d("------>onError()" + e); } @Override public void onNext(Observable<Long> integerObservable) { LogUtils.d("------->onNext()"); integerObservable.subscribe(new Action1<Long>() { @Override public void call(Long integer) { LogUtils.d("------>call():" + integer); } }); } });
Filtering Observable(过滤型操作符)、
Debounce(超过指定时间才发送)、Distinct(去掉重复)、ElementAt(指定位置的数据)、F 1122c ilter(按条件过滤)、First(取列表第一位数据)、IgnoreElements(忽略列表上所有元素,只进行onCompleted或者onError方法)、Last(取最后一位数据)、Sample(取样,当到了指定时间采集到数据后才发送)、Skip(跳过几项数据)、SkipLast(跳过数据列表的最后几项)、Take(取前几个数据)、Take(取列表的后几个数据)
Debounce(超过指定时间才发送),每一个数据只有超出指定时间后才会被发送出去,
public static void debounce() { Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { try { for (int i = 0; i < 10 ; i++) { Thread.sleep(1000*i); subscriber.onNext(i); } subscriber.onCompleted(); } catch (InterruptedException e) { e.printStackTrace(); } } }).debounce(5,TimeUnit.SECONDS).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); } }); }
打印结果是之前0-4等待时间都没超过5秒,打印结果是5,6,7,8,9;
Distinct(去掉重复)操作符,会自动过滤掉重复的元素。
public static void distinct() { Observable.just(1,2,3,2,1).distinct().subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); } }); }
打印出1,2,3;
ElementAt操作符取出指定位置数据,
public static void elementAt() { Observable.just(1,2,3,4,5).elementAt(2).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); } }); }
打印结果3。
Filter过滤操作符。
Observable.just(1,2,3,5,8).distinct().filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer>2; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); } });
打印结果3,5,8;
带有first和last都不演示了。。很简单。
IgnoreElements忽略数据,直接调用onCompleted或者onError。
public static void IgnoreElement() { Observable.just(1,2,3,4).ignoreElements().subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { System.out.println("onError"+e); } @Override public void onNext(Integer integer) { System.out.println("onNext"+integer); } }); }
打印结果 onCompleted。
Sample操作符是在指定时间内采集数据并发送。
public static void sample() { Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { try { for (int i = 0; i <10 ; i++) { subscriber.onNext(i); Thread.sleep(1000); } subscriber.onCompleted(); } catch (InterruptedException e) { e.printStackTrace(); subscriber.onError(e); } } }).sample(4,TimeUnit.SECONDS).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); } }); }
打印结果:3,7;
Skip操作符,跳过前几项,输出剩余。
public static void skip() { Observable.just(1,2,3,4,5).skip(2).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); } }); }
打印结果:3,4,5
take操作符取前几项数据发送。
public static void take() { Observable.just(1,2,3,4,5).take(3).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); } }); }
打印结果:1,2,3
Combining Observables(组合型操作符)、
Zip、Merge、StartWith、CombineLatest、Join、SwitchOnNext
Zip操作符是用来合并两个Observable发射的数据项,根据Function2函数生成一个新的值并发射出去,当其中一个Observable发送数据结束或者出现异常后(两边个数不够,对应不上),另一个observable将停止发射。
public static void zip() { Observable<Integer> observable1=Observable.just(1,2,3,4); Observable<Integer> observable2=Observable.just(5,6,7,8,9); Observable.zip(observable1, observable2, new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) { return integer+integer2; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("next:"+integer); } }); }
结果是next:6,next :8,next :10,next :12,因为第一个observable没有数据了所以不能与9结合,终止另一个数据的发射。
merge操作符就是将两个observable按顺序发送数据项进行排列组合,简单说就是将两个observable对象合并成一个observable对象来处理。
public static void merage() { Observable<Integer> observable1=Observable.just(1,2,3,4); Observable<Integer> observable2=Observable.just(5,6,7,8,9); Observable.merge(observable1, observable2).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("merge:"+integer); } }); }
结果就是merge:1,merge:2…….merge:9。
StartWith操作符简单点说就是在数据项前面添加新的observable数据项。
public static void startwith() { Observable<Integer> observable1=Observable.just(1,2,3,4); Observable<Integer> observable2=Observable.just(5,6,7,8,9); observable1.startWith(observable2).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("startwith:"+integer); } }); }
结果就是StartWith:5,6,7,8,9然后才是1234。
CombineLatest操作符是用于将第一个observable对象最近发射的数据与第二个observable对象进行函数的规则组合。看代码。
public static void cominelatest() { Observable<Integer> observable1=Observable.just(1,2,3,4); Observable<Integer> observable2=Observable.just(5,6,7,8,9); observable1.combineLatest(observable1, observable2, new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) { System.out.println("第一个数据:"+integer+" 第二个数据"+integer2); return integer+integer2; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("两个数据的和:"+integer); } }); }
结果:
这里意思就是第一个observable对象的4是和第二个observable对象离得最近,所以是4和第二个observable对象发射的数据进行组合,产生新的数据。
join操作符把类似于combineLatest操作符,也是两个Observable产生的结果进行合并,合并的结果组成一个新的Observable,但是join操作符可以控制每个Observable产生结果的生命周期,在每个结果的生命周期内,可以与另一个Observable产生的结果按照一定的规则进行合并。
switchOnNext操作符是把一组Observable转换成一个Observable,转换规则为:对于这组Observable中的每一个Observable所产生的结果,如果在同一个时间内存在两个或多个Observable提交的结果,只取最后一个Observable提交的结果给订阅者。
Error Handling Operators(处理错误)。
常用场景:
使用throttleFirst防止按钮重复点击
ps:debounce也能达到同样的效果
RxView.clicks(button) .throttleFirst(1, TimeUnit.SECONDS) .subscribe(new Observer<Object>() { @Override public void onCompleted() { log.d ("completed"); } @Override public void onError(Throwable e) { log.e("error"); } @Override public void onNext(Object o) { log.d("button clicked"); } });
使用schedulePeriodically做轮询请求
Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(final Subscriber<? super String> observer) { Schedulers.newThread().createWorker() .schedulePeriodically(new Action0() { @Override public void call() { observer.onNext(doNetworkCallAndGetStringResult()); } }, INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS); } }).subscribe(new Action1<String>() { @Override public void call(String s) { log.d("polling….”)); } })
解决嵌套回调(callback hell)问题
NetworkService.getToken("username", "password") .flatMap(s -> NetworkService.getMessage(s)) .subscribe(s -> { System.out.println("message: " + s); })
响应式的界面
比如勾选了某个checkbox,自动更新对应的preference
SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this); RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences); Preference<Boolean> checked = rxPreferences.getBoolean("checked", true); CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test); RxCompoundButton.checkedChanges(checkBox) .subscribe(checked.asAction());
使用merge合并两个数据源。
例如一组数据来自网络,一组数据来自文件,需要合并两组数据一起展示。
Observable.merge(getDataFromFile(), getDataFromNet()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<String>() { @Override public void onCompleted() { log.d("done loading all data"); } @Override public void onError(Throwable e) { log.d("error"); } @Override public void onNext(String data) { log.d("all merged data will pass here one by one!") });
相关文章推荐
- RxJava 和 RxAndroid 二(操作符的使用)
- RxJava与RxAndroid 操作符
- RxJava 和 RxAndroid 二(操作符的使用)
- RxJava 和 RxAndroid 二(操作符的使用)
- 谁来讲讲Rxjava、rxandroid中的操作符的作用?
- Android——深入浅出RxJava 1和RxAndroid(一)
- RxJava 和 RxAndroid 二(操作符的使用)
- RxJava 和 RxAndroid (操作符的使用)
- Rxjava,rxandroid中的操作符的作用
- RxJava 和 RxAndroid 二(操作符的使用)
- RxJava 和 RxAndroid 二(操作符的使用)
- 浅析RxJava和RxAndroid关于线程切换和操作符作用
- RxJava 和 RxAndroid 一 (基础)
- rxjava + rxandroid 调度器
- Android响应式编程框架---RxJava&RxAndroid2.0使用笔记
- RxJava(RxAndroid)学习资料
- Android RxJava使用介绍(四) RxJava的操作符
- RxAndroid从零开始学之五(常见操作符与三级缓存)
- Android拾萃 - RxJava操作符列表和响应类型(二)
- RxJava与RxAndroid的学习之路