您的位置:首页 > 编程语言 > Java开发

Rxjava2源码分析(一):Flowable的创建和基本使用过程分析

2017-05-03 16:27 651 查看
本文用于记录一下自己学习Rxjava2源码的过程。
首先是最简单的一个使用方法(未做线程切换),用来学习Flowable的创建和使用。
Flowable
.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Object> e) throws Exception {

}
}, BackpressureStrategy.ERROR)
.subscribe(new FlowableSubscriber<Object>() {
@Override
public void onSubscribe(@NonNull Subscription s) {

}

@Override
public void onNext(Object o) {

}

@Override
public void onError(Throwable t) {

}

@Override
public void onComplete() {

}
});
一、先看下create这个方法
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
ObjectHelper.requireNonNull(source, "source is null");
ObjectHelper.requireNonNull(mode, "mode is null");
return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
}
1、首先看其参数,其参数为FlowableOnSubscribe和BackpressureStrategy
FlowableOnSubscribe为一个接口,里面只有一个subscribe方法,该方法参数为FlowableEmitter
public interface FlowableOnSubscribe<T> {

void subscribe(@NonNull FlowableEmitter<T> e) throws Exception;
}
public interface FlowableEmitter<T> extends Emitter<T> {

void setDisposable(@Nullable Disposable s);

void setCancellable(@Nullable Cancellable c);

long requested();

boolean isCancelled();
FlowableEmitter<T> serialize();
}
FlowableEmitter是一个接口,继承自Emitter接口
public interface Emitter<T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
Emitter接口里面的方法是不是很熟悉,这3个方法就是对数据的回调处理了。
因此create方法的第一个参数大概作用就可以知道了,创建一个匿名对象,这个对象处理数据供回调操作。
BackpressureStrategy这是一个枚举作用后面再说。
2、再来看create里面的代码,前两句用来判断参数是否为空,主要看最后一句
return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
先看onAssembly这个方法的参数,new了一个FlowableCreate对象,传入了create的参数,看一下FlowableCreate这个类,继承自Flwable,其构造方法保存了FlowableOnSubscribe、BackpressureStrategy这两个对象
public final class FlowableCreate<T> extends Flowable<T> {final FlowableOnSubscribe<T> source;final BackpressureStrategy backpressure;public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {this.source = source;this.backpressure = backpressure;}
}
接着进入onAssembly方法
public static <T> Flowable<T> onAssembly(@NonNull Flowable<T> source) {Function<? super Flowable, ? extends Flowable> f = onFlowableAssembly;if (f != null) {return apply(f, source);}return source;}
这里先去获取了Function对象f,但是这时候f为null,所以直接反回了source,也就是刚刚new的FlowableCreate对象,保存了FlowableOnSubscribe、BackpressureStrategy这两个对象,到此为止create结束。
二、接着看subscribe方法,先看参数,这里new了个FlowableSubscriber对象
public interface FlowableSubscriber<T> extends Subscriber<T>
FlowableSubscriber为一个接口,继承自Subscriber,Subscriber也是一个接口,里面的方法是不是和Emitter里的方法很像,不过现在他们还没有关系,接着往下看
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
    public void onComplete();}
里面的方法是不是和Emitter里的方法很像,不过现在他们还没有关系,接着往下看,进入subscribe方法里面
public final void subscribe(FlowableSubscriber<? super T> s) {ObjectHelper.requireNonNull(s, "s is null");try {Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber");subscribeActual(z);} catch (NullPointerException e) { // NOPMDthrow e;} catch (Throwable e) {Exceptions.throwIfFatal(e);RxJavaPlugins.onError(e);NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");npe.initCause(e);throw npe;}}
主要看try里的代码
Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber");subscribeActual(z);
1、第一句调用了onSubscribe方法,进入方法里面
public static <T> Subscriber<? super T> onSubscribe(@NonNull Flowable<T> source, @NonNull Subscriber<? super T> subscriber) {BiFunction<? super Flowable, ? super Subscriber, ? extends Subscriber> f = onFlowableSubscribe;if (f != null) {return apply(f, source, subscriber);}return subscribe:;}
该方法传入两个参数Flowable、Subscriber,调用该方法时传入传入的是this(即create创建出来的FlowableCreate对象),s(即subscribe方法new的FlowableSubscriber对象)
该方法内部BiFunction对象f为null所以直接反回了FlowableSubscriber对象。
所以第一句代码主要作用就是把FlowableSubscriber对象赋值给z
2、第二句检查z是否为null
3、这句代码调用了Flowable中的
protected abstract void subscribeActual(Subscriber<? super T> s);
这个方法,这是个抽象方法。但是由于create方法,此时的Flowable对象是他的子类FlowableCreate
进入FlowableCreate类里面的subscribeActual方法
public void subscribeActual(Subscriber<? super T> t) {BaseEmitter<T> emitter;switch (backpressure) {case MISSING: {emitter = new MissingEmitter<T>(t);break;}case ERROR: {emitter = new ErrorAsyncEmitter<T>(t);break;}case DROP: {emitter = new DropAsyncEmitter<T>(t);break;}case LATEST: {emitter = new LatestAsyncEmitter<T>(t);break;}default: {emitter = new BufferAsyncEmitter<T>(t, bufferSize());break;}}t.onSubscribe(emitter);try {source.subscribe(emitter);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);emitter.onError(ex);}}
此时枚举对象BackpressureStrategy派上用场了,根据枚举类型创建不同子类的BaseEmitter对象,这里以ErrorAsyncEmitter为例。
ErrorAsyncEmitter(Subscriber<? super T> actual) {super(actual);}
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {private static final long serialVersionUID = 4127754106204442833L;NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) {super(actual);}@Overridepublic final void onNext(T t) {if (isCancelled()) {return;}if (t == null) {onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));return;}if (get() != 0) {actual.onNext(t);BackpressureHelper.produced(this, 1);} else {onOverflow();}}abstract void onOverflow();}
这个类里面有个onNext方法,把参数赋给了Subscriber的onNext方法,这里就关联上了onNext方法。
implements FlowableEmitter<T>, Subscription {private static final long serialVersionUID = 7326289992464377023L;final Subscriber<? super T> actual;final SequentialDisposable serial;BaseEmitter(Subscriber<? super T> actual) {this.actual = actual;this.serial = new SequentialDisposable();}@Overridepublic void onComplete() {if (isCancelled()) {return;}try {actual.onComplete();} finally {serial.dispose();}}@Overridepublic void onError(Throwable e) {if (e == null) {e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");}if (isCancelled()) {RxJavaPlugins.onError(e);return;}try {actual.onError(e);} finally {serial.dispose();}}@Overridepublic final void cancel() {serial.dispose();onUnsubscribed();}void onUnsubscribed() {// default is no-op}@Overridepublic final boolean isCancelled() {return serial.isDisposed();}@Overridepublic final void request(long n) {if (SubscriptionHelper.validate(n)) {BackpressureHelper.add(this, n);onRequested();}}void onRequested() {// default is no-op}@Overridepublic final void setDisposable(Disposable s) {serial.update(s);}@Overridepublic final void setCancellable(Cancellable c) {setDisposable(new CancellableDisposable(c));}@Overridepublic final long requested() {return get();}@Overridepublic final FlowableEmitter<T> serialize() {return new SerializedEmitter<T>(this);}}
最终到了BaseEmitter这个了,该类实现了FlowableEmitter这个接口
这个类保存了FlowableSubscriber对象,同时可以看到onError、onComplete这两个方法,此时Emitter和Subscriber的方法关联完毕。onError、onComplete这两个方法是互斥的,所以调用一个后另一个就不执行,具体的过程这里不分析。
继续看这句代码onSubscribet.onSubscribe(emitter);把emitter作为参数传给了onSubscribe用于回调
最后这句代码:
source.subscribe(emitter);
source为最开始create的参数,即我们new的FlowableOnSubscribe对象。这句代码把emitter作为参数传给了FlowableOnSubscribe对象。
整个过程完成了
总结起来步骤为:
1、create方法创建了FlowableCreate对象,这个对象保存了一个FlowableOnSubscribe对象,FlowableOnSubscribe包含了一个subscribe(@NonNull FlowableEmitter<T> e)方法,该方法调用Emitter里的onNext、onError、onComplete方法。
2、subscribe方法创建了一个Emitter对象,通过这个对象关联了FlowableOnSubscribe对象和FlowableSubscriber对象,在FlowableOnSubscribe的subscribe方法中获取数据后调用emitter的方法把数据传递给FlowableSubscriber,用于处理结果。

                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: