RxJava2.0的使用
2017-08-07 09:35
351 查看
添加依赖:
compile 'io.reactivex.rxjava2:rxjava:2.1.2' compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
bean:person(用于以下demo的使用)
/**
 * Simple data holder used by the demos in this article.
 *
 * <p>The fields stay public because the demos assign them directly after calling the
 * no-arg constructor. The two-arg constructor exists because the Completable demo
 * builds an instance with {@code new Person(16, "张三")}.
 */
public class Person {
    public int age;
    public String name;

    /** Creates an empty person; callers fill in {@link #age} and {@link #name} afterwards. */
    public Person() {
    }

    /**
     * Creates a fully initialised person.
     *
     * @param age  the person's age
     * @param name the person's name
     */
    public Person(int age, String name) {
        this.age = age;
        this.name = name;
    }

    /** Returns the text {@code "name: <name> age: <age>"}. */
    @Override
    public String toString() {
        return "name: " + name + " age: " + age;
    }
}
基本使用:
// Upstream: pushes three strings to the emitter, logging after each one, then completes.
ObservableOnSubscribe<String> basicSource = new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("第一个");
        LwzLog.d(TAG, "subscribe(),第一个");
        emitter.onNext("第二个");
        LwzLog.d(TAG, "subscribe(),第二个");
        emitter.onNext("第三个");
        LwzLog.d(TAG, "subscribe(),第三个");
        emitter.onComplete();
    }
};

// Downstream: logs every callback it receives.
Observer<String> basicObserver = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        LwzLog.d(TAG, "onSubscribe()");
    }

    @Override
    public void onNext(@NonNull String value) {
        LwzLog.d(TAG, "onNext()," + value);
    }

    @Override
    public void onError(@NonNull Throwable e) {
        LwzLog.d(TAG, "onError(),");
    }

    @Override
    public void onComplete() {
        LwzLog.d(TAG, "onComplete(),");
    }
};

// Both the subscription and the delivery are scheduled on the Android main thread.
Observable.create(basicSource)
        .subscribeOn(AndroidSchedulers.mainThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(basicObserver);
just操作,直接指定一个序列事件
// Receives each value, appends it to the text view and writes the same line to the log.
Consumer<String> justConsumer = new Consumer<String>() {
    @Override
    public void accept(@NonNull String value) throws Exception {
        String line = "accept : onNext : " + value + "\n";
        mRxOperatorsText.append(line);
        Log.e(TAG, line);
    }
};

// Subscribe on the IO scheduler, deliver the fixed sequence on the main thread.
Observable.just("1", "2", "3")
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(justConsumer);
Consumer直接处理事件
// A Consumer handles only the onNext events; no onError/onComplete callbacks are supplied.
Consumer<String> onNextOnly = new Consumer<String>() {
    @Override
    public void accept(@NonNull String value) throws Exception {
        String line = "accept : onNext : " + value + "\n";
        mRxOperatorsText.append(line);
        Log.e(TAG, line);
    }
};

// Subscribe on the IO scheduler, deliver the fixed sequence on the main thread.
Observable.just("1", "2", "3")
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(onNextOnly);

可以看到Consumer取代了onNext(),并且隐藏了onError(),onComplete()等可能不需要关注的方法,代码看起来更简洁。
Single的使用
/** * Single的使用 */ public void single() { Single.just(1)// 只接收一个参数 .subscribe(new SingleObserver<Integer>() {// SingleObserver是Single特定的观察者 @Override public void onSubscribe(@NonNull Disposable d) { LwzLog.d(TAG, "onSubscribe()"); } @Override public void onSuccess(@NonNull Integer integer) { LwzLog.d(TAG, "onSuccess(),integer: " + integer); } @Override public void onError(@NonNull Throwable e) { LwzLog.d(TAG, "onError()"); } }); }
zip操作,将两个或多个事件合并
/** * zip,合并事件,取两个观察者事件合并,生成另个事件 */ public void zip() { // 第一个事件 Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { e.onNext(21); LwzLog.d(TAG, "onNext(),21"); e.onNext(22); LwzLog.d(TAG, "onNext(),22"); e.onNext(23); LwzLog.d(TAG, "onNext(),23"); e.onComplete(); LwzLog.d(TAG, "onComplete(),observable"); } }); // 第二个事件 Observable<String> observable1 = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception { e.onNext("张三"); LwzLog.d(TAG, "onNext(),张三"); e.onNext("李四"); LwzLog.d(TAG, "onNext(),李四"); e.onNext("王二"); LwzLog.d(TAG, "onNext(),王二"); e.onNext("麻子"); LwzLog.d(TAG, "onNext(),麻子"); e.onNext("王五"); LwzLog.d(TAG, "onNext(),王五"); e.onNext("赵六"); LwzLog.d(TAG, "onNext(),赵六"); e.onComplete(); LwzLog.d(TAG, "onComplete(),observable1"); } }); // 合并后的事件 Observable<Person> observable2 = Observable.zip(observable, observable1, new BiFunction<Integer, String, Person>() { @Override public Person apply(@NonNull Integer age, @NonNull String name) throws Exception { // 合并规则 Person person = new Person(); person.age = age; person.name = name; LwzLog.d(TAG, "apply(),person: " + person.toString()); return person; } }); // 合并后的处理。。。 observable2.subscribe(new Observer<Person>() { @Override public void onSubscribe(@NonNull Disposable d) { LwzLog.d(TAG, "onSubscribe()"); } @Override public void onNext(@NonNull Person person) { LwzLog.d(TAG, "onNext(),person: " + person); } @Override public void onError(@NonNull Throwable e) { LwzLog.d(TAG, "onError()"); } @Override public void onComplete() { LwzLog.d(TAG, "onComplete(),"); } }); }
如果要合并多个,只需要多传入几个observable参数,将BiFunction改为Function3、Function4等(后面的数字取决于observable的个数)
map,将上一个事件按照函数规则转换成另一个事件
/**
 * map: applies one function to every upstream item, turning each Integer into a Person.
 */
public void map() {
    ObservableOnSubscribe<Integer> ageSource = new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(21);
            LwzLog.d(TAG, "next: " + 21);
            emitter.onNext(22);
            LwzLog.d(TAG, "next: " + 22);
            emitter.onComplete();
            LwzLog.d(TAG, "complete ");
        }
    };
    Observable<Integer> ages = Observable.create(ageSource);

    // The mapper keeps a running counter so that each Person gets a distinct name suffix.
    Function<Integer, Person> toPerson = new Function<Integer, Person>() {
        int position = 1;

        @Override
        public Person apply(@NonNull Integer age) throws Exception {
            Person mapped = new Person();
            mapped.age = age;
            mapped.name = "name_" + position++;
            LwzLog.d(TAG, "apply(),person: " + mapped.toString());
            return mapped;
        }
    };
    Observable<Person> people = ages.map(toPerson);

    people.subscribe(new Observer<Person>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            LwzLog.d(TAG, "onSubscribe()");
        }

        @Override
        public void onNext(@NonNull Person person) {
            LwzLog.d(TAG, "onNext(),person: " + person.toString());
        }

        @Override
        public void onError(@NonNull Throwable e) {
            LwzLog.d(TAG, "onError()");
        }

        @Override
        public void onComplete() {
            LwzLog.d(TAG, "onComplete()");
        }
    });
}
flatMap,将上游一个发送事件变换成多个发送事件
/** * flatMap,将上游一个发送事件变换成多个发送事件 */ public void flatMap() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { e.onNext(1); LwzLog.d(TAG, "next: " + 1); e.onNext(2); LwzLog.d(TAG, "next: " + 2); e.onNext(3); LwzLog.d(TAG, "next: " + 3); e.onComplete(); LwzLog.d(TAG, "complete: "); } }).flatMap(new Function<Integer, ObservableSource<Person>>() { @Override public ObservableSource<Person> apply(@NonNull Integer integer) throws Exception { LwzLog.d(TAG, "apply(): integer: " + integer); List<Person> list = new ArrayList<>(); for (int i = 0; i < 2; i++) { Person person = new Person(); person.age = integer; person.name = "name_" + (integer + i); list.add(person); } // 延时发送 return Observable.fromIterable(list).delay(1000, TimeUnit.MILLISECONDS); } }).subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<Person>() { @Override public void onSubscribe(@NonNull Disposable d) { LwzLog.d(TAG, "onSubscribe()"); } @Override public void onNext(@NonNull Person person) { LwzLog.d(TAG, "onNext(),person: " + person); } @Override public void onError(@NonNull Throwable e) { LwzLog.d(TAG, "onError()"); } @Override public void onComplete() { LwzLog.d(TAG, "onComplete()"); } }); }
结果如下:
08-07 03:52:55.297 16304-16304/com.lwz.rxjavademo D/lwz: MainHelper: onSubscribe() 08-07 03:52:55.298 16304-16921/com.lwz.rxjavademo D/lwz: MainHelper: apply(): integer: 1 08-07 03:52:55.300 16304-16921/com.lwz.rxjavademo D/lwz: MainHelper: next: 1 08-07 03:52:55.300 16304-16921/com.lwz.rxjavademo D/lwz: MainHelper: apply(): integer: 2 08-07 03:52:55.300 16304-16921/com.lwz.rxjavademo D/lwz: MainHelper: next: 2 08-07 03:52:55.300 16304-16921/com.lwz.rxjavademo D/lwz: MainHelper: apply(): integer: 3 08-07 03:52:55.301 16304-16921/com.lwz.rxjavademo D/lwz: MainHelper: next: 3 08-07 03:52:55.301 16304-16921/com.lwz.rxjavademo D/lwz: MainHelper: complete: 08-07 03:52:56.301 16304-16304/com.lwz.rxjavademo D/lwz: MainHelper: onNext(),person: name: name_1 age: 1 08-07 03:52:56.302 16304-16304/com.lwz.rxjavademo D/lwz: MainHelper: onNext(),person: name: name_2 age: 1 08-07 03:52:56.302 16304-16304/com.lwz.rxjavademo D/lwz: MainHelper: onNext(),person: name: name_3 age: 3 08-07 03:52:56.302 16304-16304/com.lwz.rxjavademo D/lwz: MainHelper: onNext(),person: name: name_4 age: 3 08-07 03:52:56.303 16304-16304/com.lwz.rxjavademo D/lwz: MainHelper: onNext(),person: name: name_2 age: 2 08-07 03:52:56.303 16304-16304/com.lwz.rxjavademo D/lwz: MainHelper: onNext(),person: name: name_3 age: 2 08-07 03:52:56.303 16304-16304/com.lwz.rxjavademo D/lwz: MainHelper: onComplete()可以看出flatMap并不能保证事件的顺序
concatMap,作用和flatMap几乎一样,唯一的区别是它能保证事件的顺序
/** * concatMap作用和flatMap几乎一模一样,唯一的区别是它能保证事件的顺序 */ public void concatMap() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { e.onNext(1); LwzLog.d(TAG, "next: " + 1); e.onNext(2); LwzLog.d(TAG, "next: " + 2); e.onNext(3); LwzLog.d(TAG, "next: " + 3); e.onComplete(); LwzLog.d(TAG, "complete: "); } }).concatMap(new Function<Integer, ObservableSource<Person>>() { @Override public ObservableSource<Person> apply(@NonNull Integer integer) throws Exception { LwzLog.d(TAG, "apply(): integer: " + integer); List<Person> list = new ArrayList<>(); for (int i = 0; i < 2; i++) { Person person = new Person(); person.age = integer; person.name = "name_" + (integer + i); list.add(person); } // 延时发送 return Observable.fromIterable(list).delay(1000, TimeUnit.MILLISECONDS); } }).subscribe(new Consumer<Person>() { @Override public void accept(Person person) throws Exception { LwzLog.d(TAG, "accept(),person: " + person); } }); }结果如下:
08-07 04:07:44.512 10393-10393/com.lwz.rxjavademo D/lwz: MainHelper: apply(): integer: 1 08-07 04:07:44.512 10393-10393/com.lwz.rxjavademo D/lwz: MainHelper: next: 1 08-07 04:07:44.512 10393-10393/com.lwz.rxjavademo D/lwz: MainHelper: next: 2 08-07 04:07:44.512 10393-10393/com.lwz.rxjavademo D/lwz: MainHelper: next: 3 08-07 04:07:44.512 10393-10393/com.lwz.rxjavademo D/lwz: MainHelper: complete: 08-07 04:07:45.513 10393-10584/com.lwz.rxjavademo D/lwz: MainHelper: accept(),person: name: name_1 age: 1 08-07 04:07:45.513 10393-10584/com.lwz.rxjavademo D/lwz: MainHelper: accept(),person: name: name_2 age: 1 08-07 04:07:45.514 10393-10584/com.lwz.rxjavademo D/lwz: MainHelper: apply(): integer: 2 08-07 04:07:46.515 10393-10548/com.lwz.rxjavademo D/lwz: MainHelper: accept(),person: name: name_2 age: 2 08-07 04:07:46.515 10393-10548/com.lwz.rxjavademo D/lwz: MainHelper: accept(),person: name: name_3 age: 2 08-07 04:07:46.515 10393-10548/com.lwz.rxjavademo D/lwz: MainHelper: apply(): integer: 3 08-07 04:07:47.517 10393-10584/com.lwz.rxjavademo D/lwz: MainHelper: accept(),person: name: name_3 age: 3 08-07 04:07:47.517 10393-10584/com.lwz.rxjavademo D/lwz: MainHelper: accept(),person: name: name_4 age: 3
doOnNext,让订阅者在接收到数据前干点事情的操作符
// Side-effect step that runs for each item before the subscriber sees it.
Consumer<Integer> saveStep = new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer value) throws Exception {
        String line = "doOnNext 保存 " + value + "成功" + "\n";
        mRxOperatorsText.append(line);
        Log.e(TAG, line);
    }
};

// The actual subscriber.
Consumer<Integer> printStep = new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer value) throws Exception {
        String line = "doOnNext :" + value + "\n";
        mRxOperatorsText.append(line);
        Log.e(TAG, line);
    }
};

Observable.just(1, 2, 3, 4)
        .doOnNext(saveStep)
        .subscribe(printStep);
filter,过滤,取符合特定规则的数据
/**
 * filter: only lets through the items that satisfy a predicate.
 * Builds ten Person objects (ages 0..9) and keeps those with an even age.
 */
public void filter() {
    List<Person> people = new ArrayList<>();
    for (int index = 0; index < 10; index++) {
        Person p = new Person();
        p.age = index;
        p.name = "name_" + index;
        people.add(p);
    }

    Predicate<Person> hasEvenAge = new Predicate<Person>() {
        @Override
        public boolean test(@NonNull Person person) throws Exception {
            return person.age % 2 == 0;
        }
    };

    Observable.fromIterable(people)
            .filter(hasEvenAge)
            .subscribe(new Consumer<Person>() {
                @Override
                public void accept(Person person) throws Exception {
                    LwzLog.d(TAG, "accept(),person: " + person.toString());
                }
            });
}

结果如下:
08-08 07:50:58.945 11544-11544/com.lwz.rxjavademo D/lwz: MainHelper: accept(),person: name: name_0 age: 0 08-08 07:50:58.945 11544-11544/com.lwz.rxjavademo D/lwz: MainHelper: accept(),person: name: name_2 age: 2 08-08 07:50:58.945 11544-11544/com.lwz.rxjavademo D/lwz: MainHelper: accept(),person: name: name_4 age: 4 08-08 07:50:58.945 11544-11544/com.lwz.rxjavademo D/lwz: MainHelper: accept(),person: name: name_6 age: 6 08-08 07:50:58.945 11544-11544/com.lwz.rxjavademo D/lwz: MainHelper: accept(),person: name: name_8 age: 8
skip,代表跳过多少个数目的事件再开始接收
/**
 * skip: drops the first n items (3 here) and delivers the rest.
 */
public void skip() {
    Consumer<String> printer = new Consumer<String>() {
        @Override
        public void accept(String value) throws Exception {
            LwzLog.d(TAG, "string: " + value);
        }
    };

    Observable.just("--", "无名者", "--", "张三", "李四", "王五", "赵六")
            .skip(3)
            .subscribe(printer);
}

结果如下:
08-08 08:04:38.841 3469-3469/com.lwz.rxjavademo D/lwz: MainHelper: string: 张三 08-08 08:04:38.841 3469-3469/com.lwz.rxjavademo D/lwz: MainHelper: string: 李四 08-08 08:04:38.841 3469-3469/com.lwz.rxjavademo D/lwz: MainHelper: string: 王五 08-08 08:04:38.841 3469-3469/com.lwz.rxjavademo D/lwz: MainHelper: string: 赵六
take,用于指定订阅者最多收到多少数据
/**
 * take: caps how many items the subscriber receives (2 here), although the source
 * below tries to emit four.
 */
public void take() {
    ObservableOnSubscribe<String> nameSource = new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("张三");
            LwzLog.d(TAG, "next: 张三");
            emitter.onNext("李四");
            LwzLog.d(TAG, "next: 李四");
            emitter.onNext("王五");
            LwzLog.d(TAG, "next: 王五");
            emitter.onNext("赵六");
            LwzLog.d(TAG, "next: 赵六");
            emitter.onComplete();
            LwzLog.d(TAG, "complete");
        }
    };

    Observable.create(nameSource)
            .take(2)
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String value) throws Exception {
                    LwzLog.d(TAG, "string: " + value);
                }
            });
}

结果如下:
08-08 08:16:07.791 23662-23662/com.lwz.rxjavademo D/lwz: MainHelper: string: 张三 08-08 08:16:07.792 23662-23662/com.lwz.rxjavademo D/lwz: MainHelper: next: 张三 08-08 08:16:07.792 23662-23662/com.lwz.rxjavademo D/lwz: MainHelper: string: 李四 08-08 08:16:07.792 23662-23662/com.lwz.rxjavademo D/lwz: MainHelper: next: 李四 08-08 08:16:07.792 23662-23662/com.lwz.rxjavademo D/lwz: MainHelper: next: 王五 08-08 08:16:07.792 23662-23662/com.lwz.rxjavademo D/lwz: MainHelper: next: 赵六 08-08 08:16:07.792 23662-23662/com.lwz.rxjavademo D/lwz: MainHelper: complete
由此可见,当事件数量超过take指定的数量时,超出的事件不会再传递给订阅者,但上游subscribe()中的e.onNext()及其后面的日志语句仍然会被执行
timer,替换java中的handler延时执行语句
/**
 * timer: runs an action once after a delay, as a replacement for a delayed Handler post.
 * Logs the start time, then two seconds later logs the current time and thread name.
 * The result is delivered on the main thread because of observeOn.
 */
public void timer() {
    LwzLog.d(TAG, "start time: " + Util.getNowStrTime());

    Consumer<Long> onFired = new Consumer<Long>() {
        @Override
        public void accept(Long tick) throws Exception {
            LwzLog.d(TAG, Util.getNowStrTime() + " " + Thread.currentThread().getName());
        }
    };

    Observable.timer(2, TimeUnit.SECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onFired);
}

如果没有指定线程,默认在computation调度器(Schedulers.computation())的线程上执行
interval,间隔执行,相当于timer+handler的定时器
/** * interval,替换timer+handler的定时执行操作 */ public void interval() { LwzLog.d(TAG, "start time: " + Util.getNowStrTime()); Observable.interval(1, 2, TimeUnit.SECONDS)// 延时1秒,每隔2秒执行一次 .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread())// 默认新开一个线程,这里指定为主线程执行 .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { LwzLog.d(TAG, Util.getNowStrTime() + " " + Thread.currentThread().getName()); } }); }
concat,连接操作符,可接受Observable的可变参数,或者Observable的集合
/**
 * concat: joins several Observables end to end (accepts varargs or a collection).
 * Here a fixed 1, 2, 3 sequence is followed by a hand-written source that emits 4.
 */
public void concat() {
    Observable<Integer> head = Observable.just(1, 2, 3);

    Observable<Integer> tail = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(4);
            LwzLog.d(TAG, "next: 4");
            emitter.onComplete();
            LwzLog.d(TAG, "complete");
        }
    });

    Observable.concat(head, tail).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer value) throws Exception {
            LwzLog.d(TAG, "accept(),integer: " + value);
        }
    });
}

结果如下:
08-09 02:40:15.518 12425-12425/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 1 08-09 02:40:15.518 12425-12425/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 2 08-09 02:40:15.518 12425-12425/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 3 08-09 02:40:15.518 12425-12425/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 4 08-09 02:40:15.518 12425-12425/com.lwz.rxjavademo D/lwz: MainHelper: next: 4 08-09 02:40:15.519 12425-12425/com.lwz.rxjavademo D/lwz: MainHelper: complete
distinct,去重操作符
/**
 * distinct: removes duplicate items from the stream.
 */
public void distinct() {
    Consumer<String> printer = new Consumer<String>() {
        @Override
        public void accept(String value) throws Exception {
            LwzLog.d(TAG, "accept(),value: " + value);
        }
    };

    Observable.just("1", "1", "2", "2", "3", "4", "5")
            .distinct()
            .subscribe(printer);
}

结果如下:
08-09 02:48:21.062 26264-26264/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 1 08-09 02:48:21.062 26264-26264/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 2 08-09 02:48:21.062 26264-26264/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 3 08-09 02:48:21.062 26264-26264/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 4 08-09 02:48:21.062 26264-26264/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 5
buffer,将一个事件序列分成若干个事件序列
/** * buffer,分流操作符 */ public void buffer() { Observable.just(1, 2, 3, 4, 5, 6, 7) .buffer(4, 3)// 分别表示单个序列的最大长度,跳过多少个作为下一个序列的起始位置 .subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> integers) throws Exception { LwzLog.d(TAG, "accept(),list size: " + integers.size()); String temp = ""; for (int num : integers) { temp += num; } LwzLog.d(TAG, "accept(),integers: " + temp); } }); }结果如下:
08-09 03:13:53.370 7283-7283/com.lwz.rxjavademo D/lwz: MainHelper: accept(),list size: 4 08-09 03:13:53.370 7283-7283/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integers: 1234 08-09 03:13:53.370 7283-7283/com.lwz.rxjavademo D/lwz: MainHelper: accept(),list size: 4 08-09 03:13:53.370 7283-7283/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integers: 4567 08-09 03:13:53.370 7283-7283/com.lwz.rxjavademo D/lwz: MainHelper: accept(),list size: 1 08-09 03:13:53.370 7283-7283/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integers: 7
可以看出一个事件序列被切分成若干个最大长度为count(这里是4)的序列(剩余不足指定长度的部分单独成为一列),相邻两个序列的起始位置相差skip(这里是3)个元素;因此当count大于skip时,相邻序列会有重叠(如这里的4和7)
debounce,去抖动,过滤掉发射速率过快的数据项
/** * debounce,去抖动,过滤掉发射速率过快的数据项 */ public void debounce() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { e.onNext(1); Thread.sleep(300); e.onNext(2); Thread.sleep(600); e.onNext(3); Thread.sleep(200); e.onNext(4); Thread.sleep(500); e.onNext(5); Thread.sleep(800); e.onComplete(); } }).debounce(500, TimeUnit.MILLISECONDS)// 过滤掉发射速率大于500ms的事件 .subscribeOn(Schedulers.io()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LwzLog.d(TAG, "accept,integer: " + integer); } }); }结果如下:
08-09 08:19:47.111 4087-4141/com.lwz.rxjavademo D/lwz: MainHelper: accept,integer: 2 08-09 08:19:47.912 4087-4141/com.lwz.rxjavademo D/lwz: MainHelper: accept,integer: 4 08-09 08:19:48.413 4087-4141/com.lwz.rxjavademo D/lwz: MainHelper: accept,integer: 5
可以看到,与下一个事件的发射间隔小于500ms的事件(1和3)都被过滤掉了,注意间隔等于临界值的事件(4)没有被过滤掉
defer,每次订阅都会创建一个新的Observable,并且如果该Observable没有被订阅,就不会生成新的Observable
/**
 * defer: the factory below is asked for a fresh Observable on each subscription;
 * nothing is created until someone subscribes.
 */
public void defer() {
    Callable<ObservableSource<Integer>> factory = new Callable<ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> call() throws Exception {
            return Observable.just(1, 2, 3);
        }
    };

    Observer<Integer> observer = new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            LwzLog.d(TAG, "onSubscribe()");
        }

        @Override
        public void onNext(@NonNull Integer value) {
            LwzLog.d(TAG, "onNext(),integer: " + value);
        }

        @Override
        public void onError(@NonNull Throwable e) {
            LwzLog.d(TAG, "onError()");
        }

        @Override
        public void onComplete() {
            LwzLog.d(TAG, "onComplete()");
        }
    };

    Observable.defer(factory).subscribe(observer);
}

结果如下:
08-09 08:38:29.185 29206-29206/com.lwz.rxjavademo D/lwz: MainHelper: onSubscribe() 08-09 08:38:29.185 29206-29206/com.lwz.rxjavademo D/lwz: MainHelper: onNext(),integer: 1 08-09 08:38:29.185 29206-29206/com.lwz.rxjavademo D/lwz: MainHelper: onNext(),integer: 2 08-09 08:38:29.185 29206-29206/com.lwz.rxjavademo D/lwz: MainHelper: onNext(),integer: 3 08-09 08:38:29.185 29206-29206/com.lwz.rxjavademo D/lwz: MainHelper: onComplete()
last,取最后一个值
/**
 * last: delivers only the final item of the stream.
 */
public void last() {
    Consumer<Integer> printer = new Consumer<Integer>() {
        @Override
        public void accept(Integer value) throws Exception {
            LwzLog.d(TAG, "accept(),integer: " + value);
        }
    };

    Observable.just(1, 2, 3)
            .last(4) // default value, used only when the source emits nothing
            .subscribe(printer);
}

结果如下
08-09 09:05:21.017 17921-17921/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 3
merge,将多个Observable合起来,接受可变参数,也支持使用迭代器集合
/**
 * merge: combines several Observables into one (accepts varargs or an iterable).
 */
public void merge() {
    Observable<Integer> first = Observable.just(1, 2, 3);
    Observable<Integer> second = Observable.just(4, 5, 6);

    Observable.merge(first, second).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer value) throws Exception {
            LwzLog.d(TAG, "accept(),integer: " + value);
        }
    });
}

