您的位置:首页 > 产品设计 > UI/UE

android MessageQueue 源码解析

2017-02-07 15:52 190 查看
MessageQueue众所周知是一个Message存储队列,官网对其解释是:

Low-level class holding the list of messages to be dispatched by a Looper . Messages are not added directly to a MessageQueue,but rather through Handler objects associated with the Looper.

You can retrieve the MessageQueue for the current thread with Looper.myQueue()

翻译:保存消息列表的低级别类,消息由Looper对象派发。消息并不是直接添加到MessageQueue中的,而是通过与Looper对象关联的MessageQueue.IdleHandler对象添加。

调用Looper.myQueue方法可以获取当前线程的MessageQueue。

在Looper.prepare()方法中调用了 MessageQueue构造函数创建了一个MessageQueue对象,接下来首先看一下MessageQueue构造函数具体是什么;

MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}


1:源码中可以看到MessageQueue的修饰符是Default的,故外部使用是不能直接new一个MessageQueue对象,只能通过Looper.myQueue()方法来获取当前线程的MessageQueue对象。

2:参数 boolean quitAllowed 表示 True if the message queue can be quit. MessageQueue的构造是通过Looper.prepare()方法进行构建的,而在该方法中传入的quitAllowed = true,故在常用的MessageQueu情况下是允许Looper调用quit来终止该消息循环。

3:mPtr // used by native code 该变量是通过jni来实现获取到的,具体是如何获取,暂不清楚

在消息发送过程中,通过Handler源码了解最终是执行到了MessageQueue.enqueueMessage(msg,uptimeMillis)方法,现在先来看一下MessageQueue中该方法的具体实现。

boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}

synchronized (this) {
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}

msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue.  Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}

// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}


在该方法内,首先存在两个判断:

1:msg.target 是否为null ,如果为null 则直接抛出IllegalArgumentException(“Message must have a target.”) 异常,

2:msg是否已经被使用过,如果被使用过,则直接抛出IllegalStateException(msg + ” This message is already in use.”)异常

另外在该方法中可以看到内部是加了关键字 synchronized 这也就保证了 Message 在存放到MessageQueue时,是不受多线程影响的。

之后:

if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}


可以看到当mQuitting = = true时,内部会对msg 进行回收。

msg.recycle();


而mQuitting 的设置 可以看到是在

void quit(boolean safe) {
if (!mQuitAllowed) {
throw new IllegalStateException("Main thread not allowed to quit.");
}

synchronized (this) {
if (mQuitting) {
return;
}
mQuitting = true;

if (safe) {
removeAllFutureMessagesLocked();
} else {
removeAllMessagesLocked();
}

// We can assume mPtr != 0 because mQuitting was previously false.
nativeWake(mPtr);
}
}


quit() 方法内进行设置,而该方法的调用主要是通过Looper.quit()进行调用。由此可见Looper的使用 在每次程序退出时需要主动去调用 Looper.quit()或者 Looper.quitSafely()对msg进行回收。

接下来看MessageQueue中是如何保存Message的

msg.markInUse();
msg.when = when;


对Message进行标记,同时记录当前Message的延迟时间。

接下来可以看到

if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
}
else {
// Inserted within the middle of the queue.  Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}


当满足条件 p== null 或者 when == 0 或者 when < p.when 那么将会执行该段代码

msg.next = p;
mMessages = msg;
needWake = mBlocked;


通过该段代码可以看出 MessageQueue实质上是通过链式结构将Message保存起来。

当p != null && when != 0 && when >= p.when 时 则执行下面该段代码

needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;


在上述该段代码中可以看到在该else 判断中存在一个 for(;;) 循环,而该循环的目的是确定最新Message需要插入的位置,而该位置是根据每一个Message.when来确定的。

最终的排列顺序是:Message.when 值越小排列位置越在前。

由此可以想到 MessageQueue内数据的排列是按照链式结构排序,每一个元素的位置是由该元素的when大小来决定的。

到此MessageQueue是如何存储Message的已经解析完成,接下来看一下 MessageQueue是如何来获取链表中的数据的。

MessageQueue获取Message是通过Looper.loop()方法来获取的,在loop方法中通过MessageQueue.next()方法来获取MessageQueue内的Message数据

Message next() {
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.
final long ptr = mPtr;
if (ptr == 0) {
return null;
}

int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}

nativePollOnce(ptr, nextPollTimeoutMillis);

synchronized (this) {
// Try to retrieve the next message.  Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// Stalled by a barrier.  Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
// Next message is not ready.  Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}

// Process the quit message now that all pending messages have been handled.
if (mQuitting) {
dispose();
return null;
}

// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run.  Loop and wait some more.
mBlocked = true;
continue;
}

if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}

// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler

boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}

if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}

// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;

// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}


先来看以上几行代码:

final long ptr = mPtr;
if (ptr == 0) {
return null;
}


mPtr该值是在jni中进行改变的,具体是如何改变,暂不清楚。

根据上述代码可以看出 如果mPtr==0 那么将直接return null;

紧接着看以下代码每次执行next()方法 nextPollTimeoutMillis 重新赋值为0,那么

for(;;)循环中

if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}


该段代码是不执行的。

紧接着 执行 nativePollOnce(ptr, nextPollTimeoutMillis),而该方法的具体实现也是jni中实现的,暂不清楚。

后续代码中看到 for(;;)循环内是有 synchronized 关键字的,这样就避免了多线程问题。

if (msg != null && msg.target == null) {
// Stalled by a barrier.  Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}


首先看该段代码,do-while循环中,当msg != null && msg.target == null

获取当亲msg,并将链表中的指针指向下一个。

紧接着看以下代码:

if (msg != null) {
if (now < msg.when) {
// Next message is not ready.  Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}


当msg != null 时:

内部存在两个判断,分别是 :

now

nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);


可以看出是给 nextPollTimeoutMillis 进行重新复制

now > = msg.when: 这是可以考虑到是获取msg 并将给msg return出去

具体的实现可以看以下该段代码:

mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;


可以看到,当第一次执行是 prevMsg == null 则将msg.next 复制给mMessages对象,同时当前msg.next 置null 并标记为InUse

当msg == null时;重置 nextPollTimeoutMillis = -1;

接下来看一下后续的一段代码:

if (mQuitting) {
dispose();
return null;
}
// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run.  Loop and wait some more.
mBlocked = true;
continue;
}

if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}

// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler

boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}

if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}

// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;

// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;


可以看到如果Looper设置了quitSafely(),那么将会执行 dispose() 方法

该方法一会在说。

而对于后面的一段代码本人技术有限确实没怎么看懂,如果有明白的大牛还望赐教:

if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run.  Loop and wait some more.
mBlocked = true;
continue;
}

if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}

// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler

boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}

if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}

// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;

// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;


下面来看一下 dispose()方法;

private void dispose() {
if (mPtr != 0) {
nativeDestroy(mPtr);
mPtr = 0;
}
}


可以看到内部其实直接通过jni操作了底层,而这块底层代码不得而知了。

另外在Looper中常调用 Looper.quit()、或者 Looper. quitSafely()

而这两个方法同时调用到MessageQueue中的quit(boolean safe)方法

void quit(boolean safe) {
if (!mQuitAllowed) {
throw new IllegalStateException("Main thread not allowed to quit.");
}

synchronized (this) {
if (mQuitting) {
return;
}
mQuitting = true;

if (safe) {
removeAllFutureMessagesLocked();
} else {
removeAllMessagesLocked();
}

// We can assume mPtr != 0 because mQuitting was previously false.
nativeWake(mPtr);
}
}


在该方法内可以看到主要是 调用了 removeAllFutureMessagesLocked()、

removeAllMessagesLocked()、nativeWake(mPtr); 这三个方法,而最后一个是jni调用,内部具体是什么不清楚,而其它两个,通过源码可以看到:

private void removeAllFutureMessagesLocked() {
final long now = SystemClock.uptimeMillis();
Message p = mMessages;
if (p != null) {
if (p.when > now) {
removeAllMessagesLocked();
} else {
Message n;
for (;;) {
n = p.next;
if (n == null) {
return;
}
if (n.when > now) {
break;
}
p = n;
}
p.next = null;
do {
p = n;
n = p.next;
p.recycleUnchecked();
} while (n != null);
}
}
}


该方法 removeAllFutureMessagesLocked,可以看出,该方法只会清空MessageQueue消息池中所有的延迟消息,并将消息池中所有的非延迟消息派发出去让Handler去处理,quitSafely相比于quit方法安全之处在于清空消息之前会派发所有的非延迟消息。

private void removeAllMessagesLocked() {
Message p = mMessages;
while (p != null) {
Message n = p.next;
p.recycleUnchecked();
p = n;
}
mMessages = null;
}


通过源码可以了解到该方法removeAllMessagesLocked的作用是把MessageQueue消息池中所有的消息全部清空,无论是延迟消息(延迟消息是指通过sendMessageDelayed或通过postDelayed等方法发送的需要延迟执行的消息)还是非延迟消息。

至此MessageQueue中部分方法已经解析完成,读者如果想更多的了解,可以自行参考源码,也可以留言进行讨论。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