Rxjava 的过滤操作符
2018-02-01 11:38
225 查看
public class RxFilterActivity extends AppCompatActivity { private final static String TAG = RxFilterActivity.class.getSimpleName(); @Override protected void onCreate(@Nullable Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); // 根据指定条件过滤事件 有 filter() ofType() skip() // skipLast() distinct() distinctUntilChanged() /** * 作用:过滤特定的事件,根据书写的判断条件过滤 */ filter(); /** * 作用:过滤特定的数据类型 */ ofType(); /** * 跳过某个事件 */ skipAndSkipLast(); /** * 过滤事件中重复的事件/连续重复的事件 distinct / distinctUntilChanged */ distinct(); //根据指定事件数量过滤事件 有 take() takeLast() /** * 通过设置指定事件的数量,只发送特定数量的事件 * 指定观察者最多能接收到的事件数量 */ take(); /** * 指定观察者只能接收到被观察者发送的最后几个事件 */ takeLast(); //根据指定时间过滤事件 有throttleFirst() throttleLast() sample() //throttleWithTimeout debounce() /** * 在某段时间内,只发送该段时间内第1次事件 / 最后1次事件 */ throttleFirst(); throttleLast(); /** * 在某段时间内,只发送该段时间内最新(最后)1次事件 * simple() 与 throttleLast 相似,此处就不做详细介绍了 */ /** * 发送数据事件时,若两次发送事件的间隔 < 指定时间,就会丢弃前一次的数据,直到指定 * 时间内都没有新数据发射时才会发送后一次的数据 */ throttleWithTimeout(); debounce(); //根据指定事件位置过滤事件 有firstElement() lastElement() elementAt() elementAtOrError() /** * 仅选取第1个元素 / 最后一个元素 */ firstElement(); lastElement(); /** * 指定接收某个元素(通过 索引值 确定) * 注:允许越界,即获取的位置索引 > 发送事件序列长度 */ elementAt(); /** * 在elementAt()的基础上, * 当出现越界情况(即获取的位置索引 > 发送事件序列长度)时,即抛出异常 */ elementAtOrError(); } private void elementAtOrError() { Log.e(TAG,"-----------------------elementAtOrError-----------------------"); Observable.just(1,2,3,4,5) .elementAtOrError(6) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG,"这是个玩笑,不会走到这的"); } }); } private void elementAt() { Log.e(TAG,"-----------------------elementAt-----------------------"); Observable.create(getSource()) .elementAt(2) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG,"事件通过的value = " + integer); } }); Observable.create(getSource()) .elementAt(12,-10) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG,"事件通过的value = " + integer); } }); } private void lastElement() { Log.e(TAG,"-----------------------lastElement-----------------------"); Observable.just(1,2,3,4,5) .lastElement() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG," lastElement 通过的事件value = " + integer); } }); } private void firstElement() { Log.e(TAG,"-----------------------firstElement-----------------------"); Observable.just(1,2,3,4,5,6) .firstElement() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG," firstElement 通过的事件value = " + integer); } }); } private void debounce() { Log.e(TAG,"-----------------------debounce-----------------------"); Observable.create(getSource()) .debounce(2,TimeUnit.SECONDS,Schedulers.io()) .subscribe(getObserver()); } private void throttleWithTimeout() { Log.e(TAG,"-----------------------throttleWithTimeout-----------------------"); Observable.create(getSource()) //每1秒中采用数据 .throttleWithTimeout(1,TimeUnit.SECONDS,Schedulers.io()) .subscribe(getObserver()); } private void throttleLast() { Log.e(TAG,"-----------------------throttleLast-----------------------"); Observable.create(getSource()).observeOn(Schedulers.io()) .throttleLast(1, TimeUnit.SECONDS) .subscribe(getObserver()); } @NonNull private Observer<Integer> getObserver() { return new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.e(TAG,"订阅成功,准备接受事件"); } @Override public void onNext(Integer integer) { Log.e(TAG,"接受到的事件 value = " + integer); } @Override public void onError(Throwable e) { Log.e(TAG,"事件报错"+e.getMessage()); } @Override public void onComplete() { Log.e(TAG,"事件发送完成"); } }; } @NonNull private ObservableOnSubscribe<Integer> getSource() { return new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); Thread.sleep(300); e.onNext(2); Thread.sleep(300); e.onNext(3); Thread.sleep(300); e.onNext(4); Thread.sleep(300); e.onNext(5); Thread.sleep(300); e.onNext(6); Thread.sleep(300); e.onNext(7); Thread.sleep(300); e.onNext(8); Thread.sleep(300); e.onNext(9); Thread.sleep(300); e.onComplete(); } }; } private void throttleFirst() { Log.e(TAG,"-----------------------throttleFirst-----------------------"); Observable.create(getSource()).observeOn(Schedulers.io()) .throttleFirst(1, TimeUnit.SECONDS) .subscribe(getObserver()); } private void takeLast() { Log.e(TAG,"-----------------------takeLast-----------------------"); Observable.just(1,2,3,4,5,6,7,8) .takeLast(4) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG,"通过takeLast过滤事件,接收被观察者最后发送的几个事件,通过value = " + integer); } }); } private void take() { Log.e(TAG,"-----------------------take-----------------------"); Observable.just(1,2,3,4,5,6,7,8) .take(5) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG,"通过take过滤发送事件的数量,通过value = " + integer); } }); } private void distinct() { Log.e(TAG,"-----------------------distinct-----------------------"); Observable.just(1,2,3,1,2,3) .distinct() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG,"通过distinct过滤掉数据源中重复的数据 通过的value = " + integer); } }); Log.e(TAG,"-----------------------distinctUntilChanged-----------------------"); Observable.just(1,2,3,3,3,1,2,3) .distinctUntilChanged() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG,"通过distinctUntilChanged过滤掉数据源中连续重复的数据,通过的value = " + integer); } }); } private void skipAndSkipLast() { Log.e(TAG,"-----------------------skipAndSkipLast-----------------------"); Observable.just(1,2,3,4,5,6) //跳过正序的第一项 .skip(1) //跳过正序的最后两项 .skipLast(2) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG,"通过skip和skipLast跳过事件,通过的为value = " + integer); } }); } private void ofType() { Log.e(TAG,"-----------------------ofType-----------------------"); Observable.just(1,"abc",2d,3f,4L,'a',2) .ofType(Integer.class) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer s) throws Exception { Log.e(TAG,"通过ofType过滤事件,通过的 value = " + s); } }); } private void filter() { Log.e(TAG,"-----------------------filter-----------------------"); Observable.just(1,2,3,4,5) .filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer > 3; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG,"通过Filter过滤事件,通过的 value = " + integer); } }); } }
相关文章推荐
- RxJava之过滤操作符
- Android RxJava 操作符详解系列:过滤操作符
- Rxjava的学习之过滤操作符—first
- Rxjava学习之过滤操作符—filter、elementAt
- RxJava 过滤操作符 ofType
- RxJava【过滤】操作符 filter distinct throttle take skip first
- RxJava 过滤操作符 first last single
- Rxjava学习之过滤操作符 — distinct
- RxJava操作符之过滤操作符(五)
- RxJava使用详解--过滤操作符
- RxJava操作符(04-过滤操作)
- RxJava系列4(过滤操作符)
- RxJava操作符(04-过滤操作)
- RxJava操作符总结之过滤
- Rxjava 过滤操作符
- RxJava操作符总结之过滤
- RxJava过滤操作符实例
- RxJava-过滤操作符
- RxJava----操作符:过滤操作符
- RxJava过滤操作符