RxJava之三
2017-09-17 10:00
99 查看
一、前言
年轻的老司机们,我这么勤的为大家分享,却少有催更的,好吧。其实写这个系列不是为了吸睛,那咱们继续写我们的 RxJava 2.x 的操作符。
二、正题
7、distinct
这个操作符非常的简单、通俗、易懂,就是简单的去重嘛,我甚至都不想贴代码,但人嘛,总得持之以恒。![](http://images2015.cnblogs.com/blog/845964/201706/845964-20170623111521695-986644993.png)
1 Observable.just(1, 1, 1, 2, 2, 3, 4, 5) 2 .distinct() 3 .subscribe(new Consumer<Integer>() { 4 @Override 5 public void accept(@NonNull Integer integer) throws Exception { 6 mRxOperatorsText.append("distinct : " + integer + "\n"); 7 Log.e(TAG, "distinct : " + integer + "\n"); 8 } 9 });
输出:
![](http://images2015.cnblogs.com/blog/845964/201706/845964-20170623111912929-1116953532.png)
Log 日志显而易见,我们在经过dinstinct() 后接收器接收到的事件只有1,2,3,4,5了。
8、Filter
信我,Filter 你会很常用的,它的作用也很简单,过滤器嘛。可以接受一个参数,让其过滤掉不符合我们条件的值![](http://images2015.cnblogs.com/blog/845964/201706/845964-20170623112208195-21083035.png)
1 Observable.just(1, 20, 65, -5, 7, 19) 2 .filter(new Predicate<Integer>() { 3 @Override 4 public boolean test(@NonNull Integer integer) throws Exception { 5 return integer >= 10; 6 } 7 }).subscribe(new Consumer<Integer>() { 8 @Override 9 public void accept(@NonNull Integer integer) throws Exception { 10 mRxOperatorsText.append("filter : " + integer + "\n"); 11 Log.e(TAG, "filter : " + integer + "\n"); 12 } 13 });
输出:
![](http://images2015.cnblogs.com/blog/845964/201706/845964-20170623112245741-1548131349.png)
可以看到,我们过滤器舍去了小于10的值,所以最好的输出只有20,65,19。
9、buffer
buffer 操作符接受两个参数,buffef(count,skip),作用是将 Observable 中的数据按 skip (步长) 分成最大不超过 count 的 buffer ,然后生成一个 Observable 。也许你还不太理解,我们可以通过我们的示例图和示例代码来进一步深化它。![](http://images2015.cnblogs.com/blog/845964/201706/845964-20170623112641507-1494318357.png)
1 Observable.just(1, 2, 3, 4, 5) 2 .buffer(3, 2) 3 .subscribe(new Consumer<List<Integer>>() { 4 @Override 5 public void accept(@NonNull List<Integer> integers) throws Exception { 6 mRxOperatorsText.append("buffer size : " + integers.size() + "\n"); 7 Log.e(TAG, "buffer size : " + integers.size() + "\n"); 8 mRxOperatorsText.append("buffer value : "); 9 Log.e(TAG, "buffer value : " ); 10 for (Integer i : integers) { 11 mRxOperatorsText.append(i + ""); 12 Log.e(TAG, i + ""); 13 } 14 mRxOperatorsText.append("\n"); 15 Log.e(TAG, "\n"); 16 } 17 });
输出:
![](http://images2015.cnblogs.com/blog/845964/201706/845964-20170623113122179-1148175500.png)
如图,我们把1,2,3,4,5依次发射出来,经过buffer 操作符,其中参数 count 为3, s
14e89
kip 为2,而我们的输出 依次是 123,345,5。显而易见,我们 buffer 的第一个参数是count,代表最大取值,在事件足够的时候,一般都是取count个值,然后每次跳过skip个事件。其实看 Log 日志,我相信大家都明白了。
10、timer
timer 很有意思,相当于一个定时任务。在1.x 中它还可以执行间隔逻辑,但在2.x中此功能被交给了 interval,下一个会介绍。但需要注意的是,timer 和 interval 均默认在新线程。![](http://images2015.cnblogs.com/blog/845964/201706/845964-20170623131628851-43543063.png)
1 mRxOperatorsText.append("timer start : " + TimeUtil.getNowStrTime() + "\n"); 2 Log.e(TAG, "timer start : " + TimeUtil.getNowStrTime() + "\n"); 3 Observable.timer(2, TimeUnit.SECONDS) 4 .subscribeOn(Schedulers.io()) 5 .observeOn(AndroidSchedulers.mainThread()) // timer 默认在新线程,所以需要切换回主线程 6 .subscribe(new Consumer<Long>() { 7 @Override 8 public void accept(@NonNull Long aLong) throws Exception { 9 mRxOperatorsText.append("timer :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n"); 10 Log.e(TAG, "timer :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n"); 11 } 12 });
输出:
![](http://images2015.cnblogs.com/blog/845964/201706/845964-20170623132501382-141907783.png)
显而易见,当我们两次点击按钮触发这个事件的时候,接收被延迟了2秒。
11、interval
如同我们上面可说,interval 操作符用于间隔时间执行某个操作,其接受三个参数,分别是第一次发送延迟,间隔时间,时间单位。![](http://images2015.cnblogs.com/blog/845964/201706/845964-20170623142031538-939150944.png)
1 mRxOperatorsText.append("interval start : " + TimeUtil.getNowStrTime() + "\n"); 2 Log.e(TAG, "interval start : " + TimeUtil.getNowStrTime() + "\n"); 3 Observable.interval(3,2, TimeUnit.SECONDS) 4 .subscribeOn(Schedulers.io()) 5 .observeOn(AndroidSchedulers.mainThread()) // 由于interval默认在新线程,所以我们应该切回主线程 6 .subscribe(new Consumer<Long>() { 7 @Override 8 public void accept(@NonNull Long aLong) throws Exception { 9 mRxOperatorsText.append("interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n"); 10 Log.e(TAG, "interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n"); 11 } 12 });
输出:
![](http://images2015.cnblogs.com/blog/845964/201706/845964-20170623133212023-1840384932.png)
如同 Log 日志一样,第一次延迟了3秒后接收到,后面每次间隔了2秒。
然而,心细的小伙伴可能会发现,由于我们这个是间隔执行,所以当我们的Activity都销毁的时候,实际上这个操作还依然在进行,所以,我们得花点小心思让我们在不需要它的时候干掉它。查看源码发现,我们subscribe(Cousumer<? super T> onNext)返回的是Disposable,我们可以在这上面做文章。
![](http://images2015.cnblogs.com/blog/845964/201706/845964-20170623135010429-1999942958.png)
1 @Override 2 protected void doSomething() { 3 mRxOperatorsText.append("interval start : " + TimeUtil.getNowStrTime() + "\n"); 4 Log.e(TAG, "interval start : " + TimeUtil.getNowStrTime() + "\n"); 5 mDisposable = Observable.interval(3, 2, TimeUnit.SECONDS) 6 .subscribeOn(Schedulers.io()) 7 .observeOn(AndroidSchedulers.mainThread()) // 由于interval默认在新线程,所以我们应该切回主线程 8 .subscribe(new Consumer<Long>() { 9 @Override 10 public void accept(@NonNull Long aLong) throws Exception { 11 mRxOperatorsText.append("interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n"); 12 Log.e(TAG, "interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n"); 13 } 14 }); 15 } 16 17 @Override 18 protected void onDestroy() { 19 super.onDestroy(); 20 if (mDisposable != null && !mDisposable.isDisposed()) { 21 mDisposable.dispose(); 22 } 23 }
哈哈,再次验证,解决了我们的疑惑。
12、doOnNext
其实觉得 doOnNext 应该不算一个操作符,但考虑到其常用性,我们还是咬咬牙将它放在了这里。它的作用是让订阅者在接收到数据之前干点有意思的事情。假如我们在获取到数据之前想先保存一下它,无疑我们可以这样实现。1 Observable.just(1, 2, 3, 4) 2 .doOnNext(new Consumer<Integer>() { 3 @Override 4 public void accept(@NonNull Integer integer) throws Exception { 5 mRxOperatorsText.append("doOnNext 保存 " + integer + "成功" + "\n"); 6 Log.e(TAG, "doOnNext 保存 " + integer + "成功" + "\n"); 7 } 8 }).subscribe(new Consumer<Integer>() { 9 @Override 10 public void accept(@NonNull Integer integer) throws Exception { 11 mRxOperatorsText.append("doOnNext :" + integer + "\n"); 12 Log.e(TAG, "doOnNext :" + integer + "\n"); 13 } 14 });
输出:
![](http://images2015.cnblogs.com/blog/845964/201706/845964-20170623135430929-441429894.png)
13、skip
skip 很有意思,其实作用就和字面意思一样,接受一个 long 型参数 count ,代表跳过 count 个数目开始接收。![](http://images2015.cnblogs.com/blog/845964/201706/845964-20170623135650288-590231860.png)
1 Observable.just(1,2,3,4,5) 2 .skip(2) 3 .subscribe(new Consumer<Integer>() { 4 @Override 5 public void accept(@NonNull Integer integer) throws Exception { 6 mRxOperatorsText.append("skip : "+integer + "\n"); 7 Log.e(TAG, "skip : "+integer + "\n"); 8 } 9 });
输出:
![](http://images2015.cnblogs.com/blog/845964/201706/845964-20170623135725570-1405820959.png)
14、take
take,接受一个 long 型参数 count ,代表至多接收 count 个数据。![](http://images2015.cnblogs.com/blog/845964/201706/845964-20170623135843882-1573931455.png)
1 Flowable.fromArray(1,2,3,4,5) 2 .take(2) 3 .subscribe(new Consumer<Integer>() { 4 @Override 5 public void accept(@NonNull Integer integer) throws Exception { 6 mRxOperatorsText.append("take : "+integer + "\n"); 7 Log.e(TAG, "accept: take : "+integer + "\n" ); 8 } 9 });
输出:
![](http://images2015.cnblogs.com/blog/845964/201706/845964-20170623135913726-589533489.png)
15、just
just,没什么好说的,其实在前面各种例子都说明了,就是一个简单的发射器依次调用onNext()方法。![](http://images2015.cnblogs.com/blog/845964/201706/845964-20170623140057866-945823179.png)
1 Observable.just("1", "2", "3") 2 .subscribeOn(Schedulers.io()) 3 .observeOn(AndroidSchedulers.mainThread()) 4 .subscribe(new Consumer<String>() { 5 @Override 6 public void accept(@NonNull String s) throws Exception { 7 mRxOperatorsText.append("accept : onNext : " + s + "\n"); 8 Log.e(TAG,"accept : onNext : " + s + "\n" ); 9 } 10 });
输出:
![](http://images2015.cnblogs.com/blog/845964/201706/845964-20170623140130898-982810161.png)
相关文章推荐
- RxJava开发精要2-为什么是Observables?
- RxJava
- RxJava处理网络连接失败和timer()、interval()、delay()之间的区别
- RxJava操作符(04-过滤操作)
- RXjava解决数据管理问题
- RxJava基础练习(3)
- Rxlifecycle使用详解,解决RxJava内存泄露问题
- RxJava 入门
- NotRxJava懒人专用指南-RxJava的演进过程
- Retrofit+RxJava
- RXJava的操作符
- RxJava 驯服数据流之副作用
- RxJava 学习笔记(六) --- Transforming 变换操作符
- RxJava 并发之意外情况处理
- [android架构篇]mvp+rxjava+retrofit+eventBus
- RxJava 学习笔记(九) --- Error Handling 错误处理操作
- Rxjava1.0 lift方法
- RxJava实现事件总线——RxBus
- Android应用架构之Retrofit使用 RxJava 详解 jsoup Android 平滑图片加载和缓存库 Glide 使用详解
- Android——2016新技术Rxjava