您的位置:首页 > 编程语言 > Java开发

Rxjava(创建类)--Repeat

2016-10-25 14:36 176 查看
创建一个发射特定数据重复多次的Observable



demo

     Observable.range(1, 5).repeat(5).subscribe(new Action1<Integer>() {

            @Override

            public void call(Integer integer) {

                System.out.println("-------->" + integer);

            }

        });

输出:

我们看一下repeat函数

  public final Observable<T> repeat(final long count) {

        return OnSubscribeRedo.<T>repeat(this, count);

    }

 public static <T> Observable<T> repeat(Observable<T> source, final long count) {

        return repeat(source, count, Schedulers.trampoline());

    }

public static <T> Observable<T> repeat(Observable<T> source, final long count, Scheduler scheduler) {

        if(count == 0) {

            return Observable.empty();

        }

        if (count < 0) {

            throw new IllegalArgumentException("count >= 0 expected");

        }

        return repeat(source, new RedoFinite(count - 1), scheduler);

    }

这里创建了一个RedoFinite,传递的是count-1(重复执行count-1次)

        public RedoFinite(long count) {

            this.count = count;

        }

  public static <T> Observable<T> repeat(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {

        return create(new OnSubscribeRedo<T>(source, notificationHandler, false, true, scheduler));

    }

创建了一个OnSubscribeRedo,notificationHandler就是我们前面创建的RedoFinite

    private OnSubscribeRedo(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> f, boolean stopOnComplete, boolean stopOnError,

            Scheduler scheduler) {

        this.source = source;

        this.controlHandlerFunction = f;

        this.stopOnComplete = stopOnComplete;

        this.stopOnError = stopOnError;

        this.scheduler = scheduler;

    }



从这个图我们也可以看出各个之间的关系,虚线箭头表示属于,如OnSubscribeRange属于Observable中

有前面分析可知,下一步在subscribe的时候就调用OnSubscribeRedo的call函数

public void call(final Subscriber<? super T> child) {

        // when true is a marker to say we are ready to resubscribe to source

        final AtomicBoolean resumeBoundary = new AtomicBoolean(true);

        // incremented when requests are made, decremented when requests are fulfilled

        final AtomicLong consumerCapacity = new AtomicLong();

        final Scheduler.Worker worker = scheduler.createWorker();

        child.add(worker);

        final SerialSubscription sourceSubscriptions = new SerialSubscription();

        child.add(sourceSubscriptions);

        // use a subject to receive terminals (onCompleted and onError signals) from

        // the source observable. We use a BehaviorSubject because subscribeToSource

        // may emit a terminal before the restarts observable (transformed terminals)

        // is subscribed

        final Subject<Notification<?>, Notification<?>> terminals = BehaviorSubject.<Notification<?>>create().toSerialized();

        final Subscriber<Notification<?>> dummySubscriber = Subscribers.empty();

        // subscribe immediately so the last emission will be replayed to the next

        // subscriber (which is the one we care about)

        terminals.subscribe(dummySubscriber);

        final ProducerArbiter arbiter = new ProducerArbiter();

        final Action0 subscribeToSource = new Action0() {

            @Override

            public void call() {

                if (child.isUnsubscribed()) {

                    return;

                }

                Subscriber<T> terminalDelegatingSubscriber = new Subscriber<T>() {

                    boolean done;

                    @Override

                    public void onCompleted() {

                        if (!done) {

                            done = true;

                            unsubscribe();

                            terminals.onNext(Notification.createOnCompleted());

                        }

                    }

                    @Override

                    public void onError(Throwable e) {

                        if (!done) {

                            done = true;

                            unsubscribe();

                            terminals.onNext(Notification.createOnError(e));

                        }

                    }

                    @Override

                    public void onNext(T v) {

                        if (!done) {

                            child.onNext(v);

                            decrementConsumerCapacity();

                            arbiter.produced(1);

                        }

                    }

                    private void decrementConsumerCapacity() {

                        // use a CAS loop because we don't want to decrement the

                        // value if it is Long.MAX_VALUE

                        while (true) {

                            long cc = consumerCapacity.get();

                            if (cc != Long.MAX_VALUE) {

                                if (consumerCapacity.compareAndSet(cc, cc - 1)) {

                                    break;

                                }

                            } else {

                                break;

                            }

                        }

                    }

                    @Override

                    public void setProducer(Producer producer) {

                        arbiter.setProducer(producer);

                    }

                };

                // new subscription each time so if it unsubscribes itself it does not prevent retries

                // by unsubscribing the child subscription

                sourceSubscriptions.set(terminalDelegatingSubscriber);

                source.unsafeSubscribe(terminalDelegatingSubscriber);

            }

        };

        // the observable received by the control handler function will receive notifications of onCompleted in the case of 'repeat'

        // type operators or notifications of onError for 'retry' this is done by lifting in a custom operator to selectively divert

        // the retry/repeat relevant values to the control handler

        final Observable<?> restarts = controlHandlerFunction.call(

                terminals.lift(new Operator<Notification<?>, Notification<?>>() {

                    @Override

                    public Subscriber<? super Notification<?>> call(final Subscriber<? super Notification<?>> filteredTerminals) {

                        return new Subscriber<Notification<?>>(filteredTerminals) {

                            @Override

                            public void onCompleted() {

                                filteredTerminals.onCompleted();

                            }

                            @Override

                            public void onError(Throwable e) {

                                filteredTerminals.onError(e);

                            }

                            @Override

                            public void onNext(Notification<?> t) {

                                if (t.isOnCompleted() && stopOnComplete) {

                                    filteredTerminals.onCompleted();

                                } else if (t.isOnError() && stopOnError) {

                                    filteredTerminals.onError(t.getThrowable());

                                } else {

                                    filteredTerminals.onNext(t);

                                }

                            }

                            @Override

                            public void setProducer(Producer producer) {

                                producer.request(Long.MAX_VALUE);

                            }

                        };

                    }

                }));

        // subscribe to the restarts observable to know when to schedule the next redo.

        worker.schedule(new Action0() {

            @Override

            public void call() {

                restarts.unsafeSubscribe(new Subscriber<Object>(child) {

                    @Override

                    public void onCompleted() {

                        child.onCompleted();

                    }

                    @Override

                    public void onError(Throwable e) {

                        child.onError(e);

                    }

                    @Override

                    public void onNext(Object t) {

                        if (!child.isUnsubscribed()) {

                            // perform a best endeavours check on consumerCapacity

                            // with the intent of only resubscribing immediately

                            // if there is outstanding capacity

                            if (consumerCapacity.get() > 0) {

                                worker.schedule(subscribeToSource);

                            } else {

                                // set this to true so that on next request

                                // subscribeToSource will be scheduled

                                resumeBoundary.compareAndSet(false, true);

                            }

                        }

                    }

                    @Override

                    public void setProducer(Producer producer) {

                        producer.request(Long.MAX_VALUE);

                    }

                });

            }

        });

        child.setProducer(new Producer() {

            @Override

            public void request(final long n) {

                if (n > 0) {

                    BackpressureUtils.getAndAddRequest(consumerCapacity, n);

                    arbiter.request(n);

                    if (resumeBoundary.compareAndSet(true, false)) {

                        worker.schedule(subscribeToSource);

                    }

                }

            }

        });

    }

1、创建了一个worker,用来后面执行任务

2、创建一个ProducerArbiter

3、创建了一个subscribeToSource,代表1中执行任务的具体动作

4、创建restarts被观察者,主要用来启动下一次循环操作

5、调用 worker.schedule订阅前面的restarts

6、调用设置demo中订阅者的Producer

1、2、3比较简单,我们先看4

final Observable<?> restarts = controlHandlerFunction.call(

                terminals.lift(new Operator<Notification<?>, Notification<?>>() {

                    @Override

                    public Subscriber<? super Notification<?>> call(final Subscriber<? super Notification<?>> filteredTerminals) {

                        return new Subscriber<Notification<?>>(filteredTerminals) {

                            @Override

                            public void onCompleted() {

                                filteredTerminals.onCompleted();

                            }

                            @Override

                            public void onError(Throwable e) {

                                filteredTerminals.onError(e);

                            }

                            @Override

                            public void onNext(Notification<?> t) {

                                if (t.isOnCompleted() && stopOnComplete) {

                                    filteredTerminals.onCompleted();

                                } else if (t.isOnError() && stopOnError) {

                                    filteredTerminals.onError(t.getThrowable());

                                } else {

                                    filteredTerminals.onNext(t);

                                }

                            }

                            @Override

                            public void setProducer(Producer producer) {

                                producer.request(Long.MAX_VALUE);

                            }

                        };

                    }

                }));

这一个赋值语句比较长,有前面的图可知controlHandlerFunction是RedoFinite,terminals是BehaviorSubject.<Notification<?>>create().toSerialized()

首先调用前面terminals的lift,这里有个Operator  



最终restarts是一个这样的

terminals.lift最终生成的Observable 



然后调用RedoFinite的call

这里对前面的OnSubscribeLift调用map函数

public Observable<?> call(Observable<? extends Notification<?>> ts) {

            return ts.map(new Func1<Notification<?>, Notification<?>>() {

                int num;

                @Override

                public Notification<?> call(Notification<?> terminalNotification) {

                    if(count == 0) {

                        return terminalNotification;

                    }

                    num++;

                    if(num <= count) {

                        return Notification.createOnNext(num);

                    } else {

                        return terminalNotification;

                    }

                }

            }).dematerialize();
        }



最后还有一个dematerialize

public final <T2> Observable<T2> dematerialize() {
return lift(OperatorDematerialize.instance());
}
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
最终我们的restarts是这样的   



在分析一下5调用 worker.schedule订阅前面的restarts

前面createWork创建的是InnerCurrentThreadScheduler

 @Override

        public Subscription schedule(Action0 action) {

            return enqueue(action, now());

        }

private Subscription enqueue(Action0 action, long execTime) {

            if (innerSubscription.isUnsubscribed()) {

                return Subscriptions.unsubscribed();

            }

            final TimedAction timedAction = new TimedAction(action, execTime, counter.incrementAndGet());

            queue.add(timedAction);

            if (wip.getAndIncrement() == 0) {

                do {

                    final TimedAction polled = queue.poll();

                    if (polled != null) {

                        polled.action.call();

                    }

                } while (wip.decrementAndGet() > 0);

                return Subscriptions.unsubscribed();

            } else {

                // queue wasn't empty, a parent is already processing so we just add to the end of the queue

                return Subscriptions.create(new Action0() {

                    @Override

                    public void call() {

                        queue.remove(timedAction);

                    }

                });

            }

        }

最终会调用call函数

worker.schedule(new Action0() {

            @Override

            public void call() {

                restarts.unsafeSubscribe(new Subscriber<Object>(child) {

                    @Override

                    public void onCompleted() {

                        child.onCompleted();

                    }

                    @Override

                    public void onError(Throwable e) {

                        child.onError(e);

                    }

                    @Override

                    public void onNext(Object t) {

                        if (!child.isUnsubscribed()) {

                            // perform a best endeavours check on consumerCapacity

                            // with the intent of only resubscribing immediately

                            // if there is outstanding capacity

                            if (consumerCapacity.get() > 0) {

                                worker.schedule(subscribeToSource);

                            } else {

                                // set this to true so that on next request

                                // subscribeToSource will be scheduled

                                resumeBoundary.compareAndSet(false, true);

                            }

                        }

                    }

                    @Override

                    public void setProducer(Producer producer) {

                        producer.request(Long.MAX_VALUE);

                    }

                });

            }

        });

调用restarts.unsafeSubscribe
由前面图分析可知,这里先调用OnSubscribeLift的call函数

public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}


