RXJAVA初接触
2016-07-18 18:07
471 查看
一天从同事那边听到了Rxjava,突然发觉自己离这个时代好遥远,这样一个诞生几年的技术,居然第一听,接下来的时间里,打算花点时间了解了下它的基本情况,社区网站是 reactivex.io。
Rx提供了可观察数据流进行异步的编程模型(观察者模式,我们部分系统,需要保证最终一致性的,就是通过这个设计模式达到的,请求失败继续加到观察者里面,直到成功)
编程模型,一看它的map,flatMap的函数有种hadoop和spark的感觉,还是老老实实的从基础开始看起吧。
RxJava两个基本对象Observables和Subscribers。Observables可以理解为页面的按钮,Subscribers可以理解为绑定这个按钮的函数,比如按钮被点击了一下调Subscribers
当然一切从hello word开始
dependencies {
testCompile group:
'junit',
name:
'junit',
version:
'4.11'
// https://mvnrepository.com/artifact/io.reactivex/rxjava
compile
group:
'io.reactivex',
name:
'rxjava',
version:
'1.1.7'
}
public class RXDemo { public static void main(String[] args) throws IOException { //创建Observable Observable<String> myObservable = Observable.create( new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> sub) { sub.onNext("Hello world!"); } } ); //创建Subscriber Subscriber<String> mySubscriber = new Subscriber<String>() { @Override public void onNext(String s) { System.out.println("--"+s); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { } }; //组合 myObservable.subscribe(mySubscriber); } }
跟踪myObservable.subscribe(mySubscriber)
方法
发现比较核心的一个方法调用,黄色标注
if (!(subscriber instanceof SafeSubscriber)) { // assign to `observer` so we return the protected version subscriber = new SafeSubscriber<T>(subscriber); } // The code below is exactly the same an unsafeSubscribe but not used because it would // add a significant depth to already huge call stacks. try { // allow the hook to intercept and/or decorate RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } catch (Throwable e) {
继续跟踪发现
return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2);
很明显核心类通过observable.onSubscribe的对象就是上文new
Observable.OnSubscribe的对象
调用他内部的回调函数call,call方法通过sub.onNext("Hello
world!")方法把参数传给Subscriber并调用他的onNext方法最终得到结果
@Override public void onNext(T args) { try { if (!done) { actual.onNext(args); } } catch (Throwable e) { // we handle here instead of another method so we don't add stacks to the frame // which can prevent it from being able to handle StackOverflow Exceptions.throwOrReport(e, this); } }
打印结果
--Hello world!
整个看起来行数很多,
可以利用java8和Rxjava提供的一些简便的操作上面的代码可以缩减为
public static void main(String[] args) throws IOException { Observable.just("Hello world!") .subscribe(s -> System.out.println("---next"+s),s-> System.out.println("--error"+s),()-> System.out.println("--onComplete")); }
打印结果
---nextHello world! --onComplete
本文例子https://github.com/springsmile/rxjavademo
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- PropertyChangeListener简单理解
- c++11 + SDL2 + ffmpeg +OpenAL + java = Android播放器
- 插入排序
- 冒泡排序
- 堆排序
- 快速排序
- 二叉查找树