您的位置:首页 > 编程语言 > Java开发

初识RXJava+Retrofit

2017-03-03 22:43 295 查看

一:观察者模式基本实现

1.创建观察者Subscriber

Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};


2.创建被观察者Observable ,observable是被观察者,subscribe是订阅者,onsbuscribe是订阅的接口,里面有个call方法需要实现

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("fyc");
subscriber.onCompleted();
}
});


3.完成订阅

observable.subscribe(subscriber);

//该方法的源码是:
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);   //由此可见观察者不是在创建的时候就发送请求的,而是在订阅的时候才call的
return subscriber;   //将传入的 Subscriber 作为 Subscription 返回。这是为了方便 unsubscribe().
//另外:public abstract class Subscriber<T> implements Observer<T>, Subscription {}所以这个类可以强转为接口
}


4.快捷创建事件队列

1> just(T...): 将传入的参数依次发送出来。
Observable observable = Observable.just("Hello", "Hi", "Aloha")
.subscribe(subscriber);
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
2> String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words)
.subscribe(subscriber);
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();


5.除了 subscribe(Observer) 和 subscribe(Subscriber) , subscribe() 还支持不完整定义的回调

Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};


简单解释一下这段代码中出现的 Action1 和 Action0。 Action0 是 RxJava 的一个接口,它只有一个方法 call(),这个方法是无参无返回值的;

由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。

这样其实也可以看做将 onCompleted() 方法作为参数传进了 subscribe(),相当于其他某些语言中的『闭包』。 Action1 也是一个接口,它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;

与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,

因此 Action1 可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。事实上,虽然 Action0 和 Action1 在 API 中使用最广泛,但 RxJava 是提供了多个 ActionX 形式的接口 (例如 Action2, Action3) 的,它们可以被用以包装不同的无返回值的方法。

注:正如前面所提到的,Observer 和 Subscriber 具有相同的角色,而且 Observer 在 subscribe() 过程中最终会被转换成 Subscriber 对象,因此,从这里开始,后面的描述我将用 Subscriber 来代替 Observer ,这样更加严谨。

6.完美例子

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!",  Toast.LENGTH_SHORT).show();
}
});


二:时间调度,实现观察者模式的异步调度(scheduler:调度器)

1.调度器分为如下四种

Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。

Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。

Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。

Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

2.完美例子

Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程,既会在主线程打印1234
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});


三:完成对事件序列的变换(如将string转换为bitmap),map与flatMap

1> 获取所有学生的名字

Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String name) {
Log.d(tag, name);
}
...
};
Observable.from(students)
.map(new Func1<Student, String>() {
@Override
public String call(Student student) {
return student.getName();
}
})
.subscribe(subscriber);


注意: FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法。
总结:map可以很容易的实现将一种对象转换为另一种对象,是一对一的关系,而flatmap是一对多的关系。


2> 获取所有学生的所有课程

Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);

//总结:将所有的事件封在observable中,铺平后统一分发给订阅者


四:防抖动

在每次事件触发后的一定时间间隔内丢弃新的事件。常用作去抖动过滤,例如按钮的点击监听器:

RxView.clickEvents(button) .throttleFirst(500, TimeUnit.MILLISECONDS) // RxBinding 代码,设置防抖间隔为 500ms .subscribe(subscriber); 妈妈再也不怕我的用户手抖点开两个重复的界面啦。

五:rxjava与retrofit适用场合

1> 请求完数据后,需要做进一步的耗时操作,比如从数据库拿东西时:

a: Callback方式:

getUser(userId, new Callback<User>() {
@Override
public void success(User user) {
new Thread() {
@Override
public void run() {
processUser(user); // 尝试修正 User 数据
runOnUiThread(new Runnable() { // 切回 UI 线程
@Override
public void run() {
userView.setUser(user);
}
});
}).start();
}
@Override
public void failure(RetrofitError error) {
// Error handling
...
}
};


b: RxJava 的形式

getUser(userId)
.doOnNext(new Action1<User>() {
@Override
public void call(User user) {
processUser(user);
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<User>() {
@Override
public void onNext(User user) {
userView.setUser(user);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable error) {
// Error handling
...
}
});
//可以看出rxjava对后台进一步耗时操作有专门的处理,doOnNext直接处理


2> 两次网络请求需要嵌套时:

a: Callback方式:
@GET("/token")
public void getToken(Callback<String> callback);

@GET("/user")
public void getUser(@Query("token") String token, @Query("userId") String userId, Callback<User> callback);

...

getToken(new Callback<String>() {
@Override
public void success(String token) {
getUser(token, userId, new Callback<User>() {
@Override
public void success(User user) {
userView.setUser(user);
}

@Override
public void failure(RetrofitError error) {
// Error handling
...
}
};
}

@Override
public void failure(RetrofitError error) {
// Error handling
...
}
});


b: RxJava 的形式
getToken()
.flatMap(new Func1<String, Observable<User>>() {
@Override
public Observable<User> onNext(String token) {
return getUser(token, userId);
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<User>() {
@Override
public void onNext(User user) {
userView.setUser(user);
}

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable error) {
// Error handling
...
}
});
//可以看出rxjava对于两次网络请求不需要两次的callback,一个flatMap搞定


该博客来源于http://gank.io/post/560e15be2dca930e00da1083
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: