您的位置:首页 > 编程语言 > Java开发

一张图搞定-RxJava2的线程切换原理和内存泄露问题

2017-06-11 17:01 507 查看

一张图搞定-RxJava2的线程切换原理和内存泄露问题分析

首先祭出我自己画的一张图



这张图显示的是RxJava2源码层面上的调用关系

下面通过一个例子来解释一下这张图

public class MainActivity extends Activity {
private static final String TAG = "MainActivity";

CompositeDisposable comDisposable = new CompositeDisposable();
Bitmap bitmap;

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("hello");
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
Disposable disposable = observable.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.i(TAG,s);
}
});
comDisposable.add(disposable);
bitmap = BitmapFactory.decodeResource(getResources(),R.mipmap.aaa);
}

@Override
protected void onDestroy() {
super.onDestroy();
//comDisposable.dispose();
}
}


这个例子很简单,在io线程里发送一个字符串,然后在主线程中打印出来.这里面我们加了一个bitmap对象,这个对象里面保持了一个大图片,我们通过观察内存占用,

就可以分析此Activity是否内存泄露,没有被垃圾回收.

首先从Observable.create方法开始分析

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}


生成了一个ObservableCreate对象,此对象中传入了一个ObservableOnSubscribe对象,而这个ObservableOnSubscribe对象是我们实现的一个内部类

new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("hello");
}
}


这个ObservableOnSubscribe的构造方法定义为

public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}


将ObservableOnSubscribe对象赋值给了source,在图中可以看到ObservableCreate里面的source的颜色和ObservableOnSubscribe的颜色相同都是黄色,这个

图中,我特意将相同的对象置为了相同的颜色,从ObservableCreate –> ObservableSubscribeOn –> ObservableObserveOn,从上到下,依次持有上层的对象,用的是装饰者模式,像极了Java的流对象.

ObservableSubscribeOn是执行subscribeOn方法生成的,代码如下

public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}


ObservableObserveOn是执行observeOn方法生成的,代码如下

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}


也就是下面这段代码最终得到了一个ObservableObserveOn对象

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("hello");
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());


OK,这就是图的左半边的对象生成逻辑,也许你会问每一个对象里的source是干什么用的?,别着急接下来你就会明白了.

来看subscribe操作,代码如下

Disposable disposable = observable.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.i(TAG,s);
}
});


这个subscribe方法中我们传了一个onNext(consumer对象)操作,进入subscribe源码里

public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}


其实这个subscribe里是可以传四个参数的分别是onNext,onError,onComplete,onSubscribe,我们这里只传了一个onNext,其他操作源码传入了默认操作.

接着向里面跟源码:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

subscribe(ls);

return ls;
}


可以看到new了一个LambdaObserver对象,并传入了 onNext, onError, onComplete, onSubscribe.之后调用subscribe方法,

此subscribe方法是ObservableObserveOn对象的,最终会调用ObservableObserveOn的subscribeActual方法,代码如下:

@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();

source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}


让我们结合图来分析一下,这个方法调用的是source.subscribe并传入一个ObserveOnObserver对象



从图中我们可以看到ObservableObserveOn对象里的source是绿色的,是一个ObservableSubscribeOn对象,调用其subscribe方法并传入一个ObserveOnObserver对象(图中浅绿色),这个ObserveOnOnserver对象中有一个actual对象,此对象为LambdaObserver(图中紫色),从上面的分析中我们知道LambdaObserver对象中保存着我们的定义的onNext操作.

注意,接下来比较重要,要出现线程切换了:

因为调用了ObservableSubscribeOn的subscribe方法,最终会调用subscribeActual方法,代码如下

@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

s.onSubscribe(parent);

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}


我们在知道ObservableSubscribeOn对象时调用subscribeOn(Schedulers.io())方法是生成的对象,上面的scheduler对象就是Schedulers.io()生成的一个线程调度对象,此对象是维护这一个线程池,让操作在io线程池中执行(关于io线程池,大家可以自己百度:) ),也就是此方法 scheduler.scheduleDirect(new SubscribeTask(parent))会切换线程来执行SubscribeTask任务,此任务的定义为:

final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;

SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}

@Override
public void run() {
source.subscribe(parent);
}
}


其中parent就是一个SubscribeOnObserver对象,soure就是一个ObservableCreate对象,如图:



SubscribeOnObserver对象中也有一个actual对象(浅绿色),从图中可看到是从下面传上来的一个ObserveOnObserver对象.

执行source.subscribe(parent) (也就是ObservableCreate.subscribe(ObserveOnObserver))时,是在io线程中进行的,如图所示切换了线程.这里其实可以解释一个问题 subscribeOn如果执行多次为什么只有第一次有作用?,每一次subscribeOn都会切换线程,从图中我们可以看到,这个切换线程是倒着的从下往上,也就是一直切换到最后一个subscribeOn,其实就是我们代码中定义的第一个subscribeOn,也许你会问,从下往上是subscribeOn切换线程,那如果要调用我们自己定义的onNext,最终不是还要再从上往下调用回去,调用回去时还会切换线程么?别着急,一会就会讲到从上往下调用回去的逻辑,这里我可以先说一个概括 调用subscribe时的从下往上是subscribeOn切换线程,之后调用onNext传递数据时的从上往下是ObserveOn切换线程

接着往下,在io线程里调用ObservableCreate.subscribe(ObserveOnObserver),最终同样调用到subscribeActual方法,代码如下

@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);

try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}


同样的调用了source.subscribe方法,这里的source是ObservableOnSubscribe对象,parent是一个CreateEmitter对象,如下图:



从图中可以看到CreateEmitter中有一个observer对象(深蓝色),此对象是传上来的SubscribeOnObserver对象,Ok终于调到头了,最终调用ObservableOnSubscribe的subscribe方法,这个ObservableOnSubscribe就是我们在Activity中自己实现的一个匿名内部类:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("hello");
}
})


豁然开朗有木有,终于到了数据产生的源头了,并且这个方法是在io线程里执行的,准确的说是第一个subscribeOn方法所指定的线程,这里我们只分析onNext方法,其他onError,onComplete分析方法是相同的.

  接着来分析一下onNext方法的传递.首先从我们的调用开始emitter.onNext(“hello”);,这里的emitter其实就是上面我们创建的CreateEmitter对象,调用其onNext()方法,代码如下:

@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}


调用了observer的onNext方法,从我们的流程图中我们可以看到,observer其实是一个SubscribeOnObserver,也就是调用SubscribeOnObserver的onNext方法,代码如下:

@Override
public void onNext(T t) {
actual.onNext(t);
}


调用actual的onNext方法,从流程图中我们可以看到,这个acutal是一个ObserveObObserver对象,接着来看ObserveObObserver的onNext方法:

@Override
public void onNext(T t) {
if (done) {
return;
}

if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}


关键的来了,调用了schedule()方法,从名字看是要切换线程了,

void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}

void drainNormal() {
int missed = 1;

final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;

for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
....
}
a.onNext(v);
}
......
}
}


schedule()方法调用worker.schedule(this);此方法其实就是在将线程切换到主线程(也就是我们ObserveOn(AndroidSchedulers.mainThread())所指定的线程),执行drainNormal方法,

drainNormal()方法调用a.onNext(v),a 在

final Observer<? super T> a = actual;


中被赋值为actual,这个actual是不是很熟悉,没错在流程图中我们可以看到actual是一个LambdaObserver



结束了,还记得么LambdaObserver的onNext方法是我们定义的,终于完成了这么一条调用链,看着是挺复杂的,但结合着图理解,就很简单了:)

总结RxJava2的切换原理是 调用subscribe时的从下往上是subscribeOn切换线程,之后调用onNext传递数据时的从上往下是ObserveOn切换线程.

接下来我们来结合着流程图来看看可能存在内存泄露的地方,我们都知道Java中非静态内部类持有外部类的引用,这是导致内存泄露的常见形式:内部类因为某些原因不能被释放,导致它所持有的外部类也得不到释放.来看看我们demo中定义的RxJava代码:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("hello");
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
Disposable disposable = observable.subscribe(new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { Log.i(TAG,s); } });


我们定义了两个内部类:ObservableOnSubscribe和Consumer,我们想象一种情况,如果io线程里执行的ObservableOnSubscribe的onNext方法阻塞住了一直没有返回,导致此对象不能被回收,如下面代码:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
Thread.sleep(10000000);
emitter.onNext("hello");
}
})


代码中subscribe方法sleep住了,ObservableOnSubscribe对象不能被回收,导致外层的Activity也不能被垃圾回收器回收,因为我们的Activity中持有了一个Bitmap对象,当我们退出此Activity,从内存监测中也能看到内存占用并没有被释放.如图:



查看前一定要点一下左上角的卡车图标,强制GC一下…

你也许看到问题了,我们没有在onDestroy方法里调用 comDisposable.dispose();方法,好我们现在调用此方法,然后查看结果.OK完美内存占用消失了,如图:



控制台打印了 I/MainActivity: thread interrupted,任务取消的原因是调用的线程的interrupt()方法来实现的,等等这里会有一个问题,如果我们捕获了InterruptedException异常但是并不退出这个任务,或者说Interrupt()方法并不能打断这个任务呢,如下面代码:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {

int num = 1000;
while(num > 0) {
try {
Thread.sleep(10000000);
} catch (InterruptedException e) {
e.printStackTrace();
Log.i(TAG, "thread interrupted");
}
}
emitter.onNext("hello");
}
})


这样呢…看你还怎么打断-_-



从截图中可以看到内存没有释放,也就是Activity不能被回收,这就是第一个产生内存泄露的地方内部类ObservableOnSubscribe无法立刻垃圾回收导致外层Activity不能被释放,

有经验的java程序员一定会跳出来说要把ObservableOnSubscribe声明为静态的,好吧,让我们试试

static ObservableOnSubscribe<String> observableOnSubscribe = new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {

int num = 1000;
while(num > 0) {
try {
Thread.sleep(10000000);
} catch (InterruptedException e) {
e.printStackTrace();
Log.i(TAG, "thread interrupted");
}
}
emitter.onNext("hello");
}
};

CompositeDisposable comDisposable = new CompositeDisposable();
Bitmap bitmap;

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Observable<String> observable = Observable.create(observableOnSubscribe)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
Disposable disposable = observable.subscribe(new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { Log.i(TAG,s); } });
comDisposable.add(disposable);
bitmap = BitmapFactory.decodeResource(getResources(),R.mipmap.aaa);
}


退出应用,强制gc,再来看一下内存占用,内存并没有被释放还是100多M,这是怎么回事呢?让我们在来看一下流程图,ObservableOnSubscribe对象的subscribe方法传入了一个CreateEmitter对象,subscribe方法如果不能执行完的话,CreateEmitter对象就不能被释放,而CreateEmitter对象中持有一个observer,这个observer是SubscribeOnObserver对象,

而SubscribeOnObserver对象也持有一个actual对象,此对象为ObserveOnOnserver对象…,到最后这条链会到LambdaObserver对象,此对象持有我们自定义的onNext对象,而此对象是我们定义的一个内部类:

new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.i(TAG,s);
}
}


这个类就持有这外部的Activity,这时你会说可以把这个类也声明为静态不就行了,这样是可以,但是因为我们onNext一般是执行主线程操作,是需要外部对象执行操作的,当然你也可以用弱引用来保存外部Activity,但是这里我们用更简单点的方法.解决这个内存泄露的重点是打破这个链条就行了,从图中看就是将actual置为空不就完事了,对就是这么简单,接下来就是怎么置为null的问题.你也许注意到我们在onDestroy方法里不是已经调用了comDisposable.dispose()方法么,为什么这个方法没有切断这条链呢,很遗憾这个方法只会切断上游和下游的数据传递,并不会将actual置为null,不过rxJava提供了一个方法 onTerminateDetach()来切断这条链,我们来分析一下这个方法:

public final Observable<T> onTerminateDetach() {
return RxJavaPlugins.onAssembly(new ObservableDetach<T>(this));
}


流程图更新为:



在ObservableObserveOn下面又插入了一层,这一层没有切换线程什么的操作,但是在DetachObserver对象的dispose()方法中执行了关键的操作

@Override
public void dispose() {
Disposable s = this.s;
this.s = EmptyComponent.INSTANCE;
this.actual = EmptyComponent.asObserver();
s.dispose();
}


可以看到代码中将actual置为了空,切断了与下游的联系,打破了这条链接.我们再来看一下内存占用,可以看到内存已经被释放了,这里我就不截图了.这里我们注意到一个问题,onTerminateDetach()方法只是将自己的actual对象置为空,而其他上游的actual对象没有为空,可能还会出现内存泄露风险,将代码改为下面:

Observable<String> observable = Observable.create(observableOnSubscribe)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
return s.trim();
}
})
.onTerminateDetach();


添加一个map转换,此时我们测试,发现内存是无法释放的也就是存在内存泄露问题

map的源码定义:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}


我们可以修改流程图



从图中可以看到ObserveOnObserver中的actual是一个MapObserver,而MapObserver里有我们自定义的一个Function对象,此对象持有外部Activity的引用,要解决这个问题就是要切断MapServer与上游的引用关系,

这是就需要再次调用onTerminateDetach()方法了

Observable<String> observable = Observable.create(observableOnSubscribe)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.onTerminateDetach()
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
return s.trim();
}
})
.onTerminateDetach();


再次测试,ok了,内存被释放了,调用两次onTerminateDetach()方法终于解决了泄露问题

总结: 解决内存问题主要有几个步骤:

1.事件生成对象(ObservableOnSubscribe对象)要设为静态内部类,不要持有外部Activity引用

2.在适当位置调用 onTerminateDetach()方法

3.Activity的onDestroy中要调用dispose()方法

源码地址 https://github.com/keji/csdn-demo/blob/master/RxJavaDemo.zip

这里说明一下三星手机可有有些问题,从leakcanary的提交中https://github.com/square/leakcanary/pull/608 能看到三星在ActivityManager中有一个静态变量持有了最后一个Activity(这真是太奇葩了),解决方法是将例子中的MainActivity不作为启动Activity,用另外的一个Activity作为启动Activity,然后通过按钮跳转到此Activity就可以避开三星的问题
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