您的位置:首页 > 编程语言 > Java开发

RXJAVA初接触

2016-07-18 18:07 471 查看
  
一天从同事那边听到了Rxjava,突然发觉自己离这个时代好遥远,这样一个诞生几年的技术,居然第一听,接下来的时间里,打算花点时间了解了下它的基本情况,社区网站是 reactivex.io。

         Rx提供了可观察数据流进行异步的编程模型(观察者模式,我们部分系统,需要保证最终一致性的,就是通过这个设计模式达到的,请求失败继续加到观察者里面,直到成功)

编程模型,一看它的map,flatMap的函数有种hadoop和spark的感觉,还是老老实实的从基础开始看起吧。

         RxJava两个基本对象Observables和Subscribers。Observables可以理解为页面的按钮,Subscribers可以理解为绑定这个按钮的函数,比如按钮被点击了一下调Subscribers

 

当然一切从hello word开始

dependencies {

    testCompile group:
'junit',
name:
'junit',
version:
'4.11'

    // https://mvnrepository.com/artifact/io.reactivex/rxjava
    compile
group:
'io.reactivex',
name:
'rxjava',
version:
'1.1.7'
}

 
public class RXDemo {
    public static void main(String[] args) throws IOException {
        //创建Observable
        Observable<String> myObservable = Observable.create(
                new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> sub) {
                        sub.onNext("Hello  world!");
                    }
                }
        );

        //创建Subscriber
        Subscriber<String> mySubscriber = new Subscriber<String>() {
            @Override
            public void onNext(String s) { System.out.println("--"+s); }

            @Override
            public void onCompleted() { }

            @Override
            public void onError(Throwable e) { }
        };

        //组合
        myObservable.subscribe(mySubscriber);
    }
}


 

跟踪myObservable.subscribe(mySubscriber)

方法
发现比较核心的一个方法调用,黄色标注
if (!(subscriber instanceof SafeSubscriber)) {
    // assign to `observer` so we return the protected version
    subscriber = new SafeSubscriber<T>(subscriber);
}

// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
    // allow the hook to intercept and/or decorate
    RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
    return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {


 

继续跟踪发现
return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2);


 

很明显核心类通过observable.onSubscribe的对象就是上文new
Observable.OnSubscribe的对象

调用他内部的回调函数call,call方法通过sub.onNext("Hello 
world!")方法把参数传给Subscriber并调用他的onNext方法最终得到结果
@Override
public void onNext(T args) {
    try {
        if (!done) {
            actual.onNext(args);
        }
    } catch (Throwable e) {
        // we handle here instead of another method so we don't add stacks to the frame
        // which can prevent it from being able to handle StackOverflow
        Exceptions.throwOrReport(e, this);
    }
}


 

打印结果
--Hello  world!


 

整个看起来行数很多,

可以利用java8和Rxjava提供的一些简便的操作上面的代码可以缩减为

 
public static void main(String[] args) throws IOException {
 Observable.just("Hello  world!")
        .subscribe(s -> System.out.println("---next"+s),s-> System.out.println("--error"+s),()-> System.out.println("--onComplete"));
}


打印结果
---nextHello  world!
--onComplete


 



本文例子https://github.com/springsmile/rxjavademo
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java rxjava