您的位置:首页 > 移动开发 > Android开发

[Android开发] RxJava2之路三 - 调度器Scheduler与线程控制

2017-02-09 19:08 387 查看

一、简介

RxJava是一个异步的框架,使用Scheduler调度器可以对线程进行控制。

二、调度器种类

常用的是 Schedulers.io()进行耗时操作、和AndroidSchedulers.mainThread()更新ui

1. Schedulers.immediate()

直接在当前线程运行,相当于不指定线程,默认的Scheduler

2. Schedulers.newThread():

总是启动新线程,在新的线程中执行操作

3. Schedulers.io()

I/O操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler,行为模式和newThread()差不多,区别在于io()的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下,io()比newThread()更有效率。不要把计算工作放在io(),可以避免穿件不必要的线程。

4. Schedulers.computation()

计算所使用的Scheduler。这个计算是指CPU密集型计算,即不会被I/O等操作限制性的操作,例如图形的计算。这个Sheduler使用的固定的线程池,大小为cpu核数。不要把I/O放在computation中,否则I/O操作等待时间会浪费cpu。用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器

的数量

5. Schedulers.from(executor)

使用指定的Executor作为调度器。

6. Schedulers.trampoline()

当其它排队的任务完成后,在当前线程排队开始执行

7. AndroidSchedulers.mainThread()

在RxAndroid中,他指定操作将在Android主线程中执行。

三、指定线程

1. observerOn(Schedulers)

指定观察者Observer在哪个线程执行

2. subscribeOn(Scheduler)

指定被观察者Observable在哪个线程执行

Observable.doSubscribe() 它和Subscribe.onStart() 同样是在subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。

四、线程多次随意切换

observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。 subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。

Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新线程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 线程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber);  // Android 主线程,由 observeOn() 指定


五、线程控制的Demo

看下面的代码,在被观察者进行execute网络请求,然后在观察者那里打印值。 猜结果会怎么样?

Observable.create(new ObservableOnSubscribe<Bean>() { //定义被观察者
@Override
public void subscribe(final ObservableEmitter<Bean> e) throws Exception {
//进行网络请求
//同步请求execute,在当前线程中进行请求
Response<Bean> response =  api.getListDisease().execute();
e.onNext(response.body());
}
}).subscribe(new Observer<Bean>() {        //观察者
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe: ");

}

@Override
public void onNext(Bean value) {

Log.e(TAG, "onNext: "+value.getType());
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: "+e.toString());
}

@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
});


报错了,报了观察者的onError,输出原因是主线程错误,网络请求不能在主线程。

原因: 因为没有加调度器的情况下,被观察者默认调度器是 Schedulers.immediate(),这个条调度器是默认在当前线程执行的,就是在ui线程了。所以在ui线程进行网络请求就报错了。

接下来添加调度器:

Observable.create(new ObservableOnSubscribe<Bean>() { //定义被观察者
@Override
public void subscribe(final ObservableEmitter<Bean> e) throws Exception {
//进行网络请求
//同步请求execute,在当前线程中进行请求
Response<Bean> response =  api.getListDisease().execute();
e.onNext(response.body());

}
})
.subscribeOn(Schedulers.io())  //被观察者 在线程池中调用了
.observeOn(AndroidSchedulers.mainThread()) //观察者在主线程中实现
.subscribe(new Observer<Bean>() {        //观察者
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe: ");

}

@Override
public void onNext(Bean value) {

Log.e(TAG, "onNext: "+value.getType());
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: "+e.toString());
}

@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
});


添加调度器subscribeOn(Schedulers.io())表示 被观察者在进行io操作,然后就会让它在线程池中执行

添加调度器observeOn(AndroidSchedulers.mainThread())表示观察者在主线程中操作,可以进行更新ui的操作

六、其他调度器

还有啥递归调度器、延时和周期调度器、测试调度器等可以查看文档

https://www.gitbook.com/book/mcxiaoke/rxdocs/details

http://wiki.jikexueyuan.com/project/rx-docs/Scheduler.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息