初识RXJava+Retrofit
2017-03-03 22:43
295 查看
一:观察者模式基本实现
1.创建观察者Subscriber
Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String s) { Log.d(tag, "Item: " + s); } @Override public void onCompleted() { Log.d(tag, "Completed!"); } @Override public void onError(Throwable e) { Log.d(tag, "Error!"); } };
2.创建被观察者Observable ,observable是被观察者,subscribe是订阅者,onsbuscribe是订阅的接口,里面有个call方法需要实现
Observable observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello"); subscriber.onNext("fyc"); subscriber.onCompleted(); } });
3.完成订阅
observable.subscribe(subscriber); //该方法的源码是: public Subscription subscribe(Subscriber subscriber) { subscriber.onStart(); onSubscribe.call(subscriber); //由此可见观察者不是在创建的时候就发送请求的,而是在订阅的时候才call的 return subscriber; //将传入的 Subscriber 作为 Subscription 返回。这是为了方便 unsubscribe(). //另外:public abstract class Subscriber<T> implements Observer<T>, Subscription {}所以这个类可以强转为接口 }
4.快捷创建事件队列
1> just(T...): 将传入的参数依次发送出来。 Observable observable = Observable.just("Hello", "Hi", "Aloha") .subscribe(subscriber); // 将会依次调用: // onNext("Hello"); // onNext("Hi"); // onNext("Aloha"); // onCompleted(); 2> String[] words = {"Hello", "Hi", "Aloha"}; Observable observable = Observable.from(words) .subscribe(subscriber); // 将会依次调用: // onNext("Hello"); // onNext("Hi"); // onNext("Aloha"); // onCompleted();
5.除了 subscribe(Observer) 和 subscribe(Subscriber) , subscribe() 还支持不完整定义的回调
Action1<String> onNextAction = new Action1<String>() { // onNext() @Override public void call(String s) { Log.d(tag, s); } }; Action1<Throwable> onErrorAction = new Action1<Throwable>() { // onError() @Override public void call(Throwable throwable) { // Error handling } }; Action0 onCompletedAction = new Action0() { // onCompleted() @Override public void call() { Log.d(tag, "completed"); } };
简单解释一下这段代码中出现的 Action1 和 Action0。 Action0 是 RxJava 的一个接口,它只有一个方法 call(),这个方法是无参无返回值的;
由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。
这样其实也可以看做将 onCompleted() 方法作为参数传进了 subscribe(),相当于其他某些语言中的『闭包』。 Action1 也是一个接口,它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;
与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,
因此 Action1 可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。事实上,虽然 Action0 和 Action1 在 API 中使用最广泛,但 RxJava 是提供了多个 ActionX 形式的接口 (例如 Action2, Action3) 的,它们可以被用以包装不同的无返回值的方法。
注:正如前面所提到的,Observer 和 Subscriber 具有相同的角色,而且 Observer 在 subscribe() 过程中最终会被转换成 Subscriber 对象,因此,从这里开始,后面的描述我将用 Subscriber 来代替 Observer ,这样更加严谨。
6.完美例子
int drawableRes = ...; ImageView imageView = ...; Observable.create(new OnSubscribe<Drawable>() { @Override public void call(Subscriber<? super Drawable> subscriber) { Drawable drawable = getTheme().getDrawable(drawableRes)); subscriber.onNext(drawable); subscriber.onCompleted(); } }) .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程 .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程 .subscribe(new Observer<Drawable>() { @Override public void onNext(Drawable drawable) { imageView.setImageDrawable(drawable); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show(); } });
二:时间调度,实现观察者模式的异步调度(scheduler:调度器)
1.调度器分为如下四种
Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
2.完美例子
Observable.just(1, 2, 3, 4) .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程 .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程,既会在主线程打印1234 .subscribe(new Action1<Integer>() { @Override public void call(Integer number) { Log.d(tag, "number:" + number); } });
三:完成对事件序列的变换(如将string转换为bitmap),map与flatMap
1> 获取所有学生的名字Student[] students = ...; Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String name) { Log.d(tag, name); } ... }; Observable.from(students) .map(new Func1<Student, String>() { @Override public String call(Student student) { return student.getName(); } }) .subscribe(subscriber);
注意: FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法。 总结:map可以很容易的实现将一种对象转换为另一种对象,是一对一的关系,而flatmap是一对多的关系。
2> 获取所有学生的所有课程
Student[] students = ...; Subscriber<Course> subscriber = new Subscriber<Course>() { @Override public void onNext(Course course) { Log.d(tag, course.getName()); } ... }; Observable.from(students) .flatMap(new Func1<Student, Observable<Course>>() { @Override public Observable<Course> call(Student student) { return Observable.from(student.getCourses()); } }) .subscribe(subscriber); //总结:将所有的事件封在observable中,铺平后统一分发给订阅者
四:防抖动
在每次事件触发后的一定时间间隔内丢弃新的事件。常用作去抖动过滤,例如按钮的点击监听器:RxView.clickEvents(button) .throttleFirst(500, TimeUnit.MILLISECONDS) // RxBinding 代码,设置防抖间隔为 500ms .subscribe(subscriber); 妈妈再也不怕我的用户手抖点开两个重复的界面啦。
五:rxjava与retrofit适用场合
1> 请求完数据后,需要做进一步的耗时操作,比如从数据库拿东西时:a: Callback方式: getUser(userId, new Callback<User>() { @Override public void success(User user) { new Thread() { @Override public void run() { processUser(user); // 尝试修正 User 数据 runOnUiThread(new Runnable() { // 切回 UI 线程 @Override public void run() { userView.setUser(user); } }); }).start(); } @Override public void failure(RetrofitError error) { // Error handling ... } };
b: RxJava 的形式 getUser(userId) .doOnNext(new Action1<User>() { @Override public void call(User user) { processUser(user); }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<User>() { @Override public void onNext(User user) { userView.setUser(user); } @Override public void onCompleted() { } @Override public void onError(Throwable error) { // Error handling ... } }); //可以看出rxjava对后台进一步耗时操作有专门的处理,doOnNext直接处理
2> 两次网络请求需要嵌套时:
a: Callback方式: @GET("/token") public void getToken(Callback<String> callback); @GET("/user") public void getUser(@Query("token") String token, @Query("userId") String userId, Callback<User> callback); ... getToken(new Callback<String>() { @Override public void success(String token) { getUser(token, userId, new Callback<User>() { @Override public void success(User user) { userView.setUser(user); } @Override public void failure(RetrofitError error) { // Error handling ... } }; } @Override public void failure(RetrofitError error) { // Error handling ... } });
b: RxJava 的形式 getToken() .flatMap(new Func1<String, Observable<User>>() { @Override public Observable<User> onNext(String token) { return getUser(token, userId); }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<User>() { @Override public void onNext(User user) { userView.setUser(user); } @Override public void onCompleted() { } @Override public void onError(Throwable error) { // Error handling ... } }); //可以看出rxjava对于两次网络请求不需要两次的callback,一个flatMap搞定
该博客来源于http://gank.io/post/560e15be2dca930e00da1083
相关文章推荐
- Retrofit笔记->结合Rxjava初识
- 优雅地使用Retrofit+RxJava(二)
- Android MVP+Retrofit+RxJava实践小结
- Retrofit + Rxjava + Okhttp
- Retrofit实践(MVP+RxJava)
- 普通MVP+XRexyclerview+RxJava+Retrofit+Fresco
- 初识RxJava
- RetrofitAndRxJavaAndMVP二次封装
- Retrofit2+RxJava
- Retrofit+RxJava实现app崩溃处理(一)上传文件
- 1、RxJava2 & Retrofit2封装实践 简介
- RXjava+Retrofit请求数据
- Retrofit结合RxJava请求网络数据
- RxJava加Retrofit
- Android_Retrofit+RxJava的MVP
- 封装Okhttp+retrofit+rxjava使用MVP模式实现登录注册
- MVP+Retrofit+RxJava+登录、注册
- RxJava+Retrofit+OkHttp深入浅出-终极封装三(文件上传)
- Mvp 购物车 RetrofitHelper +RxJava
- RxJava开发精要8 - 与REST无缝结合-RxJava和Retrofit