您的位置:首页 > 移动开发 > Android开发

Android RxJava 2.x入门例子详解(五)

2017-09-08 11:52 573 查看

前言

上一节我们讲解了线程调度器,这个时候我们可能需要考虑一个问题。上下游工作在不同的线程,这时候收发数据无需等待,当上游发事件的速度太快, 下游处理事件的速度太慢会发生什么事情?这也就是所谓的背压(Backpressure)问题。

我们先看一个例子:

Observable.create(new ObservableOnSubscribe<Integer>() {

@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; ; i++) {    //无限循环发事件
Log.d(TAG, "Observable发出:"+i);
e.onNext(i);
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(1000);//一秒处理一次数据
Log.d(TAG, "" + integer);
}
});


没错,我们的内存会爆掉,最后OOM,那要怎么解决这个问题呢?

不用担心,RxJava2.x已经为你想到这个问题。

Flowable和Subscriber的使用

Flowable<Integer> flowable=Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
Log.d(TAG, "flowable:B1");
e.onNext(1);//向下游(观察者)发射内容1
Log.d(TAG, "flowable:B2");
e.onNext(2);
Log.d(TAG, "flowable:B3");
e.onNext(3);
Log.d(TAG, "flowable:B4");
e.onNext(4);
Log.d(TAG, "flowable:B5");
e.onNext(5);
e.onComplete();
}
}, BackpressureStrategy.ERROR);//增加了一个参数

Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(Long.MAX_VALUE);  //注意这句代码,这里告诉上游,下游的处理能力
}

@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}

@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};

flowable.subscribe(subscriber);


我们稍微修改一下代码:

Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
Log.d(TAG, "flowable:1");
e.onNext(1);//向下游(观察者)发射内容1
Log.d(TAG, "flowable:2");
e.onNext(2);
Log.d(TAG, "flowable:3");
e.onNext(3);
Log.d(TAG, "flowable:4");
e.onNext(4);
Log.d(TAG, "flowable:5");
e.onNext(5);
e.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(3);  //这里告诉上游,下游的处理能力为3
}

@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}

@Override
public void onError(Throwable t) {
//Log.d(TAG, "onError");
Log.w(TAG, "onError: ", t);//上游多发事件报警告
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});


运行结果报异常警告,为什么呢?因为s.request(3)告诉上游处理能力为3,上游多发事件,则直接报警告。

而多发事件的处理是由BackpressureStrategy.ERROR决定

该参数还有以下选项:

BackpressureStrategy.DROP(多发则丢弃不处理)

BackpressureStrategy.BUFFER(多发则先放到缓存)

BackpressureStrategy.LATEST(多发则只保留最后的事件)

再看一个例子:

Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
for (int i = 0;i<100 ; i++) {
Log.d(TAG, "flowable:"+i);
e.onNext(i);
}
}
}, BackpressureStrategy.BUFFER)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
mSubscription=s;
Log.d(TAG, "onSubscribe");
s.request(10);  //这里告诉上游,下游的处理能力为10
}

@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}

@Override
public void onError(Throwable t) {
//Log.d(TAG, "onError");
Log.w(TAG, "onError: ", t);
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});


我们使用BackpressureStrategy.BUFFER把多发的事件保存到缓存里,运行代码没有警告异常。

这时候我们可以通过外部控制mSubscription.request,例如点击按钮调用mSubscription.request(10);

系统会继续运行输出onNext。

上面都是同一线程下的结果,如果是不同线程会发生什么?

Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
for (int i = 0; ; i++) {
Log.d(TAG, "flowable:"+i);
e.onNext(i);
}
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(10);  //这里告诉上游,下游的处理能力为10,实际上并没什么卵用
}

@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}

@Override
public void onError(Throwable t) {
//Log.d(TAG, "onError");
Log.w(TAG, "onError: ", t);
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});


运行代码,下游虽然通过s.request(10)告诉上游处理能力为10,但是上游并不理会,而继续不断发送事件。这是为什么呢?是不是上游还不知道下游的处理能力?那有没有办法呢?

答案就是FlowableEmitter的requested()

再看看下面的例子:

Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
for (int i = 0; i<300 ; i++) {
//Log.d(TAG, "flowable:"+i);
Log.d(TAG,"requested:"+e.requested());
e.onNext(i);
}
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(3);  //异步情况,上下游不同线程,这里告诉上游处理能力为3,实际是128+3,128是由Flowable初始化
}

@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
//mSubscription.request(1);
}

@Override
public void onError(Throwable t) {
//Log.d(TAG, "onError");
Log.w(TAG, "onError: ", t);
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});


运行结果发现e.requested()获取到的结果是128,每调用一次onNext减1。

为什么是128?是因为异步情况下,上下游不同线程,这里告诉上游处理能力为3,实际是128+3,128是由Flowable初始化。

如果是同一线程则得到结果一样哦,这里不展示(同学们自己尝试吧)

既然有e.requested(),那么好办了

我们稍微修改一下程序:

Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
boolean flag;

for (int i = 0; ; i++) {    //无限循环发事件
Log.d(TAG,"requested:"+e.requested());
flag = false;
while (e.requested()==0){
if (!flag){
Log.d(TAG,"下游不能处理,暂不能发送");
flag=true;
}
}
e.onNext(i);
}
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
private Subscription mSubscription;
@Override
public void onSubscribe(Subscription s) {
mSubscription=s;
Log.d(TAG, "onSubscribe");
s.request(3);  //异步情况,上下游不同线程,这里告诉上游处理能力为3,实际是128+3,128是由Flowable初始化
}

@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
try {
Thread.sleep(1000);
mSubscription.request(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@Override
public void onError(Throwable t) {
//Log.d(TAG, "onError");
Log.w(TAG, "onError: ", t);
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});


运行程序发现,首次上游调用了128次事件,随后下游处理96次后,上游再次调用96次事件,这样一直循环下去。可见程序很有规律,没有出现OOM。

为什么会下游处理96次后上游会继续调用事件?

这是因为Flowable内部实现了s.request请求,也就是说下游处理了一部分事件后,Flowable内部自动调用s.request(95),自动增加处理能力。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息