RxAndroid2.0使用概述
2017-03-12 11:50
211 查看
RxJava —— WHAT | WHEN | HOW (基于rxjava2)
1.WHAT
RxJava本质上是一个实现异步操作的库,是扩展的观察者模式。
实现异步还有可以使用AsyncTask、Handler等其他线程类,但是RxJava可以做到简洁。从SD中读取所有图片并添加到图片管理,因为有IO操作,所以使用线程处理。
new Thread() { @Override public void run() { super.run(); for (File folder : folders) { File[] files = folder.listFiles(); for (File file : files) { if (file.getName().endsWith(".png")) { final Bitmap bitmap = getBitmapFromFile(file); getActivity().runOnUiThread(new Runnable() { @Override public void run() { imageManager.add(bitmap); } }); } } } } }.start();
而使用RxJava中可以这样写:
Observable.from(folders) .flatMap(new Func1<File, Observable<File>>() { @Override public Observable<File> call(File file) { return Observable.from(file.listFiles()); } }) .filter(new Func1<File, Boolean>() { @Override public Boolean call(File file) { return file.getName().endsWith(".png"); } }) .map(new Func1<File, Bitmap>() { @Override public Bitmap call(File file) { return getBitmapFromFile(file); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Bitmap>() { @Override public void call(Bitmap bitmap) { imageManager.add(bitmap); } });
虽然代码上是增加了,但是逻辑上采用了链式调用,更加清晰。使用了lamda还可以是这样:
Observable.from(folders) .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) }) .filter((Func1) (file) -> { file.getName().endsWith(".png") }) .map((Func1) (file) -> { getBitmapFromFile(file) }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });
RxJava中有四个基本概念:观察者、被观察者/主题、订阅、事件。
Observable/Observer(不支持backpress)
Flowable/FlowableSubscriber(支持backpress)
Single/SingleObserver(只发送一个onSuccess或者onError的通知)
Completable/CompletableObserver(只发送一个onComplete或者onError的通知)
Maybe/MaybeObserver(Single与Completable的结合,只发送一个onSuccess或者onComplete或者onError的通知)
Backpress:当被观察者不停发射数据流,而观察者的响应不及时就会产生MissingBackpressureException,所以需要一定的策略来应对无法这种情况。
2.WHEN
常见的使用场景可以是请求网络、io操作等,具体可以参考的业务逻辑,如:- 检查缓存是否失效取然后选择数据源;
- 需要多个接口返回数据后更新界面;
- 接口的请求参数另一个接口请求返回的数据;
- 界面按钮需要防止连续点击的情况;
- 响应式的界面;
- 复杂的数据变换;
3. HOW
3.1 操作符
creat、just、fromXXX
Observable.create从零开始创建一个Observable;Observable.just的参数即为将要发射的数据,可传入多个基类相同对象;
Observable.fromXXX方法包括:fromArray、fromCallable、fromFuture、fromIterable、fromPublisher;from方法参数类型:数组、Callable接口对象、Future接口对象、Iterable接口对象、Publisher接口对象
Future,Callable都是Java并发框架的接口,Callable 、Future、ThreadPoolExecutor需要一起研究;Publisher接口是rxjava的基础接口。
empty、never、error
empty:创建一个不发射任何数据但是正常终止的Observable,此时会回调onCompletednever:创建一个不发射数据也不终止的Observable
error:创建一个不发射数据以一个错误终止的Observable
range
该操作符创建特定整数序列的Observable,它接受两个参数,一个是范围的起始值,一个是范围的数据的数目。如果你将第二个参数设为0,将导致Observable不发射任何数据timer
timer操作符创建一个在给定的时间段之后返回一个特殊值的Observable。它在延迟一段给定的时间后发射一个简单的数字0 。interval
该操作符按固定的时间间隔发射一个无限递增的整数序列,它接受一个表示时间间隔的参数和一个表示时间单位的参数。repeat
该操作符是重复的发射某个数据序列,并且可以自己设置重复的次数。defer
直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable,该操作符能保证订阅执行时数据源是最新的数据。map
该操作符是对原始Observable发射的每一项数据运用一个函数,然后返回一个发射这些结果的Observable。flatMap
该操作符与map操作符的区别是它将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable。cast
该操作符就是做一些强制类型转换操作的。例如,当我们在页面跳转时数据对象往往是序列化的,当我们在新的页面收到数据后就要强制转换为我们想要的类型。底层调用map操作符。concatMap
该操作符是类似于最简单版本的flatMap,但是它按次序连接而不是合并那些生成的Observables,然后产生自己的数据序列。switchMap
当原始Observable发射一个新的数据(Observable)时,它将取消订阅并停止监视产生执之前那个数据的Observable,只监视当前这一个。filter
该操作符接收一个Predicate参数,可以在其中通过运用自己的判断条件去判断我们要过滤的数据,当数据通过判断条件后返回true表示发射该项数据,否则就不发射。first
只对Observable发射的第一项数据,或者满足某个条件的第一项数据进行处理,则可以使用First操作符。last
该操作符与first意义相反,若我们只对Observable发射的最后一项数据,或者满足某个条件的最后一项数据感兴趣时使用该操作符。publish、refCount
Observable 有 Cold 和 Hot 之分:Hot Observable 无论有没有 Observer 订阅,事件都会发出;Cold Observable 只有 Subscriber 订阅时,才开始执行发射数据流的代码。Observable 的 just、creat、range、fromXXX 等操作符都能生成Cold Observable。
使用 publish 操作符,可以让 Cold Observable 转换成 Hot Observable,它将Observable 转换成 ConnectableObservable,而ConnectableObservable在被订阅后需要调用connect()才会开始发射数据流;ConnectableObservable的refCount操作符可以将Hot Observable转换成 Cold Observable。
3.2 源码分析
订阅
通过查看源码会看到Observable.just调用了Observable.fromXXX方法,而Observable.fromXXX和Observable.create调用了RxJavaPlugins.onAssembly。RxJavaPlugins.onAssembly参数:
ObservableFromCallable<T> extends Observable<T> implements Callable<T> ObservableFromArray<T> extends Observable<T> ObservableFromFuture<T> extends Observable<T> ObservableFromIterable<T> extends Observable<T> ObservableFromPublisher<T> extends Observable<T>
创建Observable对象的from方法返回的就是ObservableFrom这些对象,ObservableFrom对象继承自Observable,实现了subscribeActual方法。
Observable.just("").subscribe(new Observer());
Observable的非静态订阅方法subscribe实际上是调用了subscribeActual(Observer s)。
发射数据
Cold Observable的subscribeActual(Observer s)方法实际上也是数据流的发送方法。在源码中会发现,在Observer订阅当前Cold Observable之后便立即开始了数据发射方法,以fromArray举例:@Override public void subscribeActual(Observer<? super T> s) { FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array); s.onSubscribe(d); if (d.fusionMode) { return; } d.run(); }
先创建一个FromArrayDisposable对象对需要发射的数据流进行封装,在执行订阅s.onSubscribe(d);后立即开始了d.run();数据流的发射操作。
ConnectableObservable是Hot Observable,不会主动发射数据流。
Observable的publish操作符在创建ConnectableObservable是实际上是创建了一个ObservablePublish对象,ObservablePublish继承了抽象类ConnectableObservable, ConnectableObservable也是Observable的子类,ObservablePublish在实现subscribeActual(Observer s)方法的时候只进行了Observer对Observable的订阅,而没有立刻发射数据流。同时实现了ConnectableObservable的connect方法,在connect方法中才开始进行数据流的发送。
这也就是之前提到的,需要在使用publish操作符将Cold Observable转为Hot Observable需要调用connect才能发送数据流。
Hot Observable转为Cold Observable,关键点在ConnectableObservable的refCount操作符会调用RxJavaPlugins.onAssembly()来创建一个继承了Observable的ObservableRefCount对象,ObservableRefCount的构造器接受ConnectableObservable对象,ObservableRefCount在实现subscribeActual(Observer s)方法的时候调用了ConnectableObservable的connect方法,以此来达到Hot Observable转为Cold Observable的目的。
总结:RxJava各种操作符的实现,实际上是装饰模式的各种妙用
线程切换
//常用的线程策略 Schedulers.immediate()//在当前线程运行,默认为此策略; Schedulers.newThread()//每次都创建新的线程执行操作; Schedulers.io()//类似newThread()但是此策略有无限量的线程池,主要用于读写文件、数据库、网络请求等; Schedulers.computation()//用于需要计算的策略,使用线程池,池大小为CPU核心数; Schedulers.trampoline()//将任务加入一个队列,等待执行 AndroidSchedulers.mainThread()//在Android主线程中执行, RxAndroid独有
使用Rxjava可以非常方便指定订阅者对执行线程。例:
Flowable<String> flowableJust = Flowable.just("test Just Flowable"); Disposable disposable = flowableJust .map(new Function<String, String>() { public String apply(String s) throws Exception { System.out.println("map1 thread = " + Thread.currentThread().getName()); return s + "mp1"; } }) .map(new Function<String, String>() { public String apply(String s) throws Exception { System.out.println("map2 thread = " + Thread.currentThread().getName()); return s + "mp2"; } }) .subscribeOn(Schedulers.computation()) .observeOn(Schedulers.io()) .map(new Function<String, String>() { public String apply(String s) throws Exception { System.out.println("map3 thread = " + Thread.currentThread().getName()); return s + "mp3"; } }) .map(new Function<String, String>() { public String apply(String s) throws Exception { System.out.println("map4 thread = " + Thread.currentThread().getName()); return s + "mp4"; } }) .subscribe(new Consumer<String>() { public void accept(String s) throws Exception { System.out.println("onNext Just s = " + Thread.currentThread().getName()); } }, new Consumer<Throwable>() { public void accept(Throwable throwable) throws Exception { System.out.println("onError Just throwable = " + throwable.getMessage()); } }, new Action() { public void run() throws Exception { System.out.println("onComplete Just"); } }, new Consumer<Subscription>() { public void accept(Subscription subscription) throws Exception { subscription.request(Long.MAX_VALUE); System.out.println("onSubscribe Just subscription = " + Thread.currentThread().getName()); } });
运行结果:
onSubscribe Just subscription = main map1 thread = RxComputationThreadPool-1 map2 thread = RxComputationThreadPool-1 map3 thread = RxCachedThreadScheduler-1 map4 thread = RxCachedThreadScheduler-1 onNext Just s = RxCachedThreadScheduler-1
subscribeOn操作符改变调用它之前代码的线程;observeOn操作符改变调用它之后代码的线程。
除了订阅操作本身在主线程中运行,其他操作都在subscribeOn与observeOn两个操作符指定的线程中运行。
Schedulers是创建Scheduler的工厂, 提供了若干静态方法用来创建各种Scheduler;
Scheduler提供创建Workder的接口;
Worker提供了几种执行任务的接口,用来执行任务, 它底下利用各种类型的线程或者线程池完成任务的执行,它是真正执行任务的地方;
同一类型的Scheduler只有一个,但是对应的worker是不同的.比如Schedulers.computation, 对应的Scheduler只有一个,但是每次调用createWorker,获得的worker是scheduler里面worker数组中的一个(数组数目和处理器的数目相同)
相关文章推荐
- rxjava+rxandroid+retrofit2.0使用方法demo讲解
- Android响应式编程框架---RxJava&RxAndroid2.0使用笔记
- retrofit 2.0 +RxAndroid 使用中遇到异常 【abstract method not implemented】
- Rxandroid2.0 使用一
- 使用Android OpenGL ES 2.0绘图之五:添加运动
- 使用Android OpenGL ES 2.0绘图之五:添加运动
- android --相机使用详解概述
- android --相机使用详解概述
- Android RoboGuice 使用指南(3):Bindings 概述
- 【Android 应用开发】OpenGL ES 2.0 -- 制作 3D 彩色旋转三角形 - 顶点着色器 片元着色器 使用详解
- 使用Android OpenGL ES 2.0绘图之四:应用投影和相机视口
- android中的后退键——onBackPressed()的使用 SDK2.0之后新增
- [原]零基础学习SDL开发之在Android使用SDL2.0显示BMP图
- android --相机使用详解概述
- 【多端应用开发系列1.1.1 —— Android:使用新浪API V2】服务器Json数据处理——Json数据概述
- 使用Android OpenGL ES 2.0绘图之六:响应触摸事件
- android --相机使用详解概述
- Android 2.0上使用蓝牙通信代码片断(服务端、客户端、数据传输)
- 使用Android OpenGL ES 2.0绘图之一:搭建一个OpenGL ES环境
- [置顶] 使用Android OpenGL ES 2.0绘图之六:响应触摸事件