您的位置:首页 > 编程语言 > Java开发

RxJava2.0的使用

2017-08-07 09:35 351 查看

添加依赖:

compile 'io.reactivex.rxjava2:rxjava:2.1.2'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

bean:person(用于以下demo的使用)

/**
 * Simple mutable data holder used by the demos in this article.
 *
 * <p>The fields are public because the demo code assigns them directly
 * ({@code person.age = ...}). A convenience constructor is also provided so
 * that call sites such as {@code new Person(16, "...")} compile.
 */
public class Person {
    /** Age of the person. */
    public int age;
    /** Name of the person; {@code null} until assigned. */
    public String name;

    /** Creates an empty person whose fields are filled in afterwards. */
    public Person() {
    }

    /**
     * Creates a fully initialised person.
     *
     * @param age  the person's age
     * @param name the person's name
     */
    public Person(int age, String name) {
        this.age = age;
        this.name = name;
    }

    /** Returns a string of the form {@code "name: <name> age: <age>"}. */
    @Override
    public String toString() {
        return "name: " + name + " age: " + age;
    }
}


基本使用:

// Basic usage: build an Observable by hand and attach a full Observer to it.
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
        // Emit three items, logging after each emission, then signal completion.
        final String[] items = {"第一个", "第二个", "第三个"};
        for (String item : items) {
            emitter.onNext(item);
            LwzLog.d(TAG, "subscribe()," + item);
        }
        emitter.onComplete();
    }
})
        // Both the subscription and the delivery are scheduled on the Android main thread.
        .subscribeOn(AndroidSchedulers.mainThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable disposable) {
                LwzLog.d(TAG, "onSubscribe()");
            }

            @Override
            public void onNext(@NonNull String item) {
                LwzLog.d(TAG, "onNext()," + item);
            }

            @Override
            public void onError(@NonNull Throwable error) {
                // The throwable itself is not logged, only the fact that an error occurred.
                LwzLog.d(TAG, "onError(),");
            }

            @Override
            public void onComplete() {
                LwzLog.d(TAG, "onComplete(),");
            }
        });

just操作,直接指定一个序列事件

// just(): emit the given items in order.
Observable.just("1", "2", "3")
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String item) throws Exception {
                // Build the message once and send it to both the text view and the log.
                final String line = "accept : onNext : " + item + "\n";
                mRxOperatorsText.append(line);
                Log.e(TAG, line);
            }
        });

Consumer直接处理事件

// Consumer: handle only the emitted items, without implementing a full Observer.
Observable.just("1", "2", "3")
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String item) throws Exception {
                // Same message goes to the on-screen text view and to the error log.
                final String line = "accept : onNext : " + item + "\n";
                mRxOperatorsText.append(line);
                Log.e(TAG, line);
            }
        });
可以看到Consumer取代了onNext(),并且隐藏了onError(),onComplete()等可能不需要关注的方法,代码看起来更简洁。

Single的使用

/**
 * Demonstrates {@code Single}: a source built from exactly one value that
 * reports either success or an error.
 */
public void single() {
    // SingleObserver is the observer type dedicated to Single.
    SingleObserver<Integer> observer = new SingleObserver<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {
            LwzLog.d(TAG, "onSubscribe()");
        }

        @Override
        public void onSuccess(@NonNull Integer value) {
            LwzLog.d(TAG, "onSuccess(),integer: " + value);
        }

        @Override
        public void onError(@NonNull Throwable error) {
            LwzLog.d(TAG, "onError()");
        }
    };

    // Single.just accepts a single argument only.
    Single.just(1).subscribe(observer);
}


zip操作,将两个或多个事件合并

/**
 * Demonstrates {@code zip}: items from two Observables are combined through a
 * {@code BiFunction} into items of a third Observable.
 *
 * <p>NOTE(review): the name source emits six items while the age source emits
 * three; per the RxJava zip documentation only three pairs should reach the
 * observer - confirm against a run.
 */
public void zip() {
    // First source: three ages.
    Observable<Integer> ages = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            for (int age = 21; age <= 23; age++) {
                emitter.onNext(age);
                LwzLog.d(TAG, "onNext()," + age);
            }
            emitter.onComplete();
            LwzLog.d(TAG, "onComplete(),observable");
        }
    });

    // Second source: six names.
    Observable<String> names = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
            final String[] all = {"张三", "李四", "王二", "麻子", "王五", "赵六"};
            for (String name : all) {
                emitter.onNext(name);
                LwzLog.d(TAG, "onNext()," + name);
            }
            emitter.onComplete();
            LwzLog.d(TAG, "onComplete(),observable1");
        }
    });

    // Combination rule: one age plus one name become one Person.
    BiFunction<Integer, String, Person> toPerson = new BiFunction<Integer, String, Person>() {
        @Override
        public Person apply(@NonNull Integer age, @NonNull String name) throws Exception {
            Person combined = new Person();
            combined.age = age;
            combined.name = name;
            LwzLog.d(TAG, "apply(),person: " + combined.toString());
            return combined;
        }
    };
    Observable<Person> people = Observable.zip(ages, names, toPerson);

    // Consume the combined stream.
    people.subscribe(new Observer<Person>() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {
            LwzLog.d(TAG, "onSubscribe()");
        }

        @Override
        public void onNext(@NonNull Person person) {
            LwzLog.d(TAG, "onNext(),person: " + person);
        }

        @Override
        public void onError(@NonNull Throwable error) {
            LwzLog.d(TAG, "onError()");
        }

        @Override
        public void onComplete() {
            LwzLog.d(TAG, "onComplete(),");
        }
    });
}
如果要合并多个,只需要多传入几个observable参数,将BiFunction改为Function3、Function4等(后面的数字取决于observable的个数)

map,将上一个事件按照函数规则转换成另一个事件

/**
 * Demonstrates {@code map}: a function is applied to every upstream item so
 * that each one is transformed into a new item.
 */
public void map() {
    Observable<Integer> ages = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            for (int age = 21; age <= 22; age++) {
                emitter.onNext(age);
                LwzLog.d(TAG, "next: " + age);
            }
            emitter.onComplete();
            LwzLog.d(TAG, "complete ");
        }
    });

    // Turns each age into a Person; the counter numbers the generated names.
    Function<Integer, Person> toPerson = new Function<Integer, Person>() {
        int position = 1;

        @Override
        public Person apply(@NonNull Integer age) throws Exception {
            Person mapped = new Person();
            mapped.age = age;
            mapped.name = "name_" + position++;
            LwzLog.d(TAG, "apply(),person: " + mapped.toString());
            return mapped;
        }
    };
    Observable<Person> people = ages.map(toPerson);

    people.subscribe(new Observer<Person>() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {
            LwzLog.d(TAG, "onSubscribe()");
        }

        @Override
        public void onNext(@NonNull Person person) {
            LwzLog.d(TAG, "onNext(),person: " + person.toString());
        }

        @Override
        public void onError(@NonNull Throwable error) {
            LwzLog.d(TAG, "onError()");
        }

        @Override
        public void onComplete() {
            LwzLog.d(TAG, "onComplete()");
        }
    });
}


flatMap,将上游一个发送事件变换成多个发送事件

/**
 * Demonstrates {@code flatMap}: each upstream item is turned into an inner
 * Observable whose items are forwarded downstream.
 */
public void flatMap() {
    ObservableOnSubscribe<Integer> numbers = new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            for (int value = 1; value <= 3; value++) {
                emitter.onNext(value);
                LwzLog.d(TAG, "next: " + value);
            }
            emitter.onComplete();
            LwzLog.d(TAG, "complete: ");
        }
    };

    // Expands one integer into two Person items, delivered after a one-second delay.
    Function<Integer, ObservableSource<Person>> expand =
            new Function<Integer, ObservableSource<Person>>() {
                @Override
                public ObservableSource<Person> apply(@NonNull Integer integer) throws Exception {
                    LwzLog.d(TAG, "apply(): integer: " + integer);
                    List<Person> people = new ArrayList<>();
                    for (int offset = 0; offset < 2; offset++) {
                        Person person = new Person();
                        person.age = integer;
                        person.name = "name_" + (integer + offset);
                        people.add(person);
                    }
                    // Delay the emission of the inner items.
                    return Observable.fromIterable(people).delay(1000, TimeUnit.MILLISECONDS);
                }
            };

    Observer<Person> observer = new Observer<Person>() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {
            LwzLog.d(TAG, "onSubscribe()");
        }

        @Override
        public void onNext(@NonNull Person person) {
            LwzLog.d(TAG, "onNext(),person: " + person);
        }

        @Override
        public void onError(@NonNull Throwable error) {
            LwzLog.d(TAG, "onError()");
        }

        @Override
        public void onComplete() {
            LwzLog.d(TAG, "onComplete()");
        }
    };

    Observable.create(numbers)
            .flatMap(expand)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(observer);
}


结果如下:
08-07 03:52:55.297 16304-16304/com.lwz.rxjavademo D/lwz: MainHelper: onSubscribe()
08-07 03:52:55.298 16304-16921/com.lwz.rxjavademo D/lwz: MainHelper: apply(): integer: 1
08-07 03:52:55.300 16304-16921/com.lwz.rxjavademo D/lwz: MainHelper: next: 1
08-07 03:52:55.300 16304-16921/com.lwz.rxjavademo D/lwz: MainHelper: apply(): integer: 2
08-07 03:52:55.300 16304-16921/com.lwz.rxjavademo D/lwz: MainHelper: next: 2
08-07 03:52:55.300 16304-16921/com.lwz.rxjavademo D/lwz: MainHelper: apply(): integer: 3
08-07 03:52:55.301 16304-16921/com.lwz.rxjavademo D/lwz: MainHelper: next: 3
08-07 03:52:55.301 16304-16921/com.lwz.rxjavademo D/lwz: MainHelper: complete:
08-07 03:52:56.301 16304-16304/com.lwz.rxjavademo D/lwz: MainHelper: onNext(),person: name: name_1 age: 1
08-07 03:52:56.302 16304-16304/com.lwz.rxjavademo D/lwz: MainHelper: onNext(),person: name: name_2 age: 1
08-07 03:52:56.302 16304-16304/com.lwz.rxjavademo D/lwz: MainHelper: onNext(),person: name: name_3 age: 3
08-07 03:52:56.302 16304-16304/com.lwz.rxjavademo D/lwz: MainHelper: onNext(),person: name: name_4 age: 3
08-07 03:52:56.303 16304-16304/com.lwz.rxjavademo D/lwz: MainHelper: onNext(),person: name: name_2 age: 2
08-07 03:52:56.303 16304-16304/com.lwz.rxjavademo D/lwz: MainHelper: onNext(),person: name: name_3 age: 2
08-07 03:52:56.303 16304-16304/com.lwz.rxjavademo D/lwz: MainHelper: onComplete()
可以看出flatMap并不能保证事件的顺序

concatMap,作用和flatMap几乎一样,唯一的区别是它能保证事件的顺序

/**
 * Demonstrates {@code concatMap}: like {@code flatMap} it maps each upstream
 * item to an inner Observable, but it is used here because it keeps the
 * inner items in upstream order.
 */
public void concatMap() {
    ObservableOnSubscribe<Integer> numbers = new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            for (int value = 1; value <= 3; value++) {
                emitter.onNext(value);
                LwzLog.d(TAG, "next: " + value);
            }
            emitter.onComplete();
            LwzLog.d(TAG, "complete: ");
        }
    };

    // Expands one integer into two Person items, delivered after a one-second delay.
    Function<Integer, ObservableSource<Person>> expand =
            new Function<Integer, ObservableSource<Person>>() {
                @Override
                public ObservableSource<Person> apply(@NonNull Integer integer) throws Exception {
                    LwzLog.d(TAG, "apply(): integer: " + integer);
                    List<Person> people = new ArrayList<>();
                    for (int offset = 0; offset < 2; offset++) {
                        Person person = new Person();
                        person.age = integer;
                        person.name = "name_" + (integer + offset);
                        people.add(person);
                    }
                    // Delay the emission of the inner items.
                    return Observable.fromIterable(people).delay(1000, TimeUnit.MILLISECONDS);
                }
            };

    Observable.create(numbers)
            .concatMap(expand)
            .subscribe(new Consumer<Person>() {
                @Override
                public void accept(Person person) throws Exception {
                    LwzLog.d(TAG, "accept(),person: " + person);
                }
            });
}
结果如下:
08-07 04:07:44.512 10393-10393/com.lwz.rxjavademo D/lwz: MainHelper: apply(): integer: 1
08-07 04:07:44.512 10393-10393/com.lwz.rxjavademo D/lwz: MainHelper: next: 1
08-07 04:07:44.512 10393-10393/com.lwz.rxjavademo D/lwz: MainHelper: next: 2
08-07 04:07:44.512 10393-10393/com.lwz.rxjavademo D/lwz: MainHelper: next: 3
08-07 04:07:44.512 10393-10393/com.lwz.rxjavademo D/lwz: MainHelper: complete:
08-07 04:07:45.513 10393-10584/com.lwz.rxjavademo D/lwz: MainHelper: accept(),person: name: name_1 age: 1
08-07 04:07:45.513 10393-10584/com.lwz.rxjavademo D/lwz: MainHelper: accept(),person: name: name_2 age: 1
08-07 04:07:45.514 10393-10584/com.lwz.rxjavademo D/lwz: MainHelper: apply(): integer: 2
08-07 04:07:46.515 10393-10548/com.lwz.rxjavademo D/lwz: MainHelper: accept(),person: name: name_2 age: 2
08-07 04:07:46.515 10393-10548/com.lwz.rxjavademo D/lwz: MainHelper: accept(),person: name: name_3 age: 2
08-07 04:07:46.515 10393-10548/com.lwz.rxjavademo D/lwz: MainHelper: apply(): integer: 3
08-07 04:07:47.517 10393-10584/com.lwz.rxjavademo D/lwz: MainHelper: accept(),person: name: name_3 age: 3
08-07 04:07:47.517 10393-10584/com.lwz.rxjavademo D/lwz: MainHelper: accept(),person: name: name_4 age: 3

doOnNext,让订阅者在接收到数据前干点事情的操作符

// doOnNext(): run a side effect for every item before the subscriber receives it.
Observable.just(1, 2, 3, 4)
        .doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer value) throws Exception {
                // Side effect executed ahead of the subscriber's accept().
                final String line = "doOnNext 保存 " + value + "成功" + "\n";
                mRxOperatorsText.append(line);
                Log.e(TAG, line);
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer value) throws Exception {
                final String line = "doOnNext :" + value + "\n";
                mRxOperatorsText.append(line);
                Log.e(TAG, line);
            }
        });

filter,过滤,取符合特定规则的数据

/**
 * Demonstrates {@code filter}: only items that satisfy the predicate are
 * passed on. Here the people with an even age are kept.
 */
public void filter() {
    // Ten people with ages 0..9 and matching names.
    List<Person> people = new ArrayList<>();
    for (int index = 0; index < 10; index++) {
        Person person = new Person();
        person.age = index;
        person.name = "name_" + index;
        people.add(person);
    }

    Predicate<Person> hasEvenAge = new Predicate<Person>() {
        @Override
        public boolean test(@NonNull Person candidate) throws Exception {
            return candidate.age % 2 == 0;
        }
    };

    Observable.fromIterable(people)
            .filter(hasEvenAge)
            .subscribe(new Consumer<Person>() {
                @Override
                public void accept(Person kept) throws Exception {
                    LwzLog.d(TAG, "accept(),person: " + kept.toString());
                }
            });
}
结果如下:
08-08 07:50:58.945 11544-11544/com.lwz.rxjavademo D/lwz: MainHelper: accept(),person: name: name_0 age: 0
08-08 07:50:58.945 11544-11544/com.lwz.rxjavademo D/lwz: MainHelper: accept(),person: name: name_2 age: 2
08-08 07:50:58.945 11544-11544/com.lwz.rxjavademo D/lwz: MainHelper: accept(),person: name: name_4 age: 4
08-08 07:50:58.945 11544-11544/com.lwz.rxjavademo D/lwz: MainHelper: accept(),person: name: name_6 age: 6
08-08 07:50:58.945 11544-11544/com.lwz.rxjavademo D/lwz: MainHelper: accept(),person: name: name_8 age: 8


skip,代表跳过多少个数目的事件再开始接收

/**
 * Demonstrates {@code skip}: the given number of leading items is dropped
 * before the subscriber starts receiving.
 */
