您的位置:首页 > 编程语言 > Java开发

RxJava学习(二)——RxJava使用场景总结

2016-09-11 15:41 477 查看


11个RxJava的实例集合

类名一一对应内容,分别是:

MainActivity——RxJava基础用法

simplestAty——RxJava简单实例

timer——延时操作

interval——周期性操作

twoexample——两个RxJava初始化demo

schedulePeriodically——使用RxJava做轮询请求

merge——合并两个数据源

map_flatmap——RxJava实现变换

foreach——实现数组,List遍历

concat——实现比较一堆item,一旦满足后面的item不执行的功能

lifecycle——回收Observable以防止内存泄漏

代码已上传GitHub,地址请点击这里,求start求watch,当然也欢迎putrequest。

本文对这些实例做代码分析以及讲解:

MainActivity: 

最基础的RxJava执行过程,观察者,被观察者,以及订阅的体现。

/**
* 基本的观察者,被观察者创建方式以及实现订阅。
*/
public class MainActivity extends AppCompatActivity {
String tag = "MainActivity";

//定义观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.d(tag,"Completed!");
}

@Override
public void onError(Throwable e) {
Log.d(tag,"Error!");
}

@Override
public void onNext(String s) {
Log.d(tag,"Item: " + s);
}
};

//实现了Observer的抽象类,对其进行了一些扩展,基本使用方式完全一样。
Subscriber<String> subscriber = new Subscriber<String>() {

@Override
public void onCompleted() {
Log.d(tag,"Completed!");
}

@Override
public void onError(Throwable e) {
Log.d(tag,"Error!");
}

@Override
public void onNext(String s) {
Log.d(tag,"Item: " + s);
}
};

//被观察者 使用onCreate创建
Observable observable = Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("whale");
subscriber.onNext("nangua");
subscriber.onCompleted();
}
});

//被观察者 使用just创建
Observable observable1 = Observable.just("Hello","Hi","Aloha");

//被观察者 使用数组传入创建
String[] words = {"Hello","Hi","Aloha"};
Observable observable2 = Observable.from(words);

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
//订阅
observable.subscribe(subscriber);
//observable.subscribe(observer);
}

@Override
protected void onDestroy() {
super.onDestroy();
}
}simplestAty:
不完整回调ActionX类的使用方式,该类一共有Action0、Action1...到Action9,每个类对应不同的参数设置以及构造方法,FuncX方法同理。

/**
* subscribe()支持的不完整回调
* Created by jiangzn on 16/9/8.
*/
public class simplestAty extends Activity {
String tag = "xiaojingyu";
Action1<String> onNextAction = new Action1<String>() {
@Override
public void call(String s) {
Log.d(tag,s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
//错误处理
}
};
Action0 onCompleteAction = new Action0() {
@Override
public void call() {
Log.d(tag,"completed");
}
};
@Override
public void onCreate(Bundle savedInstanceState, PersistableBundle persistentState) {
super.onCreate(savedInstanceState, persistentState);
Observable observable = Observable.just("hello");
observable.subscribe(onNextAction,onErrorAction,onCompleteAction);
}
}timer:
执行延时操作,在一些需要等待执行的场景中使用:

Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
MyLog.d("completed");
}

@Override
public void onError(Throwable e) {
MyLog.d("error");
}

@Override
public void onNext(Long aLong) {
MyLog.d("啪啪啪!");
}
});interval:
做周期性操作,比如轮回显示什么的。

Observable.interval(2, TimeUnit.SECONDS)
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
MyLog.d("completed");
}

@Override
public void onError(Throwable e) {
MyLog.d("error");
}

@Override
public void onNext(Long aLong) {
MyLog.d("啪啪啪");
}
});twoexample:
两个实践制定线程方法的小栗子,重要的是如下两句:

subscribeOn(Schedulers.io()) //指定OnSubscribe被激活时处在的线程,事件产生线程

.observeOn(AndroidSchedulers.mainThread())  //Subscriber所运行的线程,事件消费的线程

记住subscribeOn和observeOn的区别和用法,特别重要!!这是RxJava实现异步操作的基础!

/**
* 两个小栗子
* a打印字符串数组
* b由id取得突破并显示 设置执行的线程
* Created by jiangzn on 16/9/8.
*/
public class twoexample extends Activity {
String tag = "xiaojingyu";
String[] names = {"a","b","c"};
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.test2);
Log.d(tag,"MainActivity");
// a();
b();
}

private void b() {
final int drawableRes = R.drawable.aa;
final ImageView imageView = (ImageView) findViewById(R.id.iv_test2);
Observable.create(new Observable.OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Log.d(tag,"call:" + Thread.currentThread().getName());
//打印结果:call:RxCachedThreadScheduler-1...
Drawable drawable = getResources().getDrawable(drawableRes);
subscriber.onNext(drawable);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io()) //指定OnSubscribe被激活时处在的线程,事件产生线程
.observeOn(AndroidSchedulers.mainThread()) //Subscriber所运行的线程,事件消费的线程
.subscribe(new Subscriber<Drawable>() {
@Override
public void onCompleted() {
Log.d(tag,"Completed!");
}

@Override
public void onError(Throwable e) {
Log.d(tag,e.getMessage());
}

@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
Log.d(tag,"加载线程:" + Thread.currentThread().getName());
//打印结果:加载线程:main
}
});
}
private void a() {
Observable.from(names)
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d(tag,s);
}
});
}
}schedulePeriodically:
轮询请求,在一些需要反复需要获取数据的场景,比如过几十分钟更新一下天气数据之类的。

Observable.create(new Observable.OnSubscribe<String>(
) {
@Override
public void call(final Subscriber<? super String> subscriber) {
Schedulers.io().createWorker() //指定在io线程执行
.schedulePeriodically(new Action0() {
@Override
public void call() {
subscriber.onNext("doNetworkCallAndGetStringResult");
}
}, 2000, 1000, TimeUnit.MILLISECONDS);//初始延迟,polling延迟
}

})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
MyLog.d("polling..." + s);
}
});
merge:
简而言之就是合并操作

稍微复杂一点说就是合并两个数据源的数据,但是不保证顺序,这个顺序只能是事件产生的顺序。

/**
* Merge
* 使用merge合并两个数据源
* 例如一组数据来自网络,一组数据来自文件,需要合并数据并一起显示的情况
*
* 可以理解为拼接两个Observable的输出,不保证顺序,按照事件产生的顺序发送给订阅者
* Created by jiangzn on 16/9/8.
*/
public class merge extends Activity {
String tag = "xiaojingyu";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.test3);
Observable.merge(getDataFromFile() ,getDataFromNet())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(tag,"done loading all data");
}

@Override
public void onError(Throwable e) {
Log.d(tag,"error");
}

@Override
public void onNext(String s) {
Log.d(tag,"merge:" + s);
}
});
}
private Observable<String> getDataFromFile() {
String[] strs = {"filedata1","filedata2","filedata3","filedata4"};
Observable<String> temp = Observable.from(strs);
return temp;
}

private Observable<String> getDataFromNet() {
String[] strs = {"netdata1","netdata2","netdata3","netdata4"};
Observable<String> temp = Observable.from(strs);
return temp;
}
}map_flatmap:
代码比较直观,就是比如传入一个路径给我返回一个bitmap。

map:

Observable.just("images/logo.png") //输入类型
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filepath) {
return getBitmapFromPath(filepath);
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
showBitmap(bitmap);
}
});
flatmap:

Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
MyLog.d(student.name);
return Observable.from(student.getCourses());
}
}).subscribe(subscriber);foreach:
实现对数组或者list的遍历。在需要异步执行的情况下还是比for循环好用的。
Observable.from(names)
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
MyLog.d(s);
}
});lifecycle:
使用一个compositesubscription来管理我们的observable,以防止内存泄漏。

public class lifecycle extends Activity{

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);

}

private CompositeSubscription mCompositeSubscription
= new CompositeSubscription();

private void doSomething() {
mCompositeSubscription.add( Observable.just("Hello, World!")
.subscribe(new Action1<String>(){
@Override
public void call(String s) {
MyLog.d(s);
}
}));
}

@Override
protected void onDestroy() {
super.onDestroy();
mCompositeSubscription.unsubscribe();
//注意! 一旦你调用了 CompositeSubscription.unsubscribe(),
// 这个CompositeSubscription对象就不可用了, 如果你还想使用CompositeSubscription,
// 就必须在创建一个新的对象了。
}
}


最后看完如果还不够过瘾的话,可以从Github上下载下来然后运行跑一跑体验一下哦~
这里是传送门:点我送屠龙宝刀

求start~求fork~求watch~
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息