结果如下:
08-09 09:26:15.183 6480-6480/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 1 08-09 09:26:15.183 6480-6480/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 2 08-09 09:26:15.183 6480-6480/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 3 08-09 09:26:15.183 6480-6480/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 4 08-09 09:26:15.183 6480-6480/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 5 08-09 09:26:15.183 6480-6480/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 6
reduce,就是一次用一个方法处理一个值,可以有一个seed作为初始值
/**
 * reduce: folds the stream into a single value by applying the accumulator to the
 * running result and each next item. Here the items 1, 2, 3 are summed.
 */
public void reduce() {
    BiFunction<Integer, Integer, Integer> adder = new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
            LwzLog.d(TAG, "apply(),integer: " + integer + " integer2: " + integer2);
            return integer + integer2;
        }
    };

    Observable.just(1, 2, 3)
            .reduce(adder)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer total) throws Exception {
                    LwzLog.d(TAG, "accept(),integer: " + total);
                }
            });
}

结果如下:
08-09 09:48:09.444 775-775/com.lwz.rxjavademo D/lwz: MainHelper: apply(),integer: 1 integer2: 2 08-09 09:48:09.444 775-775/com.lwz.rxjavademo D/lwz: MainHelper: apply(),integer: 3 integer2: 3 08-09 09:48:09.444 775-775/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 6
scan,和reduce差不多,区别在于reduce()只输出结果,而scan()会将过程中每一个结果输出
/**
 * scan: like reduce, except that every intermediate result is delivered as well,
 * not just the final one. Here the running sum of 1, 2, 3, 4 is logged.
 */
public void scan() {
    BiFunction<Integer, Integer, Integer> adder = new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
            LwzLog.d(TAG, "apply(),integer: " + integer + " integer2: " + integer2);
            return integer + integer2;
        }
    };

    Observable.just(1, 2, 3, 4)
            .scan(adder)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer runningTotal) throws Exception {
                    LwzLog.d(TAG, "accept(),integer: " + runningTotal);
                }
            });
}

结果如下:
08-09 10:21:07.639 29203-29203/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 1 08-09 10:21:07.639 29203-29203/com.lwz.rxjavademo D/lwz: MainHelper: apply(),integer: 1 integer2: 2 08-09 10:21:07.639 29203-29203/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 3 08-09 10:21:07.639 29203-29203/com.lwz.rxjavademo D/lwz: MainHelper: apply(),integer: 3 integer2: 3 08-09 10:21:07.639 29203-29203/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 6 08-09 10:21:07.639 29203-29203/com.lwz.rxjavademo D/lwz: MainHelper: apply(),integer: 6 integer2: 4 08-09 10:21:07.639 29203-29203/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 10
window,按照时间划分窗口,将数据发送给不同的Observable
/**
 * window: slices the stream by time and hands each slice over as its own Observable.
 * A one-second interval limited to 15 ticks is cut into three-second windows. Each
 * window is announced in the log and then subscribed to so its values are logged.
 */
public void window() {
    // Logs the values of a single window.
    final Consumer<Long> valuePrinter = new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            LwzLog.d(TAG, "accept(),value: " + aLong);
        }
    };

    // Announces each new window and subscribes to it.
    Consumer<Observable<Long>> windowHandler = new Consumer<Observable<Long>>() {
        @Override
        public void accept(Observable<Long> longObservable) throws Exception {
            LwzLog.d(TAG, "分割...");
            longObservable.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(valuePrinter);
        }
    };

    Observable.interval(1, TimeUnit.SECONDS)
            .take(15)
            .window(3, TimeUnit.SECONDS)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(windowHandler);
}

结果如下:
08-09 10:43:34.015 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: 分割... 08-09 10:43:35.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 0 08-09 10:43:36.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 1 08-09 10:43:37.017 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: 分割... 08-09 10:43:37.017 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 2 08-09 10:43:38.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 3 08-09 10:43:39.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 4 08-09 10:43:40.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: 分割... 08-09 10:43:40.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 5 08-09 10:43:41.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 6 08-09 10:43:42.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 7 08-09 10:43:43.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: 分割... 08-09 10:43:43.017 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 8 08-09 10:43:44.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 9 08-09 10:43:45.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 10 08-09 10:43:46.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: 分割... 08-09 10:43:46.017 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 11 08-09 10:43:47.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 12 08-09 10:43:48.017 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 13按时间来分割
PublishSubject的使用,PublishSubject与observable的不同在于,PublishSubject可以在创建的时候不指定数据流(无参数create()方法),并且onNext() 会通知每个观察者
/** * PublishSubject的使用,PublishSubject与observable的不同在于,PublishSubject可以在创建的时候不指 * 定数据流(无参数create()方法),并且onNext() 会通知每个观察者 */ public void publishSubject() { PublishSubject<Integer> subject = PublishSubject.create(); // 第一次订阅 subject.subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { LwzLog.d(TAG, "onSubscribe()--1,isDisposed:" + d.isDisposed()); } @Override public void onNext(@NonNull Integer integer) { LwzLog.d(TAG, "onNext()--1,integer: " + integer); } @Override public void onError(@NonNull Throwable e) { LwzLog.d(TAG, "onError()--1"); } @Override public void onComplete() { LwzLog.d(TAG, "onComplete()--1"); } }); // 灵活指定数据流 subject.onNext(1); subject.onNext(2); subject.onNext(3); // 第二次订阅 subject.subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { LwzLog.d(TAG, "onSubscribe()-2,isDisposed:" + d.isDisposed()); } @Override public void onNext(@NonNull Integer integer) { LwzLog.d(TAG, "onNext()--2,integer: " + integer); } @Override public void onError(@NonNull Throwable e) { LwzLog.d(TAG, "onError()--2"); } @Override public void onComplete() { LwzLog.d(TAG, "onComplete()--2"); } }); // 灵活指定数据流 subject.onNext(4); subject.onNext(5); subject.onNext(6); subject.onComplete(); }
结果如下:
08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onSubscribe()--1,isDisposed:false 08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 1 08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 2 08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 3 08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onSubscribe()-2,isDisposed:false 08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 4 08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--2,integer: 4 08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 5 08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--2,integer: 5 08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 6 08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--2,integer: 6 08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onComplete()--1 08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onComplete()--2
AsyncSubject的使用
/** * AsyncSubject的使用,AsyncSubject在调用 onComplete() 之前,除了 subscribe() 其它的操作都会被缓存, * 在调用 onComplete() 之后只有最后一个 onNext() 会生效 */ public void asyncSubject() { AsyncSubject<Integer> asyncSubject = AsyncSubject.create(); // 第一次订阅 asyncSubject.subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { LwzLog.d(TAG, "onSubscribe()--1,isDisposed:" + d.isDisposed()); } @Override public void onNext(@NonNull Integer integer) { LwzLog.d(TAG, "onNext()--1,integer: " + integer); } @Override public void onError(@NonNull Throwable e) { LwzLog.d(TAG, "onError()--1"); } @Override public void onComplete() { LwzLog.d(TAG, "onComplete()--1"); } }); // 灵活指定数据流 asyncSubject.onNext(1); asyncSubject.onNext(2); asyncSubject.onNext(3); // 第二次订阅 asyncSubject.subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { LwzLog.d(TAG, "onSubscribe()-2,isDisposed:" + d.isDisposed()); } @Override public void onNext(@NonNull Integer integer) { LwzLog.d(TAG, "onNext()--2,integer: " + integer); } @Override public void onError(@NonNull Throwable e) { LwzLog.d(TAG, "onError()--2"); } @Override public void onComplete() { LwzLog.d(TAG, "onComplete()--2"); } }); // 灵活指定数据流 asyncSubject.onNext(4); asyncSubject.onNext(5); asyncSubject.onNext(6); asyncSubject.onComplete(); }结果如下:
08-11 02:15:40.962 23575-23575/com.lwz.rxjavademo D/lwz: MainHelper: onSubscribe()--1,isDisposed:false 08-11 02:15:40.962 23575-23575/com.lwz.rxjavademo D/lwz: MainHelper: onSubscribe()-2,isDisposed:false 08-11 02:15:40.963 23575-23575/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 6 08-11 02:15:40.963 23575-23575/com.lwz.rxjavademo D/lwz: MainHelper: onComplete()--1 08-11 02:15:40.963 23575-23575/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--2,integer: 6 08-11 02:15:40.963 23575-23575/com.lwz.rxjavademo D/lwz: MainHelper: onComplete()--2
BehaviorSubject的使用,BehaviorSubject 的最后一次 onNext() 操作会被缓存,然后在 subscribe() 后立刻推给新注册的 Observer
/** * BehaviorSubject的使用,BehaviorSubject 的最后一次 onNext() 操作会被缓存,然后在 subscribe() 后立刻推给新注册的 Observer */ public void behaviorSubject() { BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(); // 第一次订阅 behaviorSubject.subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { LwzLog.d(TAG, "onSubscribe()--1,isDisposed:" + d.isDisposed()); } @Override public void onNext(@NonNull Integer integer) { LwzLog.d(TAG, "onNext()--1,integer: " + integer); } @Override public void onError(@NonNull Throwable e) { LwzLog.d(TAG, "onError()--1"); } @Override public void onComplete() { LwzLog.d(TAG, "onComplete()--1"); } }); // 灵活指定数据流 behaviorSubject.onNext(1); behaviorSubject.onNext(2); behaviorSubject.onNext(3); // 第二次订阅 behaviorSubject.subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { LwzLog.d(TAG, "onSubscribe()-2,isDisposed:" + d.isDisposed()); } @Override public void onNext(@NonNull Integer integer) { LwzLog.d(TAG, "onNext()--2,integer: " + integer); } @Override public void onError(@NonNull Throwable e) { LwzLog.d(TAG, "onError()--2"); } @Override public void onComplete() { LwzLog.d(TAG, "onComplete()--2"); } }); // 灵活指定数据流 behaviorSubject.onNext(4); behaviorSubject.onNext(5); behaviorSubject.onNext(6); behaviorSubject.onComplete(); }结果如下:
08-11 02:23:59.065 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onSubscribe()--1,isDisposed:false 08-11 02:23:59.065 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 1 08-11 02:23:59.065 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 2 08-11 02:23:59.065 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 3 08-11 02:23:59.065 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onSubscribe()-2,isDisposed:false 08-11 02:23:59.065 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--2,integer: 3 08-11 02:23:59.065 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 4 08-11 02:23:59.065 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--2,integer: 4 08-11 02:23:59.065 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 5 08-11 02:23:59.066 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--2,integer: 5 08-11 02:23:59.066 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 6 08-11 02:23:59.066 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--2,integer: 6 08-11 02:23:59.066 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onComplete()--1 08-11 02:23:59.066 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onComplete()--2
Completable的使用,只关心结果,也就是说 Completable 是没有 onNext 的,要么成功要么出错,不关心过程,在 subscribe 后的某个时间点返回结果
/**
 * Completable demo. A Completable has no onNext: it only reports success (onComplete)
 * or failure (onError) at some point after subscription.
 * Runs the check in {@link MyCompletable} for a 16-year-old person and logs the outcome.
 */
public void completable() {
    Completable.create(new MyCompletable(new Person(16, "张三")))
            .subscribe(new CompletableObserver() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    LwzLog.d(TAG, "onSubscribe(), isDisposed: " + d.isDisposed());
                }

                @Override
                public void onComplete() {
                    LwzLog.d(TAG, "onComplete()");
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    LwzLog.d(TAG, "onError(),e: " + e.getMessage());
                }
            });
}

/**
 * Completable source that validates a Person: signals an error when the person is
 * missing or younger than 18, and completes otherwise.
 */
class MyCompletable implements CompletableOnSubscribe {
    // Never reassigned after construction.
    private final Person person;

    public MyCompletable(Person person) {
        this.person = person;
    }

    @Override
    public void subscribe(@NonNull CompletableEmitter e) throws Exception {
        // Report a specific exception type instead of a raw Throwable; the messages
        // (which the observer logs) are unchanged.
        if (person == null) {
            e.onError(new IllegalArgumentException("数据为空"));
            return;
        }
        if (person.age < 18) {
            e.onError(new IllegalArgumentException("未成年"));
            return;
        }
        e.onComplete();
    }
}

结果如下:
08-11 02:40:14.540 7266-7266/com.lwz.rxjavademo D/lwz: MainHelper: onSubscribe(), isDisposed: false 08-11 02:40:14.540 7266-7266/com.lwz.rxjavademo D/lwz: MainHelper: onError(),e: 未成年可以修改年龄分别验证
Flowable,专门用于解决背压问题
/**
 * Flowable demo: the stream type intended for dealing with backpressure.
 * Creates a Flowable with the BUFFER strategy that emits 1..4, then folds the items
 * into a sum starting from the seed 100 and logs the result.
 */
public void flowable() {
    FlowableOnSubscribe<Integer> numberSource = new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            LwzLog.d(TAG, "next 1");
            emitter.onNext(2);
            LwzLog.d(TAG, "next 2");
            emitter.onNext(3);
            LwzLog.d(TAG, "next 3");
            emitter.onNext(4);
            LwzLog.d(TAG, "next 4");
            emitter.onComplete();
            LwzLog.d(TAG, "complete");
        }
    };
    Flowable<Integer> numbers = Flowable.create(numberSource, BackpressureStrategy.BUFFER);

    BiFunction<Integer, Integer, Integer> adder = new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
            LwzLog.d(TAG, "integer: " + integer + " integer2: " + integer2);
            return integer + integer2;
        }
    };

    numbers.reduce(100, adder).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer total) throws Exception {
            LwzLog.d(TAG, "integer: " + total);
        }
    });
}

结果如下:
08-11 02:59:13.794 8047-8047/com.lwz.rxjavademo D/lwz: MainHelper: integer: 100 integer2: 1 08-11 02:59:13.794 8047-8047/com.lwz.rxjavademo D/lwz: MainHelper: next 1 08-11 02:59:13.795 8047-8047/com.lwz.rxjavademo D/lwz: MainHelper: integer: 101 integer2: 2 08-11 02:59:13.795 8047-8047/com.lwz.rxjavademo D/lwz: MainHelper: next 2 08-11 02:59:13.795 8047-8047/com.lwz.rxjavademo D/lwz: MainHelper: integer: 103 integer2: 3 08-11 02:59:13.795 8047-8047/com.lwz.rxjavademo D/lwz: MainHelper: next 3 08-11 02:59:13.795 8047-8047/com.lwz.rxjavademo D/lwz: MainHelper: integer: 106 integer2: 4 08-11 02:59:13.795 8047-8047/com.lwz.rxjavademo D/lwz: MainHelper: next 4 08-11 02:59:13.795 8047-8047/com.lwz.rxjavademo D/lwz: MainHelper: integer: 110 08-11 02:59:13.795 8047-8047/com.lwz.rxjavademo D/lwz: MainHelper: complete好文链接:http://www.jianshu.com/p/0cd258eecf60点击打开链接
相关文章推荐
- RxJava2.0 使用心得(2)
- Android 优雅的让RxJava2.0+Retrofit2.0结合使用
- Android框架学习之Retrofit(二)RxJava和Retrofit2.0的结合使用
- Retrofit2.0(二)结合Rxjava2使用
- RxJava和Retrofit2.0的结合使用
- RxJava2.0使用(一)
- Android响应式编程框架---RxJava&RxAndroid2.0使用笔记
- RxJava2.0的使用:操作符(一)
- Rxjava2.0使用笔记一
- RxJava 2.0 使用详解
- RxJava2.0的使用详解
- RxBus的使用(基于RxJava2.0)
- rxjava+rxandroid+retrofit2.0使用方法demo讲解
- 手把手教你使用 RxJava 2.0(二)
- RxJava2.0 使用心得(1)
- rxjava2.0使用教程(二)
- 【Android高级】RxJava2.0和Retrofit2.0的使用探究
- RxJava2.0初步使用
- RxJava2.0+Retrofit使用
- 天气预报项目学习总结(- ButterKnife - Retrofit 2.0(okhttp) - Rxjava - Jackson - Ormlite - Mosby简单使用总结)