您的位置:首页 > 编程语言 > Java开发

RxJava 入门(三)-- 操作符简介

2017-02-28 16:38 381 查看
在 RxJava 中,如果把整个 事件流看成 是工厂的流水线,Observable 就是原料,Observer 就是我们的产品经理。产品如何交到我们的产品经理手上?其中重要的就是操作工人(Operator 操作符),它负责在 Observable 发出的事件 和 Observer 的响应之间做一些处理。

操作符分类

Creating Observables

Transforming Observables

Filtering Observables

Combining Observables(合并)

Error Handling Operators

Observable Utility Operators(辅助类)

Conditional and Boolean Operators

Mathematical and Aggregate Operators(算数)

Connectable Observable Operators(背压)

Operators to Convert Observables(连接)

自定义操作符

1. Observable的创建 – 创建性操作符

1.1 使用create( ),最基本的创建方式:

static <T> Observable<T> create(Observable.OnSubscribe<T> f)

Observable<String> observable = Observable.create(new OnSubscribe<String>() {

@Override
public void call(Subscriber<? super String> subscriber) {

subscriber.onNext("create1"); // 发射一个"create1"的String
subscriber.onNext("create2"); // 发射一个"create2"的String
subscriber.onCompleted();
// 发射完成,这种方法需要手动调用onCompleted,才会回调Observer的onCompleted方法
}

});


1.2 使用just( ),将为你创建一个Observable并自动为你调用onNext( )发射数据:

justObservable = Observable.just("just1","just2");//依次发送"just1"和"just2"


1.3 使用from( ),遍历集合,发送每个item

List<String> list = new ArrayList<>();
list.add("from1");
list.add("from2");
list.add("from3");
fromObservable = Observable.from(list);  //遍历list 每次发送一个
/** 注意,just()方法也可以传list,但是发送的是整个list对象,而from()发送的是list的一个item** /


1.4 使用defer( ),有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable

deferObservable = Observable.defer(new Func0<Observable<String>>() {
@Override
//注意此处的call方法没有Subscriber参数
public Observable<String> call() {
return Observable.just("deferObservable");
}});


下面这个例子应该更好理解:

Observable<Long> now = Observable.defer(() ->
Observable.just(System.currentTimeMillis()));

now.subscribe(System.out::println);
Thread.sleep(1000);
now.subscribe(System.out::println);


结果:

//用just ,则两次发送时间一样,^-^
1431444107854
1431444108858


1.5 使用interval( ),创建一个按固定时间间隔发射整数序列的Observable,可用作定时器

intervalObservable = Observable.interval(1, TimeUnit.SECONDS);//每隔一秒发送一次


另一种理解:

创建一个无限的计时序列,每隔一段时间发射一个数字,从 0 开始

Observable<Long> values = Observable.interval(1000, TimeUnit.MILLISECONDS);
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
System.in.read();


结果:

Received: 0
Received: 1
Received: 2
Received: 3


如果我们不调用 unsubscribe 的话,这个序列是不会停止的。

上面的代码在最后有个 System.in.read(); 阻塞语句,这个语句是有必要的

不然的话,程序不会打印任何内容就退出了。原因是我们的操作不是阻塞的:我们创建了一个每隔一段时间就发射数据的 Observable,然后我们注册了一个 Subscriber 来打印收到的数据。这两个操作都是非阻塞的,而 发射数据的计时器是运行在另外一个线程的,但是这个线程不会阻止 JVM 结束当前的程序,所以 如果没有 System.in.read(); 这个阻塞操作,还没发射数据则程序就已经结束运行了

1.6 使用range( ),创建一个发射特定整数序列的Observable,第一个参数为起始值,第二个为发送的个数,如果为0则不发送,负数则抛异常

rangeObservable = Observable.range(10, 5);//将发送整数10,11,12,13,14


1.7 使用timer( ),两个重载函数

重点内容创建一个Observable,它在一个给定的延迟后发射一个特殊的值,等同于Android中Handler的postDelay( )方法

timeObservable = Observable.timer(3, TimeUnit.SECONDS);  //3秒后发射一个值


创建了一个 Observable, 该 Observable 等待 1S,然后发射数据 0 ,然后就结束了

Observable<Long> values = Observable.timer(1, TimeUnit.SECONDS);
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);


输出结果:

Received: 0
Completed


另外一个示例是,先等待一段时间,然后开始按照间隔的时间一直发射数据:

Observable<Long> values = Observable.timer(2, 1, TimeUnit.SECONDS);
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);


输出结果:

Received: 0
Received: 1
Received: 2


上面的示例,先等待 2秒,然后每隔一秒开始发射数据

1.8 使用repeat( ),创建一个重复发射特定数据的Observable:

repeatObservable = Observable.just("repeatObservable").repeat(3);//重复发射3次


2. Observer的创建

// 2.创建观察者Observer
Observer<Student> observer = new Observer<Student>() {

@Override
public void onCompleted() {
System.out.println("onCompleted");
}

@Override
public void onError(Throwable arg0) {
System.out.println("onError");
}

@Override
public void onNext(Student arg0) {
System.out.println("onNext = " + arg0);
}
};


3. 关联Observable 和 Observer 形成 RxJava

有了Observable和Obsever,我们就可以随便玩了,任取一个已创建的Observable和Observer关联上,即形成一个RxJava的例子

// 3.被观察者订阅观察者
observable
// 建议在这修改数据
.map(new Func1<String, String>() {
// 第一个参数决定 call方法类型,第二个参数决定返回值类型
@Override
public String call(String arg0) {
return arg0 + "汤圆";
}
})
.subscribe(observer);


observer
onNext
方法将会依次收到来自
observable
的数据”just1”、”just2”,另外,如果你不在意数据是否接收完或者是否出现错误,即不需要
Observer
onCompleted()
onError()
方法,可使用
Action1
subscribe()
支持将
Action1
作为参数传入,
RxJava
将会调用它的call方法来接收数据,代码如下:

observable.subscribe(new Action1<String>() {
@Override
public void call(String s) {

LogUtil.log(s);
}});


4. talk is cheaper without code

4.1 获取SD卡中所有以.jpg结尾的文件

public void getFiles() {
String basePath = Environment.getExternalStorageDirectory().getPath();
File rootFile = new File(basePath);

Observable.just(rootFile)
.flatMap(new Func1<File, Observable<File>>() {
@Override
public Observable<File> call(File file) {
return listFiles(file);
}
})
.filter(new Func1<File, Boolean>() {
@Override
public Boolean call(File file) {
return file.getName().endsWith(".jpg");
}
})
.map(new Func1<File, String>() {
@Override
public String call(File file) {
return file.getName();
}
})
.toList()
.subscribe(new Observer<List<String>>() {
@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(List<String> lists) {
Log.e("test", "onNext: size =" + lists.size());
Log.e("test", "onNext: " + lists);
}
});
}
//递归判断是不是文件夹
private Observable<File> listFiles(File file){
if (file.isDirectory()){
return Observable.from(file.listFiles()).flatMap(new Func1<File, Observable<File>>() {
@Override
public Observable<File> call(File file) {
return listFiles(file);
}
});
}else {
return Observable.just(file);
}
}


4.2 从数据库的用户表查找用户数据

Observable.create(new Observable.OnSubscribe<List<User>>() {
@Override
public void call(Subscriber<? super List<User>> subscriber) {
List<User> userList = null;
···
//从数据库获取用户表数据并赋给userList
···
subscriber.onNext(userList);
}
}).subscribe(new Action1<List<User>>() {
@Override
public void call(List<User> users) {

//获取到用户信息列表
}
});


但是,领导突然又不想要所有用户了,只要名字叫“小明”的用户,行吧,领导最大,我改(假设名字唯一):

Observable.create(new Observable.OnSubscribe<List<User>>() {
@Override
public void call(Subscriber<? super List<User>> subscriber) {
List<User> userList = null;
···
//从数据库获取用户表数据并赋给userList
···
subscriber.onNext(userList);
}
}).flatMap(new Func1<List<User>, Observable<User>>() {
@Override
public Observable<User> call(List<User> users) {
return Observable.from(users);
}
}).filter(new Func1<User, Boolean>() {
@Override
public Boolean call(User user) {
return user.getName().equals("小明");
}
}).subscribe(new Action1<User>() {
@Override
public void call(User user) {
//拿到谜之小明的数据
}
});


搞定,这时候领导又说,我不要小明了,我要小明的爸爸的数据,(坑爹啊~~),我继续改:

Observable.create(new Observable.OnSubscribe<List<User>>() {
@Override
public void call(Subscriber<? super List<User>> subscriber) {
List<User> userList = null;
···
//从数据库获取用户表数据并赋给userList
···
subscriber.onNext(userList);
}
}).flatMap(new Func1<List<User>, Observable<User>>() {
@Override
public Observable<User> call(List<User> users) {
return Observable.from(users);
}
}).filter(new Func1<User, Boolean>() {
@Override
public Boolean call(User user) {
return user.getName().equals("小明");
}
}).map(new Func1<User, User>() {
@Override
public User call(User user) {
//根据小明的数据user从数据库查找出小明的父亲user2
return user2;
}
}).subscribe(new Action1<User>() {
@Override
public void call(User user2) {
//拿到谜之小明的爸爸的数据
}
});


RxJava在需求不断变更、逻辑愈加复杂的情况下,依旧可以保持代码简洁、可阅读性强的一面,没有各种回调,也没有谜之缩进!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: