您的位置:首页 > 编程语言 > Java开发

【RxJava】学习(二)---操作符

2016-11-17 14:48 330 查看
由官网对Rx的定义“An API for asynchronous programming with observable streams”可以看到,Rx本身的特点在于他的流式操作,在于他的众多的好用的操作符。本文只是挑选了一些常用的,功能明显的操作符进行说明,如果想了解全部的操作符,请参考官网原文或者中文文档

我们先列出要说到的这些RxJava的操作符[1]

创建操作

Create — 通过调用观察者的方法从头创建一个Observable

Defer — 在观察者订阅之前不创建这个Observable,为每一个观察者创建一个新的Observable

From — 将其它的对象或数据结构转换为Observable

Just — 将对象或者对象集合转换为一个会发射这些对象的Observable

变换操作

Map — 映射,通过对序列的每一项都应用一个函数变换Observable发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项

FlatMap — 扁平映射,将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,可以认为是一个将嵌套的数据结构展开的过程

过滤操作

Filter — 过滤,过滤掉没有通过谓词测试的数据项,只发射通过测试的

条件和布尔操作

Amb — 给定多个Observable,只让第一个发射数据的Observable发射全部数据

辅助操作符

SubscribeOn — 指定Observable应该在哪个调度程序上执行

ObserveOn — 指定观察者观察Observable的调度程序(工作线程)

逐个来举例说明

Create

Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> observer) {
try {
if (!observer.isUnsubscribed())
//判断是否有订阅者,如果有就发送数据,
//如果没有就不发送,以达到节约的目的
{
observer.onNext("Hello World!");
observer.onNext("Hi  World!");
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
} ).subscribe(new Subscriber<String>() {
@Override
public void onNext(String s) {
System.out.println("Next: " + s);
}

@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});


输出:

Next: Hello World!

Next: Hi World!

Sequence complete.

整体上用的是链式的写法,使得对数据的操作很清晰:Observerable.create.subscribe

2. Defer [2]

string0 = "Hello,Rx-Java";
Observable<String> observable = Observable.defer(new Func0<Observable<String>>() {
@Override
public Observable<String> call() {
return Observable.just(string0);
}
});
string0 = "hi,Rx-Java";
observable.subscribe(new Action1<String>()
//由于用的是defer
//在这个时候会去新生成一个Observable对象
{
@Override
public void call(String s) {
Log.i(TAG, s);
}
});


输出是:

hi,Rx-Java

会在订阅发生时新生成一个Observable对象,所以string0用到的 就是最新的状态 “hi,Rx-Java”

3. From

在RxJava中,from操作符可以转换Future、Iterable和数组。对于Iterable和数组,产生的Observable会发射Iterable或数组的每一项数据(不懂没有关系,我们看下例子)

Integer[] items = { 0, 1, 2, 3, 4, 5 };
Observable myObservable = Observable.from(items);

myObservable.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer item) {
System.out.println(item);
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable error) {
System.out.println("Error encountered: " + error.getMessage());
}
},
new Action0() {
@Override
public void call() {
System.out.println("Sequence complete");
}
}
);


输出:

0

1

2

3

4

5

Sequence complete

我们可以看到,在myObservable.subscribe时候的参数是3个,分别处理了Items里面元素的不同情况(就是说明从开始Observable.from的地方传入参数是数组,到最后输出的时候是一个Observerable),这表明,完成了从其它的对象和数据类型(这里用的是数组,数组里面可以各种情况的,比如不同的实体类…)到Observerable的转化。

4. Just

接受一至九个参数,返回一个按参数列表顺序发射这些数据的Observable。

Observable.just("A","B","C").subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.i(TAG, "onCompleted");
}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {
Log.i(TAG, s);
}
});


输出:

A

B

C

onCompleted

我们可以看到,ABC的输出并不是因为是数组的遍历,而是每个字母都以参数的形式传给just,每次just只是把参数按照原来的样子传出去。”A”—–>onNext(); “B”—->onNext(); “C”——>onNext();

5. Map

对原始Observable发射的每一项数据应用一个你选择的函数,然后返回一个发射这些结果的Observable。也就是把一个Observable装换为另外一个Observable。

Observable.just("A").map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
return s.hashCode();
}
}).subscribe(new Action1<Integer>()
//在这里我们看到传入的参数已经是Integer,
//而不是最开始的String了
{
@Override
public void call(Integer integer) {

}
});


可以看到这是一对一的转换。

6.FlatMap

将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable。这是一个一对多的转换

Observable.just(test).flatMap(new Func1<Test, Observable<String>>() {
@Override
public Observable<String> call(Test test) {
return Observable.from(test.list);
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, s+"---FlatMap");
}
});


输出是 test.list里面的每一个元素+“—FlatMap”

FlatMap接受的参数是一个旧的Observerable(是Just(test)的返回值)经过变换之后返回的是一个新的Observerable(发射的元素是test.list里面的元素),subscriber想要接受到的是test.list里面的内容。

7. Filter

Observable.just(1, 2, 3, 4, 5)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer item) {
return( item < 4 );
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}

@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});


输出:

Next: 1

Next: 2

Next: 3

Sequence complete.

很简单,对数据做了一个筛选,返回值是true的数据才会被发射出去。

8. Amb

amb操作符的作用是用于比较两个Observable,然后哪一个先发射数据就只是发射这个Observable的数据,抛弃其他的Observable数据,不论他第一个是发射了onError还是onComplete

Observable<Integer> o1 = Observable.range(20, 1).delay(200, TimeUnit.MILLISECONDS);
Observable<Integer> o2 = Observable.range(10, 1).delay(100, TimeUnit.MILLISECONDS);

Observable.amb(o1, o2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i(TAG, "amb==" + integer.toString());
}
});


输出:

amb==10

rang(起始数,个数) 发射的都是整数,并且没有负数,可以看到o2 delay时间短,所以只发射o2

到这里简单常用的操作符已经说完了,如果想要查看全部的其他操作符请到中文官网查看

参考

[1]https://mcxiaoke.gitbooks.io/rxdocs/content/Operators.html

[2]http://liweijieok.github.io/2016/10/26/Android/%E5%BC%80%E6%BA%90%E9%A1%B9%E7%9B%AE%E5%AD%A6%E4%B9%A0/RxJavacmd/#more
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  异步 RxJava