public void skip() {
    Observable<String> source =
            Observable.just("--", "无名者", "--", "张三", "李四", "王五", "赵六");

    // Drop the first three items and log the rest.
    source.skip(3)
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String item) throws Exception {
                    LwzLog.d(TAG, "string: " + item);
                }
            });
}
结果如下:
08-08 08:04:38.841 3469-3469/com.lwz.rxjavademo D/lwz: MainHelper: string: 张三
08-08 08:04:38.841 3469-3469/com.lwz.rxjavademo D/lwz: MainHelper: string: 李四
08-08 08:04:38.841 3469-3469/com.lwz.rxjavademo D/lwz: MainHelper: string: 王五
08-08 08:04:38.841 3469-3469/com.lwz.rxjavademo D/lwz: MainHelper: string: 赵六

take,用于指定订阅者最多收到多少数据

/**
 * Demonstrates {@code take}: limits how many items the subscriber receives.
 */
public void take() {
    ObservableOnSubscribe<String> names = new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
            final String[] all = {"张三", "李四", "王五", "赵六"};
            for (String name : all) {
                emitter.onNext(name);
                LwzLog.d(TAG, "next: " + name);
            }
            emitter.onComplete();
            LwzLog.d(TAG, "complete");
        }
    };

    // Only the first two items reach the consumer.
    Observable.create(names)
            .take(2)
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String item) throws Exception {
                    LwzLog.d(TAG, "string: " + item);
                }
            });
}
结果如下:
08-08 08:16:07.791 23662-23662/com.lwz.rxjavademo D/lwz: MainHelper: string: 张三
08-08 08:16:07.792 23662-23662/com.lwz.rxjavademo D/lwz: MainHelper: next: 张三
08-08 08:16:07.792 23662-23662/com.lwz.rxjavademo D/lwz: MainHelper: string: 李四
08-08 08:16:07.792 23662-23662/com.lwz.rxjavademo D/lwz: MainHelper: next: 李四
08-08 08:16:07.792 23662-23662/com.lwz.rxjavademo D/lwz: MainHelper: next: 王五
08-08 08:16:07.792 23662-23662/com.lwz.rxjavademo D/lwz: MainHelper: next: 赵六
08-08 08:16:07.792 23662-23662/com.lwz.rxjavademo D/lwz: MainHelper: complete
由此可见,当上游发出的事件数量超过 take 指定的数量时,超出的事件不会再传递给下游,但上游 subscribe() 中后续的 e.onNext() 调用及其后的日志语句仍会被执行

timer,替换java中的handler延时执行语句

/**
 * Demonstrates {@code timer}: runs the consumer once after a two-second
 * delay, as a replacement for a Handler-based delayed call.
 */
public void timer() {
    LwzLog.d(TAG, "start time: " + Util.getNowStrTime());

    // Logs the current time and the name of the thread the callback runs on.
    Consumer<Long> onFired = new Consumer<Long>() {
        @Override
        public void accept(Long tick) throws Exception {
            LwzLog.d(TAG, Util.getNowStrTime() + " " + Thread.currentThread().getName());
        }
    };

    Observable.timer(2, TimeUnit.SECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onFired);
}
如果没有指定线程,timer 默认在 computation 调度器(Schedulers.computation())的线程上执行

interval,间隔执行,相当于timer+handler的定时器

/**
 * Demonstrates {@code interval}: a periodic tick that replaces a
 * timer-plus-Handler construction.
 *
 * <p>NOTE(review): the resulting Disposable is discarded, so nothing in this
 * method ever stops the ticks - confirm that this is intended for the demo.
 */
public void interval() {
    LwzLog.d(TAG, "start time: " + Util.getNowStrTime());

    // Logs the current time and the name of the thread the callback runs on.
    Consumer<Long> onTick = new Consumer<Long>() {
        @Override
        public void accept(Long tick) throws Exception {
            LwzLog.d(TAG, Util.getNowStrTime() + " " + Thread.currentThread().getName());
        }
    };

    // Initial delay of one second, then one tick every two seconds.
    Observable.interval(1, 2, TimeUnit.SECONDS)
            .subscribeOn(Schedulers.newThread())
            // Deliver the ticks on the main thread instead of the timer's own thread.
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onTick);
}


concat,连接操作符,可接受Observable的可变参数,或者Observable的集合

/**
 * Demonstrates {@code concat}: joins several Observables one after another.
 * It accepts either a variable number of Observables or a collection of them.
 */
public void concat() {
    Observable<Integer> head = Observable.just(1, 2, 3);

    // Hand-written second source: emits 4, then completes.
    Observable<Integer> tail = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(4);
            LwzLog.d(TAG, "next: 4");
            emitter.onComplete();
            LwzLog.d(TAG, "complete");
        }
    });

    Observable.concat(head, tail)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer value) throws Exception {
                    LwzLog.d(TAG, "accept(),integer: " + value);
                }
            });
}
结果如下:
08-09 02:40:15.518 12425-12425/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 1
08-09 02:40:15.518 12425-12425/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 2
08-09 02:40:15.518 12425-12425/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 3
08-09 02:40:15.518 12425-12425/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 4
08-09 02:40:15.518 12425-12425/com.lwz.rxjavademo D/lwz: MainHelper: next: 4
08-09 02:40:15.519 12425-12425/com.lwz.rxjavademo D/lwz: MainHelper: complete


distinct,去重操作符

/**
 * Demonstrates {@code distinct}: duplicate items are removed from the stream.
 */
public void distinct() {
    Observable<String> withDuplicates = Observable.just("1", "1", "2", "2", "3", "4", "5");

    withDuplicates.distinct()
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String item) throws Exception {
                    LwzLog.d(TAG, "accept(),value: " + item);
                }
            });
}
结果如下:
08-09 02:48:21.062 26264-26264/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 1
08-09 02:48:21.062 26264-26264/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 2
08-09 02:48:21.062 26264-26264/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 3
08-09 02:48:21.062 26264-26264/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 4
08-09 02:48:21.062 26264-26264/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 5

buffer,将一个事件序列分成若干个事件序列

/**
 * Demonstrates {@code buffer}: splits one stream into several lists.
 */
public void buffer() {
    Consumer<List<Integer>> logBuffer = new Consumer<List<Integer>>() {
        @Override
        public void accept(List<Integer> chunk) throws Exception {
            LwzLog.d(TAG, "accept(),list size: " + chunk.size());
            // Concatenate the numbers of this chunk without a separator.
            StringBuilder joined = new StringBuilder();
            for (int number : chunk) {
                joined.append(number);
            }
            LwzLog.d(TAG, "accept(),integers: " + joined);
        }
    };

    Observable.just(1, 2, 3, 4, 5, 6, 7)
            // First argument: maximum length of one list;
            // second argument: how many items to skip before the next list starts.
            .buffer(4, 3)
            .subscribe(logBuffer);
}
结果如下:
08-09 03:13:53.370 7283-7283/com.lwz.rxjavademo D/lwz: MainHelper: accept(),list size: 4
08-09 03:13:53.370 7283-7283/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integers: 1234
08-09 03:13:53.370 7283-7283/com.lwz.rxjavademo D/lwz: MainHelper: accept(),list size: 4
08-09 03:13:53.370 7283-7283/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integers: 4567
08-09 03:13:53.370 7283-7283/com.lwz.rxjavademo D/lwz: MainHelper: accept(),list size: 1
08-09 03:13:53.370 7283-7283/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integers: 7
可以看出,buffer(count, skip) 每隔 skip 个事件开启一个新的序列,每个序列最多收集 count 个事件(末尾不足 count 个时按实际数量单独成为一列);当 skip 小于 count 时,相邻的序列会有重叠(如上面的 4 和 7)

debounce,去抖动,过滤掉发射速率过快的数据项

/**
 * Demonstrates {@code debounce}: items that are followed too quickly by
 * another item are filtered out.
 */
public void debounce() {
    ObservableOnSubscribe<Integer> bursty = new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            // Pause (in milliseconds) after emitting 1, 2, 3, 4 and 5 respectively.
            final long[] pauseAfterMillis = {300, 600, 200, 500, 800};
            for (int index = 0; index < pauseAfterMillis.length; index++) {
                emitter.onNext(index + 1);
                Thread.sleep(pauseAfterMillis[index]);
            }
            emitter.onComplete();
        }
    };

    Observable.create(bursty)
            // 500 ms debounce window.
            .debounce(500, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer value) throws Exception {
                    LwzLog.d(TAG, "accept,integer: " + value);
                }
            });
}
结果如下:
08-09 08:19:47.111 4087-4141/com.lwz.rxjavademo D/lwz: MainHelper: accept,integer: 2
08-09 08:19:47.912 4087-4141/com.lwz.rxjavademo D/lwz: MainHelper: accept,integer: 4
08-09 08:19:48.413 4087-4141/com.lwz.rxjavademo D/lwz: MainHelper: accept,integer: 5
可以看到,发射后 500ms 内又有新事件到来的数据项(1 和 3)都被过滤掉了;注意间隔恰好等于临界值 500ms 的 4 在本次运行中没有被过滤掉

defer,每次订阅都会创建一个新的Observable,并且如果该Observable没有被订阅,就不会生成新的Observable

/**
 * Demonstrates {@code defer}: the Observable is produced by the supplied
 * {@code Callable} rather than being built up front, so a new one is created
 * for each subscription and none is created without a subscriber.
 */
public void defer() {
    Callable<ObservableSource<Integer>> factory = new Callable<ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> call() throws Exception {
            return Observable.just(1, 2, 3);
        }
    };

    Observer<Integer> observer = new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {
            LwzLog.d(TAG, "onSubscribe()");
        }

        @Override
        public void onNext(@NonNull Integer value) {
            LwzLog.d(TAG, "onNext(),integer: " + value);
        }

        @Override
        public void onError(@NonNull Throwable error) {
            LwzLog.d(TAG, "onError()");
        }

        @Override
        public void onComplete() {
            LwzLog.d(TAG, "onComplete()");
        }
    };

    Observable.defer(factory).subscribe(observer);
}
结果如下:
08-09 08:38:29.185 29206-29206/com.lwz.rxjavademo D/lwz: MainHelper: onSubscribe()
08-09 08:38:29.185 29206-29206/com.lwz.rxjavademo D/lwz: MainHelper: onNext(),integer: 1
08-09 08:38:29.185 29206-29206/com.lwz.rxjavademo D/lwz: MainHelper: onNext(),integer: 2
08-09 08:38:29.185 29206-29206/com.lwz.rxjavademo D/lwz: MainHelper: onNext(),integer: 3
08-09 08:38:29.185 29206-29206/com.lwz.rxjavademo D/lwz: MainHelper: onComplete()


last,取最后一个值

/**
 * Demonstrates {@code last}: takes only the final item of the stream.
 */
public void last() {
    Observable<Integer> source = Observable.just(1, 2, 3);

    // The argument is the default value, used when the source has no item.
    source.last(4)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer value) throws Exception {
                    LwzLog.d(TAG, "accept(),integer: " + value);
                }
            });
}
结果如下
08-09 09:05:21.017 17921-17921/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 3

merge,将多个Observable合起来,接受可变参数,也支持使用迭代器集合

/**
 * Demonstrates {@code merge}: combines several Observables into one. It
 * accepts a variable number of sources and also supports an iterable of them.
 */
public void merge() {
    Observable<Integer> first = Observable.just(1, 2, 3);
    Observable<Integer> second = Observable.just(4, 5, 6);

    Observable.merge(first, second)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer value) throws Exception {
                    LwzLog.d(TAG, "accept(),integer: " + value);
                }
            });
}
结果如下:
08-09 09:26:15.183 6480-6480/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 1
08-09 09:26:15.183 6480-6480/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 2
08-09 09:26:15.183 6480-6480/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 3
08-09 09:26:15.183 6480-6480/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 4
08-09 09:26:15.183 6480-6480/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 5
08-09 09:26:15.183 6480-6480/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 6

reduce,就是一次用一个方法处理一个值,可以有一个seed作为初始值

/**
 * Demonstrates {@code reduce}: folds the stream into one value by applying
 * the accumulator to the running result and the next item. Only the final
 * result reaches the consumer.
 */
public void reduce() {
    // Accumulator: logs both operands and returns their sum.
    BiFunction<Integer, Integer, Integer> sum = new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(@NonNull Integer accumulated, @NonNull Integer next) throws Exception {
            LwzLog.d(TAG, "apply(),integer: " + accumulated + " integer2: " + next);
            return accumulated + next;
        }
    };

    Observable.just(1, 2, 3)
            .reduce(sum)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer total) throws Exception {
                    LwzLog.d(TAG, "accept(),integer: " + total);
                }
            });
}
结果如下:
08-09 09:48:09.444 775-775/com.lwz.rxjavademo D/lwz: MainHelper: apply(),integer: 1 integer2: 2
08-09 09:48:09.444 775-775/com.lwz.rxjavademo D/lwz: MainHelper: apply(),integer: 3 integer2: 3
08-09 09:48:09.444 775-775/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 6

scan,和reduce差不多,区别在于reduce()只输出结果,而scan()会将过程中每一个结果输出

/**
 * Demonstrates {@code scan}: similar to {@code reduce}, except that
 * {@code reduce} outputs only the final result while {@code scan} also
 * outputs every intermediate result.
 */
public void scan() {
    // Accumulator: logs both operands and returns their sum.
    BiFunction<Integer, Integer, Integer> sum = new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(@NonNull Integer accumulated, @NonNull Integer next) throws Exception {
            LwzLog.d(TAG, "apply(),integer: " + accumulated + " integer2: " + next);
            return accumulated + next;
        }
    };

    Observable.just(1, 2, 3, 4)
            .scan(sum)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer runningTotal) throws Exception {
                    LwzLog.d(TAG, "accept(),integer: " + runningTotal);
                }
            });
}
结果如下:
08-09 10:21:07.639 29203-29203/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 1
08-09 10:21:07.639 29203-29203/com.lwz.rxjavademo D/lwz: MainHelper: apply(),integer: 1 integer2: 2
08-09 10:21:07.639 29203-29203/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 3
08-09 10:21:07.639 29203-29203/com.lwz.rxjavademo D/lwz: MainHelper: apply(),integer: 3 integer2: 3
08-09 10:21:07.639 29203-29203/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 6
08-09 10:21:07.639 29203-29203/com.lwz.rxjavademo D/lwz: MainHelper: apply(),integer: 6 integer2: 4
08-09 10:21:07.639 29203-29203/com.lwz.rxjavademo D/lwz: MainHelper: accept(),integer: 10


window,按照时间划分窗口,将数据发送给不同的Observable

/**
 * Demonstrates {@code window}: splits the stream into time windows and hands
 * the items of each window to a separate Observable.
 */
public void window() {
    // Receives one Observable per window and subscribes to it to log its items.
    Consumer<Observable<Long>> onWindow = new Consumer<Observable<Long>>() {
        @Override
        public void accept(Observable<Long> window) throws Exception {
            LwzLog.d(TAG, "分割...");
            window.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Long>() {
                        @Override
                        public void accept(Long value) throws Exception {
                            LwzLog.d(TAG, "accept(),value: " + value);
                        }
                    });
        }
    };

    // One tick per second, 15 ticks in total, grouped into three-second windows.
    Observable.interval(1, TimeUnit.SECONDS)
            .take(15)
            .window(3, TimeUnit.SECONDS)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onWindow);
}
结果如下:
08-09 10:43:34.015 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: 分割...
08-09 10:43:35.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 0
08-09 10:43:36.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 1
08-09 10:43:37.017 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: 分割...
08-09 10:43:37.017 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 2
08-09 10:43:38.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 3
08-09 10:43:39.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 4
08-09 10:43:40.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: 分割...
08-09 10:43:40.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 5
08-09 10:43:41.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 6
08-09 10:43:42.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 7
08-09 10:43:43.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: 分割...
08-09 10:43:43.017 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 8
08-09 10:43:44.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 9
08-09 10:43:45.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 10
08-09 10:43:46.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: 分割...
08-09 10:43:46.017 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 11
08-09 10:43:47.016 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 12
08-09 10:43:48.017 3839-3839/com.lwz.rxjavademo D/lwz: MainHelper: accept(),value: 13
按时间来分割

PublishSubject的使用,PublishSubject与observable的不同在于,PublishSubject可以在创建的时候不指定数据流(无参数create()方法),并且onNext() 会通知每个观察者

/**
 * Demonstrates {@code PublishSubject}. Unlike a plain Observable it can be
 * created without a predefined data flow (no-argument {@code create()}), and
 * each {@code onNext()} call notifies every observer subscribed at that time.
 */
public void publishSubject() {
    PublishSubject<Integer> subject = PublishSubject.create();

    Observer<Integer> firstObserver = new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {
            LwzLog.d(TAG, "onSubscribe()--1,isDisposed:" + disposable.isDisposed());
        }

        @Override
        public void onNext(@NonNull Integer value) {
            LwzLog.d(TAG, "onNext()--1,integer: " + value);
        }

        @Override
        public void onError(@NonNull Throwable error) {
            LwzLog.d(TAG, "onError()--1");
        }

        @Override
        public void onComplete() {
            LwzLog.d(TAG, "onComplete()--1");
        }
    };

    Observer<Integer> secondObserver = new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {
            LwzLog.d(TAG, "onSubscribe()-2,isDisposed:" + disposable.isDisposed());
        }

        @Override
        public void onNext(@NonNull Integer value) {
            LwzLog.d(TAG, "onNext()--2,integer: " + value);
        }

        @Override
        public void onError(@NonNull Throwable error) {
            LwzLog.d(TAG, "onError()--2");
        }

        @Override
        public void onComplete() {
            LwzLog.d(TAG, "onComplete()--2");
        }
    };

    // First subscription, followed by the first batch of items.
    subject.subscribe(firstObserver);
    for (int value = 1; value <= 3; value++) {
        subject.onNext(value);
    }

    // Second subscription, followed by the second batch and completion.
    subject.subscribe(secondObserver);
    for (int value = 4; value <= 6; value++) {
        subject.onNext(value);
    }
    subject.onComplete();
}


结果如下:
08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onSubscribe()--1,isDisposed:false
08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 1
08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 2
08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 3
08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onSubscribe()-2,isDisposed:false
08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 4
08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--2,integer: 4
08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 5
08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--2,integer: 5
08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 6
08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--2,integer: 6
08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onComplete()--1
08-11 02:02:43.165 5377-5377/com.lwz.rxjavademo D/lwz: MainHelper: onComplete()--2

AsyncSubject的使用

/**
 * Demonstrates {@code AsyncSubject}. Until {@code onComplete()} is called
 * nothing is delivered to the observers; after {@code onComplete()} only the
 * last {@code onNext()} value takes effect.
 */
public void asyncSubject() {
    AsyncSubject<Integer> subject = AsyncSubject.create();

    Observer<Integer> firstObserver = new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {
            LwzLog.d(TAG, "onSubscribe()--1,isDisposed:" + disposable.isDisposed());
        }

        @Override
        public void onNext(@NonNull Integer value) {
            LwzLog.d(TAG, "onNext()--1,integer: " + value);
        }

        @Override
        public void onError(@NonNull Throwable error) {
            LwzLog.d(TAG, "onError()--1");
        }

        @Override
        public void onComplete() {
            LwzLog.d(TAG, "onComplete()--1");
        }
    };

    Observer<Integer> secondObserver = new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {
            LwzLog.d(TAG, "onSubscribe()-2,isDisposed:" + disposable.isDisposed());
        }

        @Override
        public void onNext(@NonNull Integer value) {
            LwzLog.d(TAG, "onNext()--2,integer: " + value);
        }

        @Override
        public void onError(@NonNull Throwable error) {
            LwzLog.d(TAG, "onError()--2");
        }

        @Override
        public void onComplete() {
            LwzLog.d(TAG, "onComplete()--2");
        }
    };

    // First subscription, followed by the first batch of items.
    subject.subscribe(firstObserver);
    for (int value = 1; value <= 3; value++) {
        subject.onNext(value);
    }

    // Second subscription, followed by the second batch and completion.
    subject.subscribe(secondObserver);
    for (int value = 4; value <= 6; value++) {
        subject.onNext(value);
    }
    subject.onComplete();
}
结果如下:
08-11 02:15:40.962 23575-23575/com.lwz.rxjavademo D/lwz: MainHelper: onSubscribe()--1,isDisposed:false
08-11 02:15:40.962 23575-23575/com.lwz.rxjavademo D/lwz: MainHelper: onSubscribe()-2,isDisposed:false
08-11 02:15:40.963 23575-23575/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 6
08-11 02:15:40.963 23575-23575/com.lwz.rxjavademo D/lwz: MainHelper: onComplete()--1
08-11 02:15:40.963 23575-23575/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--2,integer: 6
08-11 02:15:40.963 23575-23575/com.lwz.rxjavademo D/lwz: MainHelper: onComplete()--2

BehaviorSubject的使用,BehaviorSubject 的最后一次 onNext() 操作会被缓存,然后在 subscribe() 后立刻推给新注册的 Observer

/**
 * Demonstrates {@code BehaviorSubject}. The most recent {@code onNext()}
 * value is cached and pushed to a newly registered observer right after it
 * subscribes.
 */
public void behaviorSubject() {
    BehaviorSubject<Integer> subject = BehaviorSubject.create();

    Observer<Integer> firstObserver = new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {
            LwzLog.d(TAG, "onSubscribe()--1,isDisposed:" + disposable.isDisposed());
        }

        @Override
        public void onNext(@NonNull Integer value) {
            LwzLog.d(TAG, "onNext()--1,integer: " + value);
        }

        @Override
        public void onError(@NonNull Throwable error) {
            LwzLog.d(TAG, "onError()--1");
        }

        @Override
        public void onComplete() {
            LwzLog.d(TAG, "onComplete()--1");
        }
    };

    Observer<Integer> secondObserver = new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {
            LwzLog.d(TAG, "onSubscribe()-2,isDisposed:" + disposable.isDisposed());
        }

        @Override
        public void onNext(@NonNull Integer value) {
            LwzLog.d(TAG, "onNext()--2,integer: " + value);
        }

        @Override
        public void onError(@NonNull Throwable error) {
            LwzLog.d(TAG, "onError()--2");
        }

        @Override
        public void onComplete() {
            LwzLog.d(TAG, "onComplete()--2");
        }
    };

    // First subscription, followed by the first batch of items.
    subject.subscribe(firstObserver);
    for (int value = 1; value <= 3; value++) {
        subject.onNext(value);
    }

    // Second subscription, followed by the second batch and completion.
    subject.subscribe(secondObserver);
    for (int value = 4; value <= 6; value++) {
        subject.onNext(value);
    }
    subject.onComplete();
}
结果如下:
08-11 02:23:59.065 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onSubscribe()--1,isDisposed:false
08-11 02:23:59.065 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 1
08-11 02:23:59.065 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 2
08-11 02:23:59.065 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 3
08-11 02:23:59.065 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onSubscribe()-2,isDisposed:false
08-11 02:23:59.065 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--2,integer: 3
08-11 02:23:59.065 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 4
08-11 02:23:59.065 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--2,integer: 4
08-11 02:23:59.065 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 5
08-11 02:23:59.066 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--2,integer: 5
08-11 02:23:59.066 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--1,integer: 6
08-11 02:23:59.066 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onNext()--2,integer: 6
08-11 02:23:59.066 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onComplete()--1
08-11 02:23:59.066 5589-5589/com.lwz.rxjavademo D/lwz: MainHelper: onComplete()--2

Completable的使用,只关心结果,也就是说 Completable 是没有 onNext 的,要么成功要么出错,不关心过程,在 subscribe 后的某个时间点返回结果

/**
 * Demonstrates {@code Completable}: only the outcome matters. There is no
 * {@code onNext}; the observer is told either that the work completed or that
 * it failed.
 */
public void completable() {
    CompletableObserver observer = new CompletableObserver() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {
            LwzLog.d(TAG, "onSubscribe(), isDisposed: " + disposable.isDisposed());
        }

        @Override
        public void onComplete() {
            LwzLog.d(TAG, "onComplete()");
        }

        @Override
        public void onError(@NonNull Throwable error) {
            LwzLog.d(TAG, "onError(),e: " + error.getMessage());
        }
    };

    // The check itself lives in MyCompletable; here a 16-year-old is submitted.
    Person candidate = new Person(16, "张三");
    Completable.create(new MyCompletable(candidate)).subscribe(observer);
}

/**
 * Source for the {@code Completable} demo: validates a {@link Person} and
 * reports the outcome through the emitter.
 *
 * <p>A missing person or an age below 18 is reported through {@code onError};
 * otherwise {@code onComplete} is signalled.
 */
class MyCompletable implements CompletableOnSubscribe {

    /** Person to validate; may be {@code null}, which is reported as an error. */
    private final Person person;

    /**
     * @param person the person to validate when the Completable is subscribed to
     */
    public MyCompletable(Person person) {
        this.person = person;
    }

    @Override
    public void subscribe(@NonNull CompletableEmitter e) throws Exception {
        // A specific standard exception type is used instead of a raw Throwable;
        // the messages are unchanged, so observers reading getMessage() see the same text.
        if (person == null) {
            e.onError(new IllegalArgumentException("数据为空"));
            return;
        }
        if (person.age < 18) {
            e.onError(new IllegalArgumentException("未成年"));
            return;
        }
        e.onComplete();
    }
}
结果如下:
08-11 02:40:14.540 7266-7266/com.lwz.rxjavademo D/lwz: MainHelper: onSubscribe(), isDisposed: false
08-11 02:40:14.540 7266-7266/com.lwz.rxjavademo D/lwz: MainHelper: onError(),e: 未成年
可以修改年龄分别验证

Flowable,专门用于解决背压问题

/**
 * Demonstrates {@code Flowable}, the source type intended for dealing with
 * backpressure. Four integers are emitted and folded into a sum that starts
 * from the seed 100.
 */
public void flowable() {
    FlowableOnSubscribe<Integer> numbers = new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<Integer> emitter) throws Exception {
            for (int value = 1; value <= 4; value++) {
                emitter.onNext(value);
                LwzLog.d(TAG, "next " + value);
            }
            emitter.onComplete();
            LwzLog.d(TAG, "complete");
        }
    };
    Flowable<Integer> source = Flowable.create(numbers, BackpressureStrategy.BUFFER);

    // Accumulator: logs both operands and returns their sum.
    BiFunction<Integer, Integer, Integer> sum = new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(@NonNull Integer accumulated, @NonNull Integer next) throws Exception {
            LwzLog.d(TAG, "integer: " + accumulated + " integer2: " + next);
            return accumulated + next;
        }
    };

    source.reduce(100, sum)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer total) throws Exception {
                    LwzLog.d(TAG, "integer: " + total);
                }
            });
}
结果如下:
08-11 02:59:13.794 8047-8047/com.lwz.rxjavademo D/lwz: MainHelper: integer: 100 integer2: 1
08-11 02:59:13.794 8047-8047/com.lwz.rxjavademo D/lwz: MainHelper: next 1
08-11 02:59:13.795 8047-8047/com.lwz.rxjavademo D/lwz: MainHelper: integer: 101 integer2: 2
08-11 02:59:13.795 8047-8047/com.lwz.rxjavademo D/lwz: MainHelper: next 2
08-11 02:59:13.795 8047-8047/com.lwz.rxjavademo D/lwz: MainHelper: integer: 103 integer2: 3
08-11 02:59:13.795 8047-8047/com.lwz.rxjavademo D/lwz: MainHelper: next 3
08-11 02:59:13.795 8047-8047/com.lwz.rxjavademo D/lwz: MainHelper: integer: 106 integer2: 4
08-11 02:59:13.795 8047-8047/com.lwz.rxjavademo D/lwz: MainHelper: next 4
08-11 02:59:13.795 8047-8047/com.lwz.rxjavademo D/lwz: MainHelper: integer: 110
08-11 02:59:13.795 8047-8047/com.lwz.rxjavademo D/lwz: MainHelper: complete
好文链接:http://www.jianshu.com/p/0cd258eecf60点击打开链接
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  RxJava