这里的operator是OperatorDematerialize,我们看一下它的call函数

public Subscriber<? super Notification<T>> call(final Subscriber<? super T> child) {
return new Subscriber<Notification<T>>(child) {
/** Do not send two onCompleted events. */
boolean terminated;
@Override
public void onNext(Notification<T> t) {
switch (t.getKind()) {
case OnNext:
if (!terminated) {
child.onNext(t.getValue());
}
break;
case OnError:
onError(t.getThrowable());
break;
case OnCompleted:
onCompleted();
break;
default:
onError(new IllegalArgumentException("Unsupported notification type: " + t));
break;
}
}

@Override
public void onError(Throwable e) {
if (!terminated) {
terminated = true;
child.onError(e);
}
}

@Override
public void onCompleted() {
if (!terminated) {
terminated = true;
child.onCompleted();
}
}

};
}


这里返回了一个订阅者,数据类型是Notification

回到call函数,接下来parent是OnSubscribeMap

@Override
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
source.unsafeSubscribe(parent);
}


这里以前面创建的订阅者和transformer为参数创建了一个MapSubscriber,最后调用unsafeSubscribe

public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
unsafeSubscribe最终调用source的call函数,这里的source是OnSubscribeLift

public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
o是前面创建出来的MapSubscriber

operator是在最开始的OnSubscribeRedo的call函数创建的,如下

