您的位置:首页 > 编程语言 > Java开发

【RxJava 实践系列】(三)线程控制 — Scheduler

2016-11-13 15:35 537 查看
【RxJava 实践系列】(一)基础知识

【RxJava 实践系列】(二)创建观察者与被观察者

【RxJava 实践系列】(三)线程控制 — Scheduler

调度器 Scheduler

Scheduler能非常方便的决定观察者与被观察者执行所在的线程,这是RxJava一大亮点;

使用ObserveOn和SubscribeOn操作符,你可以让Observable在一个特定的调度器上执行,

ObserveOn指示一个Observable在一个特定的调度器上调用观察者的onNext, onError和onCompleted方法

SubscribeOn它指示Observable将全部的处理过程(包括发射数据和通知)放在特定的调度器上执行。

注意:onStart()方法会执行在Subscriber被创建的线程之上



1、线程测试代码:

Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.v(TAG, "observable call: "+Thread.currentThread().getName());
subscriber.onNext("");
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe(new Subscriber<String>() {

@Override
public void onStart() {
Log.v(TAG, "onStart:------线程:"+Thread.currentThread().getName());
}

@Override
public void onCompleted() {
Log.v(TAG, "onCompleted:------线程:"+Thread.currentThread().getName());
}

@Override
public void onError(Throwable e) {
Log.v(TAG, "onError:------线程: "+Thread.currentThread().getName());
}

@Override
public void onNext(String s) {
Log.v(TAG, "onNext: "+s+"------线程:"+Thread.currentThread().getName());
}
});


运行结果:

11-13 02:16:37.780 5505-5505/com.harlan.note V/Rx: onStart:------线程:main
11-13 02:16:37.792 5505-5824/com.harlan.note V/Rx: observable call: RxIoScheduler-2
11-13 02:16:37.792 5505-5823/com.harlan.note V/Rx: onNext: ------线程:RxComputationScheduler-1
11-13 02:16:37.792 5505-5823/com.harlan.note V/Rx: onCompleted:------线程:RxComputationScheduler-1


2、多次切换线程

Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.v(TAG, "observable call---- 线程:"+Thread.currentThread().getName());
subscriber.onNext("");
subscriber.onCompleted();
}
}).subscribeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {
Log.v(TAG, "map call---- 线程:"+Thread.currentThread().getName());
return null;
}
})
.observeOn(Schedulers.computation())
.subscribe(new Subscriber<Bitmap>() {

@Override
public void onCompleted() {
Log.v(TAG, "onCompleted:------线程:"+Thread.currentThread().getName());
}

@Override
public void onError(Throwable e) {
Log.v(TAG, "onError:------线程: "+Thread.currentThread().getName());
}

@Override
public void onNext(Bitmap s) {
Log.v(TAG, "onNext: ------线程:"+Thread.currentThread().getName());
}
});

//执行结果:
11-13 02:31:12.988 18844-18844/com.harlan.note V/Rx: observable call---- 线程:main
11-13 02:31:12.988 18844-18911/com.harlan.note V/Rx: map call---- 线程:RxIoScheduler-2
11-13 02:31:12.988 18844-18910/com.harlan.note V/Rx: onNext: ------线程:RxComputationScheduler-1
11-13 02:31:12.988 18844-18910/com.harlan.note V/Rx: onCompleted:-----线程:RxComputationScheduler-1


总结:

subscribeOn(): 被观察者所处线程。(只能调用一次)

observeOn(): 观察者所处线程。(可调用多次)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: