您的位置:首页 > 编程语言 > Java开发

RxJava2 基础教程(一)HelloWorld

2017-02-05 22:59 316 查看

RxJava2基础使用

RxJava2基础使用
初学者引导已经学习过RxJava的可跳过

观察者源码解析

Rxjava2创建Observable和Observer

Observable 多种创建方式区别 和 subscribeObserver的简化

终极学习教程 勤动手谁看都懂不练无用

初学者引导,已经学习过RxJava的可跳过

响应式编程主要目的是方便我们对复杂需求的编程的一种简化思想(个人理解),例如 Android中的异步任务,主线程更新UI操作,Event 总线等等。

先从理解和使用 java 的 观察者模式 开始入手,java提供一个类 java.util.Observable(敌方) 和 接口 Observer(侦察兵)

定义一个类 AObserver 实现接口 Observer 实现方法 update(Observable o, Object arg){...}

实例化 AObserver 和 Observable
java.util.Observable observable = new java.util.Observable();
AObserver observer=new AObserver();
//订阅
observable.addObserver(observer);
//发送事件
observable.notifyObservers(Object o);
//取消订阅
observable.deleteObserver(observer);

这样我们的侦察兵就可以准确的观察到敌方动向,而订阅是他们主要的关联,你可以理解为 望远镜。


观察者源码解析

//先看下订阅是怎么做到的
private final ArrayList<Observer> observers;
public synchronized void addObserver(Observer o) {
if (o == null)
throw new NullPointerException();
if (!observers.contains(o)) {
observers.add(o);
}
}
你肯定大喊卧槽!! 就是加集合~没错
//再看看 发送事件
.....
//核心代码
for (int i = arrLocal.length-1; i>=0; i--)
arrLocal[i].update(this, arg);
唔。其实就是这样遍历接口集合调用方法

for(Observer observer:observers){
observer.update(this,arg);
}


Rxjava2创建Observable和Observer

Observable 有很多种方式创建

1. Observable.create(new ObservableOnSubscribe<Object>()[])
2. Observable.just(.....)
3. Observable.fromArray(T...items)
4. Observable.fromCallable(new Callable<Objecet>() {})
5. ..其他自己摸索下


他们的区别。别急 为了更好的理解我们先动手完成一次事件发送,成就感会带起你学习兴趣

Observer observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe" + d.toString());
}

@Override
public void onNext(String value) {
Log.d(TAG, "onNext" + value);
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError" + e.toString());
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("A");
e.onNext("B");
e.onComplete();
//e.onError(Throwable);
}

// 这里的 onNext 可以理解为察者模式中的 notifyObservers() 而 onComplete()就是完成了发送 截断 发送,如果没有调用此方法 则可继续使用 onNext。onError 就是异常,也会截断后续发送,特别注意 onComplete 和 onError 互斥 不能同时使用。

//订阅 subscribe 等同于 addObserver
observable.subscribe(observer);

收到日志
D/MainActivity: onSubscribe: null
D/MainActivity: onNext: A
D/MainActivity: onNext: B
D/MainActivity: onComplete


Observable 多种创建方式区别 和 subscribe(Observer)的简化

1. Observable.create(new ObservableOnSubscribe<Object>()[])
上面的例子中有不过多解释了
2. Observable.just(.....)

Observable.just("A", "B") 这样创建的结果同上面的例子;

3. Observable.fromArray(T...items)

just 和 fromArray 用法相似,区别是 参数

4. Observable.fromCallable(new Callable<Objecet>() {})

Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return "A";
}

看下subscribe()简化 如果我们只关注 onNext 结果可以写成这样

subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
//接收到的就是onNext发送的
}
});


如果要关注 onComplete 就需要这样了,我们拿参数最长的解析下,那么其他就自然懂了

subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe)

参数 Consumer<? super T> 就是 onNext 接收器
参数 Consumer<? super Throwable> onError接收器
... 巴拉巴拉。。。其他都不用说啊 看参数的名字都懂了

写法就如下这样:

getObservable().subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
//onNext
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
//onError
}
}, new Action() {
@Override
public void run() throws Exception {
//onComplete
}
}, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
//onSubscribe
}
});


终极学习教程 ~ 勤动手,(谁看都懂,不练无用)

另外推荐Rxjava2学习博客:

给初学者的RxJava2.0教程
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  响应式 rxjava2 android