您的位置:首页 > 编程语言 > Java开发

RxJava使用(一)

2016-09-18 22:39 281 查看
首先扯点别的:中秋节就这么匆匆过去了,第一次在上海喝醉,不过还是挺高兴的,可惜没有见到她呀。

总结一下,最近看rxjava的内容,参考网址会在文章末尾列出。

Android Studio,builder文件中加入下面两行

compile 'io.reactivex:rxjava:1.1.6'
compile 'io.reactivex:rxandroid:1.2.1'
//配合retrofit使用,加上下面三行
// compile 'com.squareup.retrofit2:retrofit:2.1.0'
// compile "com.squareup.retrofit2:converter-gson:2.1.0"
// compile "com.squareup.retrofit2:adapter-rxjava:2.1.0"


RxJava 的基本实现主要有三点:


1: 创建 Observer 即观察者,它决定事件触发的时候将有怎样的行为。除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber。Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的。

2:创建 Observable

Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件。

3:Subscribe (订阅)

创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来,整条链子就可以工作了。

1 创建观察者的两种方式

//第一种方式,创建一个Observer
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {

}
};
//第2种方式,创建一个Subscriber
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {
Log.e("tag", s);
}
};


2 创建被观察者的几种方式

//第一种方式
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});

//第2种方式
//just(T...): 将传入的参数依次发送出来。
Observable observable1 = Observable.just("hello", "hi", "world");
/**
将会依次调用:
onNext("Hello");
onNext("Hi");
onNext("world");
onCompleted();
*/

//第3种方式
//from(T[]) / from(Iterable<? extends T>) : 将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。
String[] words = {"Hello", "Hi"};
/**
* 将会依次调用:
onNext("Hello");
onNext("Hi");
onCompleted();
*/

ArrayList<String> list = new ArrayList<>();
list.add("hello");
list.add("Hi");
list.add("Aloha");
Observable observable3 = Observable.from(list);
/**
* 将会依次调用:
onNext("Hello");
onNext("Hi");
onNext("Aloha");
onCompleted();
*/


3 Subscribe (订阅).

observable.subscribe(subscriber);


subscribe() 还支持不完整定义的回调.

Action1<String> onNextAction = new Action1<String>() {
@Override
public void call(String s) {
Log.e(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.e(tag, throwable.getMessage());
}
};
Action0 onCompletedAction = new Action0() {
@Override
public void call() {
Log.e(tag, "onCompletedAction");
}
};
Action4<String, String, Integer, List<String>> action4 = new Action4<String, String, Integer, List<String>>() {
@Override
public void call(String s, String s2, Integer integer, List<String> list) {

}
};
// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);


线程控制 —— Scheduler

在不指定线程的情况下, RxJava 遵循的是线程不变的原则,

即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。

在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在那个线程。 RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:

Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。

Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。

Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程, 因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中。

Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。

subscribeOn(): 指定事件产生的线程。

observeOn(): 指定事件消费的线程。

举个例子,比如我现在使用AsyncTask从数据库查询数据,返回的是一个List列表,就可以这样写

class AsynLoacCity extends AsyncTask<Integer, Void, Boolean> {
@Override
protected Boolean doInBackground(Integer... params) {
//在doInBackground方法中查询数据库,后台取数据
cityInfoList=DataSupport.where("id > ?", "0").find(CityInfo.class);
if (cityInfoList.size() > 0) {
return true;
} else {
LogUtil.e("tag", "doInBackground 查询失败");
return false;
}
}

@Override
protected void onPostExecute(Boolean success) {
//如果查询成功,就是用cityInfoList给recyclerView设置适配器,主线程更新ui
setAdapter(cityInfoList);
}
}


上面的代码完全可以使用RxJava替代,写法如下

Observable.create(new Observable.OnSubscribe<List<CityInfo>>() {
@Override
public void call(Subscriber<? super List<CityInfo>> subscriber) {
cityInfoList=DataSupport.where("id > ?", "0").find(CityInfo.class);
if(cityInfoList.size()>0){
subscriber.onNext(cityInfoList);
subscriber.onCompleted();
}
}
}).subscribeOn(Schedulers.io())//指定上面的查询操作和事件发出在io线程,
.observeOn(AndroidSchedulers.mainThread())//指定观察者在主线程消费事件
.subscribe(new Subscriber<List<CityInfo>>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(List<CityInfo> cityInfoList) {
//在主线程接受事件,进行处理
setAdapter(cityInfoList);
}
});


关于subscribeOn()和observeOn()线程控制的切换注意一下几点

下面提到的“操作”包括产生事件、用操作符操作事件以及最终的通过 subscriber 消费事件
1.只有第一subscribeOn() 起作用(所以多个 subscribeOn() 毛意义)。
2.这个 subscribeOn() 控制从流程开始的第一个操作,直到遇到第一个 observeOn()。
3.observeOn() 可以使用多次,每个 observeOn() 将导致一次线程切换(),这次切换开始于这次 observeOn() 的下一个操作。
4.不论是 subscribeOn() 还是 observeOn(),每次线程切换如果不受到下一个 observeOn() 的干预,线程将不再改变,
不会自动切换到其他线程。


看下面一个例子 example1

Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io())//指定事件的产生在io线程
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "string" + integer;
}
})//
.observeOn(AndroidSchedulers.mainThread())
//遇到observeOn()从io线程切换到UI线程,之前的map操作也是在io线程,之后的subscribe()操作在UI线程
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("tag", "UI线程打印" + s);
}
});


在上面的example1中,事件的产生和map操作都是在io线程,而事件的消费(打印字符串)是在UI线程

Observable.doOnSubscribe():此方法是在 subscribe() 调用后而且在事件发送前执行,可以指定线程(比如显示个loading界面)。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。

Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);


如上,在 doOnSubscribe()的后面跟一个 subscribeOn() ,就能保证progressBar的显示在主线程。

RxJava的操作符来一波

1:map,一对一的转化

// map() 方法将参数中的 String 对象转换成一个 Bitmap 对象后返回, 而在经过 map() 方法后,
//事件的参数类型也由String 转为了 Bitmap。
Observable.just("images/logo.png")// 输入类型 String
.subscribeOn(Schedulers.io())
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) {
//从本地加载图片
return getBitMapFromPath(filePath);
}
})
.observeOn(AndroidSchedulers.mainThread())//指定更新ui在主线程
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
//在主线程中更新ui
imageView.setImageBitmap(bitmap);
}
});


2 flatMap 一对多的转化

假设有一个数据结构『学生』

public class Student {

private List<Course> courseList;

public List<Course> getCourseList() {
return courseList;
}

public void setCourseList(List<Course> courseList) {
this.courseList = courseList;
}

}


Course是课程类

public class Course {

private String lesson;

public void setLesson(String lesson) {
this.lesson = lesson;
}

public String getLesson() {
return lesson;
}


现在输出多个学生的所有课程的名字,改怎么办呢?

第一:可以使用map操作符:

//private List<Student> studentList;,初始化数据过程省略
Observable.from(studentList)//使用from将studentList里面的student一个个的发射出来
.map(new Func1<Student, List<Course>>() {
@Override
public List<Course> call(Student student) {
//将一个学生对象转化成一个List<Course>对象
return student.getCourseList();
}
}).subscribe(new Action1<List<Course>>() {
@Override
public void call(List<Course> courses) {
//在这里接收到转化后的List<Course>对象,循环输出课程的名字。
for (int i = 0; i < courses.size(); i++) {
Log.e("tag", courses.get(i).getLesson());
}
}
});


2:使用flatMap操作符,更加简洁。

Observable.from(studentList)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
//通过flatMap,把一个Student对象转化成一个Observable返回
//student.getCourseList()是一个List
//返回的Observable使用from发射出List中的每个Course
return Observable.from(student.getCourseList());
}
}).subscribe(new Action1<Course>() {
@Override
public void call(Course course) {
//这时候订阅到的就是一个Course对象,直接输出课程名字就可以了,更加简洁。
Log.e("tag", course.getLesson());
}
});


/**
flatMap() 的原理是这样的:
1. 使用传入的事件对象创建一个 Observable 对象;
2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;
3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,
4.  而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。
5.  这三个步骤,把事件拆成了两级,
6.  通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去
*/


3 timer操作符

使用timer操作符,延迟产生一个数字就结束
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
//输出0
Log.e(tag, "timer" + aLong);
}
});


可以使用timer操作符实现从欢迎页延迟进入app的主界面

//延迟两秒启动SecondActivity
Observable.timer(2, TimeUnit.SECONDS, AndroidSchedulers.mainThread())
.map(new Func1<Long, Object>() {
@Override
public Object call(Long aLong) {
startActivity(new Intent(MainActivity.this, SecondActivity.class));
finish();
return null;
}
}).subscribe();


4 interval 操作符

/*interval操作符是每隔一段时间就产生一个数字,这些数字从0开始,一次递增1直至无穷大*/
Observable.interval(2, 2, TimeUnit.SECONDS, Schedulers.io())
.take(5)//最多输出5个
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.e(tag, "timer" + aLong);
}
});


range操作符: range操作符是创建一组在从n开始,个数为m的连续数字,比如range(3,10),就是创建3、4、5…12的一组数字,

Observable.range(3, 10, Schedulers.io())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e(tag, "timer" + integer);
}
});


使用repeat 和repeatWhen操作符

Observable.range(3, 3).repeat(2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e(tag, "timer" + integer);
}
});
//3,4,5 输出两次


Observable.just(1,2,3).repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
//重复3次
return observable.zipWith(Observable.range(1, 3), new Func2<Void, Integer, Integer>() {
@Override
public Integer call(Void aVoid, Integer integer) {
return integer;
}
}).flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer integer) {
System.out.println("delay repeat the " + integer + " count");
//1秒钟重复一次
return Observable.timer(1, TimeUnit.SECONDS);
}
});
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}

@Override
public void onNext(Integer value) {
System.out.println("Next:" + value);
}
});


运行结果
Next:1
Next:2
Next:3
repeat the 1 count
Next:1
Next:2
Next:3
repeat the 2 count
Next:1
Next:2
Next:3
repeat the 3 count
Next:1
Next:2
Next:3
Sequence complete.


RxJava 的适用场景

1:与 Retrofit 的结合

2:各种异步操作

结尾,换一篇再继续写RxJava的操作符

参考网址

【1】http://www.androidweekly.cn/android-dev-special-weekly-rxjava-rxandroid-issue-0/

【2】http://gank.io/post/560e15be2dca930e00da1083

【3】http://blog.csdn.net/job_hesc/article/details/46242117

【4】https://segmentfault.com/a/1190000004856071
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: