您的位置:首页 > 编程语言 > Java开发

RxJava学习之基本使用

2016-10-30 20:55 225 查看
欢迎访问我的个人独立博客 ittiger.cn,原创文章,未经允许不得随意转载。

RxJava
现在在Android开发中越来越流行,作为一个
Android
开发者我也必须紧跟步伐学习学习
RxJava
,这篇文章就记录了
RxJava`中我认为比较常用的一些场景。

也给大伙推荐篇比较好的
RxJava
文章

* 给 Android 开发者的 RxJava 详解

RxJava基础

大家都知道
JDK
中提供了观察者模式的实现,它主要两个重要元素:

* 被观察者
Observable


* 观察者
Observer


至于
Java
中观察者模式的使用,大家可以自行Google下。

RxJava
中也有两个重要的元素:

* 被观察者(事件源)
Observable


* 观察者(事件订阅者)
Subscriber


因此
RxJava
的设计看起来也有点类似
JDK
中的观察者模式,都有被观察者和观察者。

JDK
观察者模式中当有操作需要时是由被观察者通知观察者来进行更新操作

RxJava
中是由被观察者
Observable
发出事件给观察者
Subscriber
接收,然后观察者
Subscriber
调用
noNext()
进行处理,直到调用
onComplete)()
onError()
结束

Gradle依赖

compile 'io.reactivex:rxjava:1.0.1'
compile 'io.reactivex:rxandroid:1.0.1'


基础方式创建Observable和Subscriber

//使用Observable.create()方式创建一个Observable事件源
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello world");
subscriber.onCompleted();
}
});
//创建一个观察者Subscriber
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d("RxJava" "onComplete");
}

@Override
public void onError(Throwable e) {
Log.d("RxJava" "onError");
}

@Override
public void onNext(String s) {
Log.d("RxJava" s);
}
};
//观察者订阅事件源
observable.subscribe(subscriber);


上面是
RxJava
的基础使用方式,这种方式使用起来和观察者模式还是比较像的,首先创建一个被观察者
Observable
,再创建一个观察者
Subscriber
,然后观察者订阅这个被观察者,一旦订阅之后
Observable
就会执行上面的
call(Subscriber subscriber)
方法(参数里面的
Subscriber
参数就是我们创建的观察者实例),通过该方法我们手动调用
Subscriber
方法的
onNext和onCompleted
方法。这里有个要注意的就是我们必须自己手动调用
onNext和onCompleted
方法,否则不会自己执行。

简化创建Observable和Subscriber

上面提到的
Observable
Subscriber
创建方式是
RxJava
中最基本的方式,但是上面的方式使用起来还是感觉有点繁琐,必须按部就班的来。

RxJava
中也提供了简单的创建方式,比如:
Observable.just()
方式创建
Observable
,完整是示例如下:

//创建一个只发出一个事件就结束的对象
Observable<String> observable = Observable.just("hello world");
//创建一个只关心onNext处理的subscriber
Action1<String> onNextAction = new Action1<String>() {
@Override
public void call(String s) {
Log.d("RxJava" s);
}
};
//此方法有重载版本,可以传递处理onError,onComplete的Action
observable.subscribe(onNextAction);


上面的方法中使用
Observable.just()
方法可以快速的创建一个发送
hello world
事件的事件源,而如果我们只关心观察者对事件的处理,而不关心事件处理结束和事件发生错误时的处理,我们则可以创建
Action1
对象来替代
Subscriber
进行事件处理。

上面
observable.subscribe(onNextAction)
一旦订阅事件,就会自动的执行
Action1
中的
call
方法,该方法的作用等同于
Subscriber
中的
onNext
方法的作用,至于为什么一旦订阅就会自动执行
call
方法,而前面的一个例子中我们却需要手动调用
Subscriber
中的
onNext
方法,这个原因大家可以去源码实现中找答案,我就不介绍了。

当然如果你除了处理事件外,也需要对事件结束和事件错误时进行其他处理,则可以使用
observable.subscribe(Action1)
另一个重载方法
observable.subscribe(Action1,Action1,Action1)
分别接收对应
onNext
,
onCompleted
,
onError
.

Action1
中的
call
方法只能接收一个参数,
RxJava
中也提供了很多其他的几种
Action
,从
Action0
Action9
分表代表其
call
方法能接收0个参数到9个参数,另外还有一个
ActionN
其能接收N个参数。

RxJava
其实是支持链式写法 的,所以上面的写法可以适用如下的方式实现:

Observable.just("hello world")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d("RxJava" s);
}
});


上面介绍了
RxJava
的基本使用,下面接着介绍
RxJava
中一些比较常用的功能函数。

事件变换map

这个
map
是干什么用的呢?我举个例子:比如说我们有一个
Observable
对象,这个对象发送的事件是一串用户密码字符串,但是
Subscriber
进行处理的时候需要的是一个包含加密后的密码串,这个时候我们就可以使用
map
操作符将一个
Observable
对象发送的事件修改成另一个事件,下面的代码里通过map将
hello world
转换成其
hashCode


Observable.just("hello world")
//map操作将一个事件变换为另外一个事件,只会影响到当前的subscriber
//此处将Observable的String事件转换成Integer事件,所以事件是可用改变的
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
return s.hashCode();
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer hashCode) {
//这里的值就是hello world的hash值
Log.d("RxJava" s);
}
});


例子中
map(Func1<source, target>)
操作符通过
Func1<source, target>
类将
source
事件转换修改成
target
事件

通过上面的介绍可以看出map是用来变换修改
Observable
所发出的事件

Observable变换flatMap

map
是用来变换
Observable
所发出的事件,而
flatMap
就更强大,它可以将
Observable
转换成一个全新的
Observable
,依旧上例子代码

Observable.just("laohu")
//将一个事件数据变换为另一种事件输出的Observable
.flatMap(new Func1<String, Observable<User>>() {
@Override
public Observable<User> call(String s) {
return Observable.just(new User(s));
}
})
.subscribe(new Action1<User>() {
@Override
public void call(User user) {
Log.d("RxJava" user.getName());
}
});


上面的例子中通过
flatMap
操作符将一个发送
laohu
事件的
Observable
转换成一个发送
User
对象的
Observable
,该变化是完全生成一个新的
Observable


Observable.from

在我们开发过程中经常需要对一个数组或是一个集合数据进行处理,比如我通过班级编号可以查出这个班的所有学生,查询出来后需要打印出每个学生的名字,那么我们使用上面介绍的方式该怎么做呢?

List<Student> students= ...
Observable.just(students)
.subscribe(new Action1<List<Student>>() {
@Override
public void call(List<Student> students) {
for(User user : users) {
Log.d("RxJava" students.getName());
}
}
});


上面的做法中,很明显我们是在
Subscriber
中对列表进行循环打印出每个学生的名字,这种方法是不是感觉很多余,我既然都拿到列表了我干嘛还要多次一举使用
RxJava
去进行循环处理。这时使用
Observable.from
就可以解决这个问题,该方法可以将集合或数组进行循环处理,每次发送一个事件元素给
Subscriber
进行处理,在
Subscriber
中只需要针对单个
Student
进行姓名打印就可以了,改进之后的写法如下:

List<Student> students= ...
Observable.from(students)
.subscribe(new Action1<Student>() {
@Override
public void call(Student student) {
Log.d("RxJava" student.getName());
}
});


改进之后的写法看着是不是很简单,
from
配合
flatMap
可以实现很多很复杂的操作,后面我们再举例

事件过滤filter

public class User  {
private String name;
private int age;

public User(String name, int age) {
this.name = name;
this.age = age;
}
...
}

String[] array = {"张三", "李四", "王麻子", "赵六"};
Observable.from(array)
//将一个事件数据变换为另一种事件输出的Observable
.flatMap(new Func1<String, Observable<User>>() {
@Override
public Observable<User> call(String s) {
int age = 20;
if(s.length() > 2) {//名字长度大于2的年龄设为10
age = 10;
}
return Observable.just(new User(s, age));
}
})
//将age <= 10的事件过滤掉
.filter(new Func1<User, Boolean>() {
@Override
public Boolean call(User user) {
return user.getAge() > 10;
}
})
.subscribe(new Action1<User>() {
@Override
public void call(User user) {
Log.d("RxJava" user.getName());
}
});


上面的例子中,会将年龄小于等于10的用户数据过滤掉不进行处理,因此在
filter
call
方法中进行判断,年龄小于等于10的数据返回false即可将该数据过滤掉。

选取指定数量数据take()

上面的例子中如果我只想对符合条件的前两个数据进行处理该怎么做呢,这时我们可以使用
take()
操作符来实现

String[] array = {"张三", "李四", "王麻子", "赵六"};
Observable.from(array)
//将一个事件数据变换为另一种事件输出的Observable
.flatMap(new Func1<String, Observable<User>>() {
@Override
public Observable<User> call(String s) {
int age = 20;
if(s.length() > 2) {
age = 10;
}
return Observable.just(new User(s, age));
}
})
//将age <= 10的事件过滤掉
.filter(new Func1<User, Boolean>() {
@Override
public Boolean call(User user) {
return user.getAge() > 10;
}
})
//只取符合条件的前两个结果
.take(2)
.subscribe(new Action1<User>() {
@Override
public void call(User user) {
Log.d("RxJava" user.getName());
}
});


doOnNext

上面的例子中如果我们取到前两个符合条件的数据进行处理之前,我们要先进行缓存处理,这个时候我们就可以使用
doOnNext
操作符进行处理,实现如下:

String[] array = {"张三", "李四", "王麻子", "赵六"};
Observable.from(array)
//将一个事件数据变换为另一种事件输出的Observable
.flatMap(new Func1<String, Observable<User>>() {
@Override
public Observable<User> call(String s) {
int age = 20;
if(s.length() > 2) {
age = 10;
}
return Observable.just(new User(s, age));
}
})
//将age <= 10的事件过滤掉
.filter(new Func1<User, Boolean>() {
@Override
public Boolean call(User user) {
return user.getAge() > 10;
}
})
//只取符合条件的前两个结果
.take(2)
//在subscribe执行之前进行额外的操作,比如将数据保存到磁盘上
.doOnNext(new Action1<User>() {
@Override
public void call(User user) {
save(user);
}
})
.subscribe(new Action1<User>() {
@Override
public void call(User user) {
Log.d("RxJava" user.getName());
}
});


线程调度

比如我们有一个同步请求网络数据的服务,在
Android
中使用
RxJava
进行处理该怎么做呢?我们可以使用
subscribeOn()
指定被观察者(事件)的运行线程,使用
observeOn()
指定观察者(订阅者)的运行线程。

Observable.just("查询条件")
.subscribeOn(Schedulers.io())//在子线程中进行查询操作
.flatMap(new Func1<String, Observable<Result>>() {
@Override
public Observable<Result> call(String whereClause) {
Result result = queryResult(whereClause);
return Observable.just(result);
}
})
.observeOn(AndroidSchedulers.mainThread())//在UI线程中处理结果
.subscribe(new Action1<Result>() {
@Override
public void call(final Result result) {
Log.d("RxJava" result.toString());
}
});


上面这些是我目前使用
RxJava
用到的一些功能函数和操作符,后面学习了其他操作符之后,我会用另外的一篇博文记录下来。

write by 老胡

2016年10月30日

本文章发表在 独立博客 ittiger.cn个人CSDN博客
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