您的位置:首页 > 编程语言 > Java开发

RxJava使用入门

2017-02-13 16:39 330 查看
一、基本概念

Observable: 发射源,在观察者模式中成为“被观察者”

Observer: 接收源,“观察者”可接收Observable Subject发射的数据

Subject:Subject是一个比较特殊的对象,既可以是发射源也可以是接收源

Subscriber:“订阅者”,实现了Observer接口,比Observer多了一个最重要的方法unsubscribe();用于取消订阅,建议使用Subscriber作为接受者

Subscription:Observable调用subscribe()方法返回的对象,同样可以取消订阅。

Action0:RxJava中的一个接口,它只有一个无参call()方法,且无返回值,同样还有Action1、….Action9等,Action1封装了含有一个参数的call()方法,call(T t),以此类推;

Func0:与Action0类似,也有call()方法,但有返回值

二、使用过程简介:

创建一个数据发射源,用于发射数据:

Observable<String> sender = Observable.create(new Observable.OnSubscrible<String>(){
@Override
public void call(Subscriber<? super String> subscriber){
subscriber.onNext("hello world")
}
})


创建一个数据接收源

Observer<String> receiver = new Observer<String>() {

@Override
public void onCompleted() {

//数据接收完成时调用
}

@Override
public void onError(Throwable e) {

//发生错误调用
}

@Override
public void onNext(String s) {

//正常接收数据调用
System.out.print(s);  //将接收到来自sender的问候"Hi,Weavey!"
}
};


然后通过sender.subscriber(receiver) 把被观察者和观察者关联起来,send发射“hello world”将会被reveiver接收到,这是最简单的使用过程,Rxjava让异步,观察者模式变得简单起来

三、Rxjava在项目中的使用

Observable的创建:

1、使用create():

normalObserable = Observable.create(new Observable.OnSubcribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("create1"); //发射一个"create1"的String
subscriber.onNext("create2"); //发射一个"create2"的String
subscriber.onCompleted();//发射完成,这种方法需要手动调用onCompleted,才会回调Observer的onCompleted方法
}})


2、使用just();将为你创建一个Observable并自动为你调用onNext()发射数据:

justObservable = Observable.just("just1","just2");//依次发送"just1","just2";


3、使用from(),遍历集合,发送每个item:

List<String> list = new ArrayList();
list.add("from1");
list.add("from2");
list.add("from3");
fromObservable = Observabel.from(list);//遍历list,每次发送一个,---just方法也可以传list但是发送的是整个list对象,from发送的是list的一个item


4、使用defer(),有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable

deferObservable = Observable.defer(new Func0<Observable<String>()>{
@Override
//注意此处的call方法没有Subscriber参数
public Observable<String> call() {
return Observable.just("deferObservable");
})


5、使用interval(),创建一个按固定时间间隔发射整数序列的Observable,可用作定时器

intervalObservable = Observable.interval(1,TimeUnit.SECONDS);//每隔一秒发送一次


6、使用range() ,创建一个发射特定整数序列的Observable,第一个参数为起始值,第二个为发送的个数,如果为0,则不发送,负数抛异常

rangeObservable = Observable.range(10,5)//将发送整数10,11,12,13,14


7、使用Timer(),创建一个Observable,它在一个给定的延迟后发射一个特殊的值,等同于handler.postDelay();

timeObservable = Observabel.timer(3,TimeUnit.SECONDS); //三秒后发射一个值


8、使用repeat(),创建一个重复发射特定数据的Observable

repeatObservable = Observable.just("repeatObservable").repeat(3);//重复发射三次


Observer的创建

mObserver = new Observer<String>() {
@Override
public void onCompleted() {
LogUtil.log("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
LogUtil.log(s);
}};


有了Observable和Observer,就好办了,任取一个已创建的Observable和Observer关联上,即形成一个RxJava的例子

justObservable.subscribe(mObserver);


mObserver的onNext()方法会依次接收来自justObservable的数据”just1”,”just2”,另外,如果你不在意数据是否接收完或者是否出现错误,将不需要onCompleted和onError方法,可使用Action1,subscribe()支持将Action1作为参数传入,Rxjava将会调用他的call方法来接收数据;

justObservable.subscribe(new Action1<String>(){
@Override
public void call(String s) {

LogUtil.log(s);
}})


以上就是RxJava的简单用法,RxJava之所以受用户喜爱,是因为随着逻辑的复杂,需求的更改,代码依然可以保持极强的阅读性。

可以使用filter添加限制条件
Observable.create(new Observable.OnSubscribe<List<User>>() {
@Override
public void call(Subscriber<? super List<Us
aa29
er>> subscriber) {
List<User> userList = null;
···
//从数据库获取用户表数据并赋给userList
···
subscriber.onNext(userList);
}
}).flatMap(new Func1<List<User>, Observable<User>>() {
@Override
public Observable<User> call(List<User> users) {
return Observable.from(users);
}
}).filter(new Func1<User, Boolean>() {
@Override
public Boolean call(User user) {
return user.getName().equals("小明");
}
}).subscribe(new Action1<User>() {
@Override
public void call(User user) {
//拿到谜之小明的数据
}
});
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  RxJava Android