Android RxJava 2.x入门例子详解(五)
2017-09-08 11:52
573 查看
前言
上一节我们讲解了线程调度器,这个时候我们可能需要考虑一个问题。上下游工作在不同的线程,这时候收发数据无需等待,当上游发事件的速度太快, 下游处理事件的速度太慢会发生什么事情?这也就是所谓的背压(Backpressure)问题。我们先看一个例子:
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { for (int i = 0; ; i++) { //无限循环发事件 Log.d(TAG, "Observable发出:"+i); e.onNext(i); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Thread.sleep(1000);//一秒处理一次数据 Log.d(TAG, "" + integer); } });
没错,我们的内存会爆掉,最后OOM,那要怎么解决这个问题呢?
不用担心,RxJava2.x已经为你想到这个问题。
Flowable和Subscriber的使用
Flowable<Integer> flowable=Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception { Log.d(TAG, "flowable:B1"); e.onNext(1);//向下游(观察者)发射内容1 Log.d(TAG, "flowable:B2"); e.onNext(2); Log.d(TAG, "flowable:B3"); e.onNext(3); Log.d(TAG, "flowable:B4"); e.onNext(4); Log.d(TAG, "flowable:B5"); e.onNext(5); e.onComplete(); } }, BackpressureStrategy.ERROR);//增加了一个参数 Subscriber<Integer> subscriber = new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { Log.d(TAG, "onSubscribe"); s.request(Long.MAX_VALUE); //注意这句代码,这里告诉上游,下游的处理能力 } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext: " + integer); } @Override public void onError(Throwable t) { Log.w(TAG, "onError: ", t); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }; flowable.subscribe(subscriber);
我们稍微修改一下代码:
Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception { Log.d(TAG, "flowable:1"); e.onNext(1);//向下游(观察者)发射内容1 Log.d(TAG, "flowable:2"); e.onNext(2); Log.d(TAG, "flowable:3"); e.onNext(3); Log.d(TAG, "flowable:4"); e.onNext(4); Log.d(TAG, "flowable:5"); e.onNext(5); e.onComplete(); } }, BackpressureStrategy.ERROR) .subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { Log.d(TAG, "onSubscribe"); s.request(3); //这里告诉上游,下游的处理能力为3 } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext: " + integer); } @Override public void onError(Throwable t) { //Log.d(TAG, "onError"); Log.w(TAG, "onError: ", t);//上游多发事件报警告 } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });
运行结果报异常警告,为什么呢?因为s.request(3)告诉上游处理能力为3,上游多发事件,则直接报警告。
而多发事件的处理是由BackpressureStrategy.ERROR决定
该参数还有以下选项:
BackpressureStrategy.DROP(多发则丢弃不处理)
BackpressureStrategy.BUFFER(多发则先放到缓存)
BackpressureStrategy.LATEST(多发则只保留最后的事件)
再看一个例子:
Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception { for (int i = 0;i<100 ; i++) { Log.d(TAG, "flowable:"+i); e.onNext(i); } } }, BackpressureStrategy.BUFFER) .subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { mSubscription=s; Log.d(TAG, "onSubscribe"); s.request(10); //这里告诉上游,下游的处理能力为10 } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext: " + integer); } @Override public void onError(Throwable t) { //Log.d(TAG, "onError"); Log.w(TAG, "onError: ", t); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });
我们使用BackpressureStrategy.BUFFER把多发的事件保存到缓存里,运行代码没有警告异常。
这时候我们可以通过外部控制mSubscription.request,例如点击按钮调用mSubscription.request(10);
系统会继续运行输出onNext。
上面都是同一线程下的结果,如果是不同线程会发生什么?
Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception { for (int i = 0; ; i++) { Log.d(TAG, "flowable:"+i); e.onNext(i); } } }, BackpressureStrategy.ERROR) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { Log.d(TAG, "onSubscribe"); s.request(10); //这里告诉上游,下游的处理能力为10,实际上并没什么卵用 } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext: " + integer); } @Override public void onError(Throwable t) { //Log.d(TAG, "onError"); Log.w(TAG, "onError: ", t); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });
运行代码,下游虽然通过s.request(10)告诉上游处理能力为10,但是上游并不理会,而继续不断发送事件。这是为什么呢?是不是上游还不知道下游的处理能力?那有没有办法呢?
答案就是FlowableEmitter的requested()
再看看下面的例子:
Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception { for (int i = 0; i<300 ; i++) { //Log.d(TAG, "flowable:"+i); Log.d(TAG,"requested:"+e.requested()); e.onNext(i); } } }, BackpressureStrategy.ERROR) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { Log.d(TAG, "onSubscribe"); s.request(3); //异步情况,上下游不同线程,这里告诉上游处理能力为3,实际是128+3,128是由Flowable初始化 } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext: " + integer); //mSubscription.request(1); } @Override public void onError(Throwable t) { //Log.d(TAG, "onError"); Log.w(TAG, "onError: ", t); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });
运行结果发现e.requested()获取到的结果是128,每调用一次onNext减1。
为什么是128?是因为异步情况下,上下游不同线程,这里告诉上游处理能力为3,实际是128+3,128是由Flowable初始化。
如果是同一线程则得到结果一样哦,这里不展示(同学们自己尝试吧)
既然有e.requested(),那么好办了
我们稍微修改一下程序:
Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception { boolean flag; for (int i = 0; ; i++) { //无限循环发事件 Log.d(TAG,"requested:"+e.requested()); flag = false; while (e.requested()==0){ if (!flag){ Log.d(TAG,"下游不能处理,暂不能发送"); flag=true; } } e.onNext(i); } } }, BackpressureStrategy.ERROR) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Integer>() { private Subscription mSubscription; @Override public void onSubscribe(Subscription s) { mSubscription=s; Log.d(TAG, "onSubscribe"); s.request(3); //异步情况,上下游不同线程,这里告诉上游处理能力为3,实际是128+3,128是由Flowable初始化 } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext: " + integer); try { Thread.sleep(1000); mSubscription.request(1); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void onError(Throwable t) { //Log.d(TAG, "onError"); Log.w(TAG, "onError: ", t); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });
运行程序发现,首次上游调用了128次事件,随后下游处理96次后,上游再次调用96次事件,这样一直循环下去。可见程序很有规律,没有出现OOM。
为什么会下游处理96次后上游会继续调用事件?
这是因为Flowable内部实现了s.request请求,也就是说下游处理了一部分事件后,Flowable内部自动调用s.request(95),自动增加处理能力。
相关文章推荐
- Android RxJava 2.x入门例子详解(一)
- Android RxJava 2.x入门例子详解(三)
- Android RxJava 2.x入门例子详解(四)
- Android RxJava 2.x入门例子详解(二)
- Android开发入门之View类详解及其小例子
- 【Android】RxJava 入门详解
- Android 入门开发指南之四 -- Hello,Android例子(上)
- 给 Android 开发者的 RxJava 详解
- 读 给 Android 开发者的 RxJava 详解 笔记
- Android中关于JNI 的学习(零)简单的例子,简单地入门
- 给 Android 开发者的 RxJava 详解
- PhoneGap: Android平台入门例子(Hello World)
- Android RxJava操作符详解 系列:功能性操作符
- Android Loader详解四:回调及完整例子
- android编程开发入门实战例子–hello word
- 给 Android 开发者的 RxJava 详解
- Android基础入门教程——2.3.1 TextView(文本框)详解
- Android 开发者的 RxJava 详解
- 给 Android 开发者的 RxJava 详解
- Android 开发者的 RxJava 详解--简书