final Observable<?> restarts = controlHandlerFunction.call(
terminals.lift(new Operator<Notification<?>, Notification<?>>() {
@Override
public Subscriber<? super Notification<?>> call(final Subscriber<? super Notification<?>> filteredTerminals) {
return new Subscriber<Notification<?>>(filteredTerminals) {
@Override
public void onCompleted() {
filteredTerminals.onCompleted();
}

@Override
public void onError(Throwable e) {
filteredTerminals.onError(e);
}

@Override
public void onNext(Notification<?> t) {
if (t.isOnCompleted() && stopOnComplete) {
filteredTerminals.onCompleted();
} else if (t.isOnError() && stopOnError) {
filteredTerminals.onError(t.getThrowable());
} else {
filteredTerminals.onNext(t);
}
}

@Override
public void setProducer(Producer producer) {
producer.request(Long.MAX_VALUE);
}
};
}
}));
这里也是返回了一个订阅者,回到OnSubscribeLift的call函数,parent是SerializedSubject,调用call函数
public void call(Subscriber<? super R> child) {
actual.unsafeSubscribe(child);
}

actual是BehaviorSubject,它的onSubscribe是SubjectSubscriptionManager

所以调用SubjectSubscriptionManager的call

public void call(final Subscriber<? super T> child) {
SubjectObserver<T> bo = new SubjectObserver<T>(child);
addUnsubscriber(child, bo);
onStart.call(bo);
if (!child.isUnsubscribed()) {
if (add(bo) && child.isUnsubscribed()) {
remove(bo);
}
}
}

注意这里的child是前面创建的一个订阅者
这里以child为参数创建了一个SubjectObserver

这里最主要的是调用add(bo)添加到State中

boolean add(SubjectObserver<T> o) {
do {
State oldState = get();
if (oldState.terminated) {
onTerminated.call(o);
return false;
}
State newState = oldState.add(o);
if (compareAndSet(oldState, newState)) {
onAdded.call(o);
return true;
}
} while (true);
}这样第五步就分析完了,我们再分析一下
6、调用设置demo中订阅者的Producer

child.setProducer(new Producer() {

@Override
public void request(final long n) {
if (n > 0) {
BackpressureUtils.getAndAddRequest(consumerCapacity, n);
arbiter.request(n);
if (resumeBoundary.compareAndSet(true, false)) {
worker.schedule(subscribeToSource);
}
}
}
});

这里最终会调用request方法
arbiter是ProducerArbiter

public void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("n >= 0 required");
}
if (n == 0) {
return;
}
synchronized (this) {
if (emitting) {
missedRequested += n;
return;
}
emitting = true;
}
boolean skipFinal = false;
try {
long r = requested;
long u = r + n;
if (u < 0) {
u = Long.MAX_VALUE;
}
requested = u;

Producer p = currentProducer;
if (p != null) {
p.request(n);
}

emitLoop();
skipFinal = true;
} finally {
if (!skipFinal) {
synchronized (this) {
emitting = false;
}
}
}
}这里主要就是设置了requested

回到前面5,接着调用
worker.schedule(subscribeToSource);

这里最终会调用subscribeToSource的call函数
final Action0 subscribeToSource = new Action0() {
@Override
public void call() {
if (child.isUnsubscribed()) {
return;
}

Subscriber<T> terminalDelegatingSubscriber = new Subscriber<T>() {
boolean done;

@Override
public void onCompleted() {
if (!done) {
done = true;
unsubscribe();
terminals.onNext(Notification.createOnCompleted());
}
}

@Override
public void onError(Throwable e) {
if (!done) {
done = true;
unsubscribe();
terminals.onNext(Notification.createOnError(e));
}
}

@Override
public void onNext(T v) {
if (!done) {
child.onNext(v);
decrementConsumerCapacity();
arbiter.produced(1);
}
}

private void decrementConsumerCapacity() {
// use a CAS loop because we don't want to decrement the
// value if it is Long.MAX_VALUE
while (true) {
long cc = consumerCapacity.get();
if (cc != Long.MAX_VALUE) {
if (consumerCapacity.compareAndSet(cc, cc - 1)) {
break;
}
} else {
break;
}
}
}

@Override
public void setProducer(Producer producer) {
arbiter.setProducer(producer);
}
};
// new subscription each time so if it unsubscribes itself it does not prevent retries
// by unsubscribing the child subscription
sourceSubscriptions.set(terminalDelegatingSubscriber);
source.unsafeSubscribe(terminalDelegatingSubscriber);
}
};

这里创建了一个terminalDelegatingSubscriber的订阅者,并调用
source.unsafeSubscribe(terminalDelegatingSubscriber);这里的source的onSubscribe是OnSubscribeRange

最终调用它的call
@Override
public void call(final Subscriber<? super Integer> childSubscriber) {
childSubscriber.setProducer(new RangeProducer(childSubscriber, startIndex, endIndex));
}
创建了一个RangeProducer
RangeProducer(Subscriber<? super Integer> childSubscriber, int startIndex, int endIndex) {
this.childSubscriber = childSubscriber;
this.currentIndex = startIndex;
this.endOfRange = endIndex;
}
childSubscriber就是我们前面刚创建的
call中setProducer最终会调用RangeProducer的call方法

public void request(long requestedAmount) {
if (get() == Long.MAX_VALUE) {
// already started with fast-path
return;
}
if (requestedAmount == Long.MAX_VALUE && compareAndSet(0L, Long.MAX_VALUE)) {
// fast-path without backpressure
fastPath();
} else if (requestedAmount > 0L) {
long c = BackpressureUtils.getAndAddRequest(this, requestedAmount);
if (c == 0L) {
// backpressure is requested
slowPath(requestedAmount);
}
}
}

这里调用fastPath
void fastPath() {
final long endIndex = this.endOfRange + 1L;
final Subscriber<? super Integer> childSubscriber = this.childSubscriber;
for (long index = currentIndex; index != endIndex; index++) {
if (childSubscriber.isUnsubscribed()) {
return;
}
childSubscriber.onNext((int) index);
}
if (!childSubscriber.isUnsubscribed()) {
childSubscriber.onCompleted();
}
}childSubscriber就是我们前面刚创建出来的订阅者,调用它的onNext
public void onNext(T v) {
if (!done) {
child.onNext(v);
decrementConsumerCapacity();
arbiter.produced(1);
}
}child是最开始的类型为SafeSubscriber的订阅者,这里最终会调用到我们demo中的onNext方法
arbiter.produced(1);这段代码会把missedProduced+1
继续回到fastPath,会把下面的几次循环走完,最终调用onCompleted

public void onCompleted() {
if (!done) {
done = true;
unsubscribe();
terminals.onNext(Notification.createOnCompleted());
}
}
将done设置为true,取消订阅,并调用
terminals.onNext(Notification.createOnCompleted());terminals是SerializedSubject
public void onNext(T t) {
observer.onNext(t);
}
最终会调用到BehaviorSubject的onNext
public void onNext(T v) {
Object last = state.getLatest();
if (last == null || state.active) {
Object n = NotificationLite.next(v);
for (SubjectObserver<T> bo : state.next(n)) {
bo.emitNext(n);
}
}
}

这里SubjectObserver的actual有一个是OnSubscribeRedo中创建的。
void emitNext(Object n) {
if (!fastPath) {
synchronized (this) {
first = false;
if (emitting) {
if (queue == null) {
queue = new ArrayList<Object>();
}
queue.add(n);
return;
}
}
fastPath = true;
}
NotificationLite.accept(actual, n);
}这里如果已经有数据在发送就会将数据如队列返回,没有则将fastPath设置为true,调用
NotificationLite.accept(actual, n);

public static <T> boolean accept(Observer<? super T> o, Object n) {
if (n == ON_COMPLETED_SENTINEL) {
o.onCompleted();
return true;
} else if (n == ON_NEXT_NULL_SENTINEL) {
o.onNext(null);
return false;
} else if (n != null) {
if (n.getClass() == OnErrorSentinel.class) {
o.onError(((OnErrorSentinel) n).e);
return true;
}
o.onNext((T) n);
return false;
} else {
throw new IllegalArgumentException("The lite notification can not be null");
}
}


这里的n是ON_COMPLETED,走最后一个else if分支
public void onNext(Notification<?> t) {
if (t.isOnCompleted() && stopOnComplete) {
filteredTerminals.onCompleted();
} else if (t.isOnError() && stopOnError) {
filteredTerminals.onError(t.getThrowable());
} else {
filteredTerminals.onNext(t);
}
}这是我们在OnSubscribeRedo中的call中创建的
这里会走else分支,filteredTerminals就是我们前面创建的MapSubscriber

@Override
public void onNext(T t) {
R result;

try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}

actual.onNext(result);
}

这里的mapper可以回到前面的图看下,call最终调用到
public Notification<?> call(Notification<?> terminalNotification) {
if(count == 0) {
return terminalNotification;
}

num++;
if(num <= count) {
return Notification.createOnNext(num);
} else {
return terminalNotification;
}
}

根据num值,决定下一步,这里num为1,count为4,走if分支,创建了一个OnNext
回到MapSubscriber的onNext,接着调用

actual.onNext(result);
actual是前面创建出来的OperatorDematerialize的call中new出来的订阅者

public void onNext(Notification<T> t) {
switch (t.getKind()) {
case OnNext:
if (!terminated) {
child.onNext(t.getValue());
}
break;
case OnError:
onError(t.getThrowable());
break;
case OnCompleted:
onCompleted();
break;
default:
onError(new IllegalArgumentException("Unsupported notification type: " + t));
break;
}
}这里走onNext分支
child是OnSubscribeRedo中创建出来的订阅者

public void onNext(Object t) {
if (!child.isUnsubscribed()) {
// perform a best endeavours check on consumerCapacity
// with the intent of only resubscribing immediately
// if there is outstanding capacity
if (consumerCapacity.get() > 0) {
worker.schedule(subscribeToSource);
} else {
// set this to true so that on next request
// subscribeToSource will be scheduled
resumeBoundary.compareAndSet(false, true);
}
}
}最终调用
worker.schedule(subscribeToSource);又回到了前面的循环

我们看一下RedoFinite的call的mapper达到最后的情景

public Observable<?> call(Observable<? extends Notification<?>> ts) {
return ts.map(new Func1<Notification<?>, Notification<?>>() {

int num;

@Override
public Notification<?> call(Notification<?> terminalNotification) {
if(count == 0) {
return terminalNotification;
}

num++;
if(num <= count) {
return Notification.createOnNext(num);
} else {
return terminalNotification;
}
}

}).dematerialize();这里num>count,走else分支
返回OnCompleted

public void onNext(Notification<T> t) {
switch (t.getKind()) {
case OnNext:
if (!terminated) {
child.onNext(t.getValue());
}
break;
case OnError:
onError(t.getThrowable());
break;
case OnCompleted:
onCompleted();
break;
default:
onError(new IllegalArgumentException("Unsupported notification type: " + t));
break;
}
}

这里走onCompleted分支
public void onCompleted() {
if (!terminated) {
terminated = true;
child.onCompleted();
}
}
public void onCompleted() {
child.onCompleted();
}
这里的child是我们demo中的订阅者的一个包裹,这样,整个流程就完成了

我们可以看到,repeate整个流程比较负责,主要是里面新建了很多的订阅者和观察者,所有函数调用 回调比较多。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: