您的位置:首页 > 编程语言 > Java开发

RxJava学习(三)Rxjava2.0 中几种观察者模式

2017-03-02 17:06 429 查看

1 几种观察者模式

1 Observable/Observer

2 Flowable/Subscriber

3 Single/SingleObserver

4 Completable/CompletableObserver

5 Maybe/MaybeObserver

2 被观察者的上层接口

Observable接口

interface ObservableSource<T> {
void subscribe(Observer<? super T> observer);
}


Single接口

interface SingleSource<T> {
void subscribe(SingleObserver<? super T> observer);
}


Completable接口

interface CompletableSource {
void subscribe(CompletableObserver observer);
}


Maybe接口

interface MaybeSource {

void subscribe(MaybeObserver

public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}


我们可以看到,每一种被观察者都继承自各自的接口,这也就把他们能完全的区分开,各自独立(特别是Observable和Flowable),保证了他们各自的拓展或者配套的操作符不会相互影响。

3 观察者

4.Observable/Observer 举例学习

Observable/Observer

被观察者:

Observable mObservable=Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter e) throws Exception {

e.onNext(1);

e.onNext(2);

e.onComplete();

}

});

观察者:

public interface Observer {

void onSubscribe(Disposable d);

void onNext(T value);

void onError(Throwable e);

void onComplete();

}

public interface Disposable {

/**

* Dispose the resource, the operation should be idempotent.

*/

void dispose();

/**

* Returns true if this resource has been disposed.

* @return true if this resource has been disposed

*/

boolean isDisposed();

}

在Observer接口中,onSubscribe(Disposable d)方法传入的Disposable也是用于取消订阅,基本功能是差不多的,只不过命名不一致,大家知道就好。

观察者模式调用

Observable mObservable=Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter e) throws Exception {

e.onNext(1);

e.onNext(2);

e.onComplete();

}

});

Observer mObserver=new Observer() {

//这是新加入的方法,在订阅后发送数据之前,

//回首先调用这个方法,而Disposable可用于取消订阅

@Override

public void onSubscribe(Disposable d) {

}

@Override
public void onNext(Integer value) {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
};


mObservable.subscribe(mObserver);

5.Flowable/Subscriber 举例学习

Flowable/Subscriber

被观察者:

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter e) throws Exception {

e.onNext(1);

e.onNext(2);

e.onNext(3);

e.onNext(4);

e.onComplete();

}

}

//需要指定背压策略

, BackpressureStrategy.BUFFER);

观察者:

public interface Subscriber {

public void onSubscribe(Subscription s);

public void onNext(T t);

public void onError(Throwable t);

public void onComplete();

}

public interface Subscription {

public void request(long n);

public void cancel();

}

上面的实例中,onSubscribe(Subscription s)传入的参数s就肩负着取消订阅的功能,当然,他也可以用于请求上游的数据。

观察者模式调用:

Flowable.range(0,10)

.subscribe(new Subscriber() {

Subscription sub;

//当订阅后,会首先调用这个方法,其实就相当于onStart(),

//传入的Subscription s参数可以用于请求数据或者取消订阅

@Override

public void onSubscribe(Subscription s) {

Log.w(“TAG”,”onsubscribe start”);

sub=s;

sub.request(1);

Log.w(“TAG”,”onsubscribe end”);

}

@Override
public void onNext(Integer o) {
Log.w("TAG","onNext--->"+o);
sub.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
Log.w("TAG","onComplete");
}
});


输出如下:

onsubscribe start

onNext—>0

onNext—>1

onNext—>2



onNext—>10

onComplete

onsubscribe end

Flowable是支持背压的,也就是说,一般而言,上游的被观察者会响应下游观察者的数据请求,下游调用request(n)来告诉上游发送多少个数据。这样避免了大量数据堆积在调用链上,使内存一直处于较低水平。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: