您的位置:首页 > 编程语言 > Java开发

RxJava操作符(四)Combining

2015-11-04 23:40 615 查看
RxJava操作符(四)Combining

原文链接 http://blog.chinaunix.net/uid-20771867-id-5197584.html
上一篇文章中我们了解了如何对数据进行过滤,在这篇文章里我们来了解一下如何组装多个Observable的数据。

一、CombineLatest

CombineLatest操作符可以将2~9个Observable发射的数据组装起来然后再发射出来。不过还有两个前提:

1.所有的Observable都发射过数据。

2.满足条件1的时候任何一个Observable发射一个数据,就将所有Observable最新发射的数据按照提供的函数组装起来发射出去。




Rxjava实现CombineLast操作符可以让我们直接将组装的Observable作为参数传值,也可以将所有的Observable装在一个List里面穿进去。

下面我们创建几个Observable对象,分别直接传值和使用List传值将其组装起来

点击(此处)折叠或打开

private Observable<Integer> createObserver(int index) {

return Observable.create(new Observable.OnSubscribe<Integer>() {

@Override

public void call(Subscriber<? super Integer> subscriber) {

for (int i = 1; i < 6; i++) {

subscriber.onNext(i * index);

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}).subscribeOn(Schedulers.newThread());

}

private Observable<Integer> combineLatestObserver() {

return Observable.combineLatest(createObserver(1), createObserver(2), (num1, num2) -> {

log("left:" + num1 + "
right:" + num2);

return num1 + num2;

});

}

List<Observable<Integer>> list = new ArrayList<>();

private Observable<Integer> combineListObserver() {

for (int i = 1; i < 5; i++) {

list.add(createObserver(i));

}

return Observable.combineLatest(list, args -> {

int temp = 0;

for (Object i : args) {

log(i);

temp += (Integer) i;

}

return temp;

});

}

对其进行订阅

点击(此处)折叠或打开

mLButton.setText("combineList");

mLButton.setOnClickListener(e -> combineListObserver().subscribe(i -> log("combineList:" + i)));

mRButton.setText("CombineLatest");

mRButton.setOnClickListener(e -> combineLatestObserver().subscribe(i -> log("CombineLatest:" + i)));

运行结果如下





二、Join

Join操作符根据时间窗口来组合两个Observable发射的数据,每个Observable都有一个自己的时间窗口,要组合的时候,在这个时间窗口内的数据都有有效的,可以拿来组合。

Rxjava还实现了groupJoin,基本和join相同,只是最后组合函数的参数不同。





使用join操作符需要4个参数,分别是:

1.源Observable所要组合的目标Observable

2.一个函数,就收从源Observable发射来的数据,并返回一个Observable,这个Observable的生命周期决定了源Observable发射出来数据的有效期

3.一个函数,就收从目标Observable发射来的数据,并返回一个Observable,这个Observable的生命周期决定了目标Observable发射出来数据的有效期

4.一个函数,接收从源Observable和目标Observable发射来的数据,并返回最终组合完的数据。



下面我们使用join和groupJoin操作符分别来组合两个Observable对象

点击(此处)折叠或打开

private Observable<String> createObserver() {

return Observable.create(new Observable.OnSubscribe<String>() {

@Override

public void call(Subscriber<? super String> subscriber) {

for (int i = 1; i < 5; i++) {

subscriber.onNext("Right-" + i);

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}).subscribeOn(Schedulers.newThread());

}

private Observable<String> joinObserver() {

return Observable.just("Left-").join(createObserver(),

integer -> Observable.timer(3000, TimeUnit.MILLISECONDS),

integer -> Observable.timer(2000, TimeUnit.MILLISECONDS),

(i, j) -> i + j

);

}

private Observable<Observable<String>> groupJoinObserver() {

return Observable.just("Left-")

.groupJoin(createObserver(),

s -> Observable.timer(3000, TimeUnit.MILLISECONDS),

s -> Observable.timer(2000, TimeUnit.MILLISECONDS),

(s, stringObservable) -> stringObservable.map(str -> s + str));

}

分别进行订阅

点击(此处)折叠或打开

mLButton.setText("join");

mLButton.setOnClickListener(e -> joinObserver().subscribe(i -> log("join:" + i + "\n")));

mRButton.setText("groupJoin");

mRButton.setOnClickListener(e -> groupJoinObserver().subscribe(i -> i.subscribe(j -> log("groupJoin:" + j + "\n"))));

运行结果如下,可以看到虽然目标Observable发射了4个数据,但是源Observable只发射了一个有效期为3秒的数据,所以最终的组合结果也只有3个数据。



三、Merege

Merge操作符将多个Observable发射的数据整合起来发射,就如同是一个Observable发射的数据一样。但是其发射的数据有可能是交错的,如果想要没有交错,可以使用concat操作符。

当某一个Observable发出onError的时候,merge的过程会被停止并将错误分发给Subscriber,如果不想让错误终止merge的过程,可以使用MeregeDelayError操作符,会将错误在merge结束后再分发。





下面我们分别使用merge和mergeDelayError操作符来进行merge操作。

点击(此处)折叠或打开

private Observable<Integer> mergeObserver() {

return Observable.merge(Observable.just(1, 2, 3), Observable.just(4, 5, 6));

}

private Observable<Integer> mergeDelayErrorObserver() {

return Observable.mergeDelayError(Observable.create(new Observable.OnSubscribe<Integer>() {

@Override

public void call(Subscriber<? super Integer> subscriber) {

for (int i = 0; i < 5; i++) {

if (i == 3) {

subscriber.onError(new Throwable("error"));

}

subscriber.onNext(i);

}

}

}), Observable.create(new Observable.OnSubscribe<Integer>() {

@Override

public void call(Subscriber<? super Integer> subscriber) {

for (int i = 0; i < 5; i++) {

subscriber.onNext(5 + i);

}

subscriber.onCompleted();

}

}));

}

分别对其订阅



点击(此处)折叠或打开

mLButton.setText("Merge");

mLButton.setOnClickListener(e -> mergeObserver().subscribe(i -> log("Merge:" + i)));

mRButton.setText("mergeDelayError");

mRButton.setOnClickListener(e -> mergeDelayErrorObserver().subscribe(new Subscriber<Integer>() {

@Override

public void onCompleted() {

log("onCompleted");

}

@Override

public void onError(Throwable e) {

log("mergeDelayError:" + e);

}

@Override

public void onNext(Integer integer) {

log("mergeDelayError:" + integer);

}

}));

运行结果如下。



四、StartWith、Switch

StartWith操作符会在源Observable发射的数据前面插上一些数据。不仅仅只可以插入一些数据,还可以将Iterable和Observable插入进入。如果插入的是Observable,则这个Observable发射的数据会插入到

源Observable发射数据的前面。



switch操作符在Rxjava上的实现为switchOnNext,用来将一个发射多个小Observable的源Observable转化为一个Observable,然后发射这多个小Observable所发射的数据。

需要注意的就是,如果一个小的Observable正在发射数据的时候,源Observable又发射出一个新的小Observable,则前一个Observable发射的数据会被抛弃,直接发射新

的小Observable所发射的数据。可以看示意图中的***圆圈就被丢弃了。



下面使用startWith和switchOnNext操作符来组合两个Observable

点击(此处)折叠或打开

private Observable<Integer> startWithObserver() {

return Observable.just(1, 2, 3).startWith(-1, 0);

}

private Observable<String> switchObserver() {

return Observable.switchOnNext(Observable.create(

new Observable.OnSubscribe<Observable<String>>() {

@Override

public void call(Subscriber<? super Observable<String>> subscriber) {

for (int i = 1; i < 3; i++) {

subscriber.onNext(createObserver(i));

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

));

}

private Observable<String> createObserver(int index) {

return Observable.create(new Observable.OnSubscribe<String>() {

@Override

public void call(Subscriber<? super String> subscriber) {

for (int i = 1; i < 5; i++) {

subscriber.onNext(index + "-" + i);

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}).subscribeOn(Schedulers.newThread());

}

分别进行订阅

点击(此处)折叠或打开

mLButton.setText("StartWith");

mLButton.setOnClickListener(e -> startWithObserver().subscribe(i -> log("StartWith:" + i)));

mRButton.setText("switch");

mRButton.setOnClickListener(e -> switchObserver().subscribe(i -> log("switch:" + i)));

运行结果如下,可以看到startwith将-1和0插入到前面。使用siwtch的时候第一个小Observable只发射出了两个数据,第二个小Observable就被源Observable发射出来了,所以其接下来的两个数据被丢弃。



五、Zip

Zip操作符将多个Observable发射的数据按顺序组合起来,每个数据只能组合一次,而且都是有序的。最终组合的数据的数量由发射数据最少的Observable来决定。

Rxjava实现了zip和zipWith两个操作符。



下面我们使用zip和zipWith操作符来组合数据



点击(此处)折叠或打开

private Observable<String> zipWithObserver() {

return createObserver(2).zipWith(createObserver(3), (s, s2) -> s + "-" + s2);

}

private Observable<String> zipWithIterableObserver() {

return Observable.zip(createObserver(2), createObserver(3), createObserver(4), (s, s2, s3) -> s + "-" + s2 + "-" + s3);

}

private Observable<String> createObserver(int index) {

return Observable.create(new Observable.OnSubscribe<String>() {

@Override

public void call(Subscriber<? super String> subscriber) {

for (int i = 1; i <= index; i++) {

log("emitted:" + index + "-" + i);

subscriber.onNext(index + "-" + i);

try {

Thread.sleep(500);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}).subscribeOn(Schedulers.newThread());

}

分别进行订阅

点击(此处)折叠或打开

mLButton.setText("zipWith");

mLButton.setOnClickListener(e -> zipWithObserver().subscribe(i -> log("zipWith:" + i + "\n")));

mRButton.setText("zip");

mRButton.setOnClickListener(e -> zipWithIterableObserver().subscribe(i -> log("zip:" + i + "\n")));

运行结果如下,可以看到,最终都发射出了两个数据,因为createObserver(2)所创建的Observable只会发射两个数据,所以其他Observable多余发射的数据都被丢弃了。





Combning的操作符就到这了,本文中的源码见https://github.com/Chaoba/RxJavaDemo
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: