您的位置:首页 > 编程语言 > Java开发

关于RxJava的简单用法(四)

2017-01-11 10:16 369 查看

关于Subject

关于Subject,官方文档的解释是这样的:Subject可以看成是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。从官方解释中,我提取出三个要点:

它可以充当Observable;
它可以充当Observer;
它是Observable和Observer之间的桥梁;
接下来对这三个要点解释一下,但在解释之前,要先介绍一下Subject的种类, Subject是一个抽象类,不能通过new来实例化Subject,所以Subject有四个实现类,分别为AsyncSubjectBehaviorSubjectPublishSubjectReplaySubject,每个实现类都有特定的“技能”,下面结合代码来介绍一下它们各自的“技能”。注意,所有的实现类都由
create()
方法实例化,无需new,所有的实现类调用
onCompleted()
onError()
,它的Observer将不再接收数据;

Subject的分类解析

AsyncSubject
Observer会接收AsyncSubject的
`onComplete()
之前的最后一个数据,如果因异常而终止,AsyncSubject将不会释放任何数据,但是会向Observer传递一个异常通知。示例代码如下:

AsyncSubject<String> asyncSubject = AsyncSubject.create();
asyncSubject.onNext("asyncSubject1");
asyncSubject.onNext("asyncSubject2");
asyncSubject.onNext("asyncSubject3");
asyncSubject.onCompleted();
asyncSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {

LogUtil.log("asyncSubject onCompleted");  //输出 asyncSubject onCompleted
}

@Override
public void onError(Throwable e) {

LogUtil.log("asyncSubject onError");  //不输出(异常才会输出)
}

@Override
public void onNext(String s) {

LogUtil.log("asyncSubject:"+s);  //输出asyncSubject:asyncSubject3
}
});

以上代码,Observer只会接收asyncSubject的onCompleted()被调用前的最后一个数据,即“asyncSubject3”,如果不调用onCompleted(),Subscriber将不接收任何数据。

BehaviorSubject
Observer会接收到BehaviorSubject被订阅之前的最后一个数据,再接收其他发射过来的数据,如果BehaviorSubject被订阅之前没有发送任何数据,则会发送一个默认数据。(注意跟AsyncSubject的区别,AsyncSubject要手动调用onCompleted(),且它的Observer会接收到onCompleted()前发送的最后一个数据,之后不会再接收数据,而BehaviorSubject不需手动调用onCompleted(),它的Observer接收的是BehaviorSubject被订阅前发送的最后一个数据,两个的分界点不一样,且之后还会继续接收数据。)示例代码如下:

BehaviorSubject<String> behaviorSubject = BehaviorSubject.create("default");
behaviorSubject.onNext("behaviorSubject1");
behaviorSubject.onNext("behaviorSubject2");
behaviorSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {

LogUtil.log("behaviorSubject:complete");
}

@Override
public void onError(Throwable e) {

LogUtil.log("behaviorSubject:error");
}

@Override
public void onNext(String s) {

LogUtil.log("behaviorSubject:"+s);
}
});

behaviorSubject.onNext("behaviorSubject3");
behaviorSubject.onNext("behaviorSubject4");

以上代码,Observer会接收到behaviorSubject2、behaviorSubject3、behaviorSubject4,如果在
behaviorSubject.subscribe()
之前不发送behaviorSubject1、behaviorSubject2,则Observer会先接收到default,再接收behaviorSubject3、behaviorSubject4。

PublishSubject
PublishSubject比较容易理解,相对比其他Subject常用,它的Observer只会接收到PublishSubject被订阅之后发送的数据。示例代码如下:

PublishSubject<String> publishSubject = PublishSubject.create();
publishSubject.onNext("publishSubject1");
publishSubject.onNext("publishSubject2");
publishSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {
LogUtil.log("publishSubject observer1:"+s);
}
});
publishSubject.onNext("publishSubject3");
publishSubject.onNext("publishSubject4");

以上代码,Observer只会接收到"behaviorSubject3"、"behaviorSubject4"。

ReplaySubject
ReplaySubject会发射所有数据给观察者,无论它们是何时订阅的。也有其它版本的ReplaySubject,在重放缓存增长到一定大小的时候或过了一段时间后会丢弃旧的数据。示例代码如下:

ReplaySubject<String>replaySubject = ReplaySubject.create(); //创建默认初始缓存容量大小为16的ReplaySubject,当数据条目超过16会重新分配内存空间,使用这种方式,不论ReplaySubject何时被订阅,Observer都能接收到数据
//replaySubject = ReplaySubject.create(100);//创建指定初始缓存容量大小为100的ReplaySubject
//replaySubject = ReplaySubject.createWithSize(2);//只缓存订阅前最后发送的2条数据
//replaySubject=ReplaySubject.createWithTime(1,TimeUnit.SECONDS,Schedulers.computation());  //replaySubject被订阅前的前1秒内发送的数据才能被接收
replaySubject.onNext("replaySubject:pre1");
replaySubject.onNext("replaySubject:pre2");
replaySubject.onNext("replaySubject:pre3");
replaySubject.subscribe(new Action1<String>() {
@Override
public void call(String s) {
LogUtil.log("replaySubject:" + s);
}
});
replaySubject.onNext("replaySubject:after1");
replaySubject.onNext("replaySubject:after2");

以上代码,由于情况比较多,注释也已解释的相当清楚,就不对输出结果一一表述了,有疑问的自行copy代码去测试一下。至此,四种Subject类型已经介绍完毕,但是需要注意,如果你把 Subject 当作一个 Subscriber 使用,不要从多个线程中调用它的onNext方法(包括其它的on系列方法),这可能导致同时(非顺序)调用,这会违反Observable协议,给Subject的结果增加了不确定性。要避免此类问题,官方提出了“串行化”,你可以将
Subject 转换为一个 SerializedSubject ,类似于这样:

SerializedSubject<String, Integer> ser = new SerializedSubject(publishSubject);


要点解答

接下来,我们继续前面提出的问题,为什么说Subject既可充当Observable,又可充当Observer,是它们两个之间的桥梁呢?经过前面的例子,也许有些人已经大概理解了,不理解的且听我细细道来。首先,从理论上讲,Subject继承了Observable,又实现了Observer接口,所以说它既是Observable又是Observer,完全合理。从实际应用上讲,Subject也能实现Observable和Observer相同的功能,口说无凭,我们还是通过代码来证实比较有说服力。

创建Observable并发射数据:

Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {

subscriber.onNext("I'm Observable");
subscriber.onCompleted();
}
});

用Subject实现为:

PublishSubject<String> publishSubject = PublishSubject.create();
publishSubject.onNext("as Observable");
publishSubject.onCompleted();


创建Observer订阅Observable并接收数据:

mObservable.subscribe(new Observer<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {

//接收数据
}
});

用Subject实现为:

publishSub
9a7d
ject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {

}
});


也许有人会问,不是说Subject也可以作为Observer,不能把Subject当作Observer传入subscribe()中吗?回答是:当然可以!就象这样:

PublishSubject<String> publishSubject = PublishSubject.create();
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {

subscriber.onNext("as Observer");
subscriber.onCompleted();
}
}).subscribe(publishSubject);

有没有发现问题?publishSubject没有重写
onNext()
方法啊,在哪接收的数据?这就是前面说的“桥梁”的问题了,尽管把Subject作为Observer传入
subscribe()
,但接收数据还是要通过Observer来接收,借用Subject来连接Observable和Observer,整体代码如下:

PublishSubject<String> publishSubject = PublishSubject.create();
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {

subscriber.onNext("as Bridge");
subscriber.onCompleted();
}
}).subscribe(publishSubject);

publishSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {

LogUtil.log("subject:"+s); //接收到 as Bridge
}
});

没错,这很桥梁!

转载http://www.jianshu.com/p/240f1c8ebf9d
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: