Android消息處理機(jī)制(Handler、Looper、MessageQueue與Message)
Android是消息驅(qū)動的,實(shí)現(xiàn)消息驅(qū)動有幾個(gè)要素:
- 消息的表示:Message
- 消息隊(duì)列:MessageQueue
- 消息循環(huán),用于循環(huán)取出消息進(jìn)行處理:Looper
- 消息處理,消息循環(huán)從消息隊(duì)列中取出消息后要對消息進(jìn)行處理:Handler
平時(shí)我們最常使用的就是Message與Handler了,如果使用過HandlerThread或者自己實(shí)現(xiàn)類似HandlerThread的東西可能還會接觸到Looper,而MessageQueue是Looper內(nèi)部使用的,對于標(biāo)準(zhǔn)的SDK,我們是無法實(shí)例化并使用的(構(gòu)造函數(shù)是包可見性)。
我們平時(shí)接觸到的Looper、Message、Handler都是用JAVA實(shí)現(xiàn)的,Android做為基于Linux的系統(tǒng),底層用C、C++實(shí)現(xiàn)的,而且還有NDK的存在,消息驅(qū)動的模型怎么可能只存在于JAVA層,實(shí)際上,在Native層存在與Java層對應(yīng)的類如Looper、MessageQueue等。
初始化消息隊(duì)列
首先來看一下如果一個(gè)線程想實(shí)現(xiàn)消息循環(huán)應(yīng)該怎么做,以HandlerThread為例:
public void run() { mTid = Process.myTid(); Looper.prepare(); synchronized (this) { mLooper = Looper.myLooper(); notifyAll(); } Process.setThreadPriority(mPriority); onLooperPrepared(); Looper.loop(); mTid = -1; }
主要是紅色標(biāo)明的兩句,首先調(diào)用prepare初始化MessageQueue與Looper,然后調(diào)用loop進(jìn)入消息循環(huán)。先看一下Looper.prepare。
public static void prepare() { prepare(true); } private static void prepare(boolean quitAllowed) { if (sThreadLocal.get() != null) { throw new RuntimeException("Only one Looper may be created per thread"); } sThreadLocal.set(new Looper(quitAllowed)); }
重載函數(shù),quitAllowed默認(rèn)為true,從名字可以看出來就是消息循環(huán)是否可以退出,默認(rèn)是可退出的,Main線程(UI線程)初始化消息循環(huán)時(shí)會調(diào)用prepareMainLooper,傳進(jìn)去的是false。使用了ThreadLocal,每個(gè)線程可以初始化一個(gè)Looper。
再來看一下Looper在初始化時(shí)都做了什么:
private Looper(boolean quitAllowed) { mQueue = new MessageQueue(quitAllowed); mRun = true; mThread = Thread.currentThread(); } MessageQueue(boolean quitAllowed) { mQuitAllowed = quitAllowed; nativeInit(); }
在Looper初始化時(shí),新建了一個(gè)MessageQueue的對象保存了在成員mQueue中。MessageQueue的構(gòu)造函數(shù)是包可見性,所以我們是無法直接使用的,在MessageQueue初始化的時(shí)候調(diào)用了nativeInit,這是一個(gè)Native方法:
static void android_os_MessageQueue_nativeInit(JNIEnv* env, jobject obj) { NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue(); if (!nativeMessageQueue) { jniThrowRuntimeException(env, "Unable to allocate native queue"); return; } nativeMessageQueue->incStrong(env); android_os_MessageQueue_setNativeMessageQueue(env, obj, nativeMessageQueue); } static void android_os_MessageQueue_setNativeMessageQueue(JNIEnv* env, jobject messageQueueObj, NativeMessageQueue* nativeMessageQueue) { env->SetIntField(messageQueueObj, gMessageQueueClassInfo.mPtr, reinterpret_cast<jint>(nativeMessageQueue)); }
在nativeInit中,new了一個(gè)Native層的MessageQueue的對象,并將其地址保存在了Java層MessageQueue的成員mPtr中,Android中有好多這樣的實(shí)現(xiàn),一個(gè)類在Java層與Native層都有實(shí)現(xiàn),通過JNI的GetFieldID與SetIntField把Native層的類的實(shí)例地址保存到Java層類的實(shí)例的mPtr成員中,比如Parcel。
再看NativeMessageQueue的實(shí)現(xiàn):
NativeMessageQueue::NativeMessageQueue() : mInCallback(false), mExceptionObj(NULL) { mLooper = Looper::getForThread(); if (mLooper == NULL) { mLooper = new Looper(false); Looper::setForThread(mLooper); } }
在NativeMessageQueue的構(gòu)造函數(shù)中獲得了一個(gè)Native層的Looper對象,Native層的Looper也使用了線程本地存儲,注意new Looper時(shí)傳入了參數(shù)false。
Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { int wakeFds[2]; int result = pipe(wakeFds); LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno); mWakeReadPipeFd = wakeFds[0]; mWakeWritePipeFd = wakeFds[1]; result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK); LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking. errno=%d", errno); result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK); LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d", errno); // Allocate the epoll instance and register the wake pipe. mEpollFd = epoll_create(EPOLL_SIZE_HINT); LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno); struct epoll_event eventItem; memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union eventItem.events = EPOLLIN; eventItem.data.fd = mWakeReadPipeFd; result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem); LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d", errno); }
Native層的Looper使用了epoll。初始化了一個(gè)管道,用mWakeWritePipeFd與mWakeReadPipeFd分別保存了管道的寫端與讀端,并監(jiān)聽了讀端的EPOLLIN事件。注意下初始化列表的值,mAllowNonCallbacks的值為false。
mAllowNonCallback是做什么的?使用epoll僅為了監(jiān)聽mWakeReadPipeFd的事件?其實(shí)Native Looper不僅可以監(jiān)聽這一個(gè)描述符,Looper還提供了addFd方法:
int addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data); int addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data);
fd表示要監(jiān)聽的描述符。ident表示要監(jiān)聽的事件的標(biāo)識,值必須>=0或者為ALOOPER_POLL_CALLBACK(-2),event表示要監(jiān)聽的事件,callback是事件發(fā)生時(shí)的回調(diào)函數(shù),mAllowNonCallbacks的作用就在于此,當(dāng)mAllowNonCallbacks為true時(shí)允許callback為NULL,在pollOnce中ident作為結(jié)果返回,否則不允許callback為空,當(dāng)callback不為NULL時(shí),ident的值會被忽略。還是直接看代碼方便理解:
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) { #if DEBUG_CALLBACKS ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident, events, callback.get(), data); #endif if (!callback.get()) { if (! mAllowNonCallbacks) { ALOGE("Invalid attempt to set NULL callback but not allowed for this looper."); return -1; } if (ident < 0) { ALOGE("Invalid attempt to set NULL callback with ident < 0."); return -1; } } else { ident = ALOOPER_POLL_CALLBACK; } int epollEvents = 0; if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN; if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT; { // acquire lock AutoMutex _l(mLock); Request request; request.fd = fd; request.ident = ident; request.callback = callback; request.data = data; struct epoll_event eventItem; memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union eventItem.events = epollEvents; eventItem.data.fd = fd; ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex < 0) { int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem); if (epollResult < 0) { ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno); return -1; } mRequests.add(fd, request); } else { int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem); if (epollResult < 0) { ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno); return -1; } mRequests.replaceValueAt(requestIndex, request); } } // release lock return 1; }
如果callback為空會檢查mAllowNonCallbacks看是否允許callback為空,如果允許callback為空還會檢測ident是否>=0。如果callback不為空會把ident的值賦值為ALOOPER_POLL_CALLBACK,不管傳進(jìn)來的是什么值。
接下來把傳進(jìn)來的參數(shù)值封裝到一個(gè)Request結(jié)構(gòu)體中,并以描述符為鍵保存到一個(gè)KeyedVector mRequests中,然后通過epoll_ctl添加或替換(如果這個(gè)描述符之前有調(diào)用addFD添加監(jiān)聽)對這個(gè)描述符事件的監(jiān)聽。
類圖:

發(fā)送消息
通過Looper.prepare初始化好消息隊(duì)列后就可以調(diào)用Looper.loop進(jìn)入消息循環(huán)了,然后我們就可以向消息隊(duì)列發(fā)送消息,消息循環(huán)就會取出消息進(jìn)行處理,在看消息處理之前,先看一下消息是怎么被添加到消息隊(duì)列的。
在Java層,Message類表示一個(gè)消息對象,要發(fā)送消息首先就要先獲得一個(gè)消息對象,Message類的構(gòu)造函數(shù)是public的,但是不建議直接new Message,Message內(nèi)部保存了一個(gè)緩存的消息池,我們可以用obtain從緩存池獲得一個(gè)消息,Message使用完后系統(tǒng)會調(diào)用recycle回收,如果自己new很多Message,每次使用完后系統(tǒng)放入緩存池,會占用很多內(nèi)存的,如下所示:
public static Message obtain() { synchronized (sPoolSync) { if (sPool != null) { Message m = sPool; sPool = m.next; m.next = null; sPoolSize--; return m; } } return new Message(); } public void recycle() { clearForRecycle(); synchronized (sPoolSync) { if (sPoolSize < MAX_POOL_SIZE) { next = sPool; sPool = this; sPoolSize++; } } }
Message內(nèi)部通過next成員實(shí)現(xiàn)了一個(gè)鏈表,這樣sPool就了為了一個(gè)Messages的緩存鏈表。
消息對象獲取到了怎么發(fā)送呢,大家都知道是通過Handler的post、sendMessage等方法,其實(shí)這些方法最終都是調(diào)用的同一個(gè)方法sendMessageAtTime:
public boolean sendMessageAtTime(Message msg, long uptimeMillis) { MessageQueue queue = mQueue; if (queue == null) { RuntimeException e = new RuntimeException( this + " sendMessageAtTime() called with no mQueue"); Log.w("Looper", e.getMessage(), e); return false; } return enqueueMessage(queue, msg, uptimeMillis); }
sendMessageAtTime獲取到消息隊(duì)列然后調(diào)用enqueueMessage方法,消息隊(duì)列mQueue是從與Handler關(guān)聯(lián)的Looper獲得的。
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) { msg.target = this; if (mAsynchronous) { msg.setAsynchronous(true); } return queue.enqueueMessage(msg, uptimeMillis); }
enqueueMessage將message的target設(shè)置為當(dāng)前的handler,然后調(diào)用MessageQueue的enqueueMessage,在調(diào)用queue.enqueueMessage之前判斷了mAsynchronous,從名字看是異步消息的意思,要明白Asynchronous的作用,需要先了解一個(gè)概念Barrier。
Barrier與Asynchronous Message
Barrier是什么意思呢,從名字看是一個(gè)攔截器,在這個(gè)攔截器后面的消息都暫時(shí)無法執(zhí)行,直到這個(gè)攔截器被移除了,MessageQueue有一個(gè)函數(shù)叫enqueueSyncBarier可以添加一個(gè)Barrier。
int enqueueSyncBarrier(long when) { // Enqueue a new sync barrier token. // We don't need to wake the queue because the purpose of a barrier is to stall it. synchronized (this) { final int token = mNextBarrierToken++; final Message msg = Message.obtain(); msg.arg1 = token; Message prev = null; Message p = mMessages; if (when != 0) { while (p != null && p.when <= when) { prev = p; p = p.next; } } if (prev != null) { // invariant: p == prev.next msg.next = p; prev.next = msg; } else { msg.next = p; mMessages = msg; } return token; } }
在enqueueSyncBarrier中,obtain了一個(gè)Message,并設(shè)置msg.arg1=token,token僅是一個(gè)每次調(diào)用enqueueSyncBarrier時(shí)自增的int值,目的是每次調(diào)用enqueueSyncBarrier時(shí)返回唯一的一個(gè)token,這個(gè)Message同樣需要設(shè)置執(zhí)行時(shí)間,然后插入到消息隊(duì)列,特殊的是這個(gè)Message沒有設(shè)置target,即msg.target為null。
進(jìn)入消息循環(huán)后會不停地從MessageQueue中取消息執(zhí)行,調(diào)用的是MessageQueue的next函數(shù),其中有這么一段:
Message msg = mMessages; if (msg != null && msg.target == null) { // Stalled by a barrier. Find the next asynchronous message in the queue. do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); }
如果隊(duì)列頭部的消息的target為null就表示它是個(gè)Barrier,因?yàn)橹挥袃煞N方法往mMessages中添加消息,一種是enqueueMessage,另一種是enqueueBarrier,而enqueueMessage中如果mst.target為null是直接拋異常的,后面會看到。
所謂的異步消息其實(shí)就是這樣的,我們可以通過enqueueBarrier往消息隊(duì)列中插入一個(gè)Barrier,那么隊(duì)列中執(zhí)行時(shí)間在這個(gè)Barrier以后的同步消息都會被這個(gè)Barrier攔截住無法執(zhí)行,直到我們調(diào)用removeBarrier移除了這個(gè)Barrier,而異步消息則沒有影響,消息默認(rèn)就是同步消息,除非我們調(diào)用了Message的setAsynchronous,這個(gè)方法是隱藏的。只有在初始化Handler時(shí)通過參數(shù)指定往這個(gè)Handler發(fā)送的消息都是異步的,這樣在Handler的enqueueMessage中就會調(diào)用Message的setAsynchronous設(shè)置消息是異步的,從上面Handler.enqueueMessage的代碼中可以看到。
所謂異步消息,其實(shí)只有一個(gè)作用,就是在設(shè)置Barrier時(shí)仍可以不受Barrier的影響被正常處理,如果沒有設(shè)置Barrier,異步消息就與同步消息沒有區(qū)別,可以通過removeSyncBarrier移除Barrier:
void removeSyncBarrier(int token) { // Remove a sync barrier token from the queue. // If the queue is no longer stalled by a barrier then wake it. final boolean needWake; synchronized (this) { Message prev = null; Message p = mMessages; while (p != null && (p.target != null || p.arg1 != token)) { prev = p; p = p.next; } if (p == null) { throw new IllegalStateException("The specified message queue synchronization " + " barrier token has not been posted or has already been removed."); } if (prev != null) { prev.next = p.next; needWake = false; } else { mMessages = p.next; needWake = mMessages == null || mMessages.target != null; } p.recycle(); } if (needWake) { nativeWake(mPtr); } }
參數(shù)token就是enqueueSyncBarrier的返回值,如果沒有調(diào)用指定的token不存在是會拋異常的。
enqueueMessage
接下來看一下是怎么MessageQueue的enqueueMessage。
final boolean enqueueMessage(Message msg, long when) { if (msg.isInUse()) { throw new AndroidRuntimeException(msg + " This message is already in use."); } if (msg.target == null) { throw new AndroidRuntimeException("Message must have a target."); } boolean needWake; synchronized (this) { if (mQuiting) { RuntimeException e = new RuntimeException( msg.target + " sending message to a Handler on a dead thread"); Log.w("MessageQueue", e.getMessage(), e); return false; } msg.when = when; Message p = mMessages; if (p == null || when == 0 || when < p.when) { // New head, wake up the event queue if blocked. msg.next = p; mMessages = msg; needWake = mBlocked; } else { // Inserted within the middle of the queue. Usually we don't have to wake // up the event queue unless there is a barrier at the head of the queue // and the message is the earliest asynchronous message in the queue. needWake = mBlocked && p.target == null && msg.isAsynchronous(); Message prev; for (;;) { prev = p; p = p.next; if (p == null || when < p.when) { break; } if (needWake && p.isAsynchronous()) { needWake = false; } } msg.next = p; // invariant: p == prev.next prev.next = msg; } } if (needWake) { nativeWake(mPtr); } return true; }
注意上面代碼紅色的部分,當(dāng)msg.target為null時(shí)是直接拋異常的。
在enqueueMessage中首先判斷,如果當(dāng)前的消息隊(duì)列為空,或者新添加的消息的執(zhí)行時(shí)間when是0,或者新添加的消息的執(zhí)行時(shí)間比消息隊(duì)列頭的消息的執(zhí)行時(shí)間還早,就把消息添加到消息隊(duì)列頭(消息隊(duì)列按時(shí)間排序),否則就要找到合適的位置將當(dāng)前消息添加到消息隊(duì)列。
Native發(fā)送消息
消息模型不只是Java層用的,Native層也可以用,前面也看到了消息隊(duì)列初始化時(shí)也同時(shí)初始化了Native層的Looper與NativeMessageQueue,所以Native層應(yīng)該也是可以發(fā)送消息的。與Java層不同的是,Native層是通過Looper發(fā)消息的,同樣所有的發(fā)送方法最終是調(diào)用sendMessageAtTime:
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler, const Message& message) { #if DEBUG_CALLBACKS ALOGD("%p ~ sendMessageAtTime - uptime=%lld, handler=%p, what=%d", this, uptime, handler.get(), message.what); #endif size_t i = 0; { // acquire lock AutoMutex _l(mLock); size_t messageCount = mMessageEnvelopes.size(); while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) { i += 1; } MessageEnvelope messageEnvelope(uptime, handler, message); mMessageEnvelopes.insertAt(messageEnvelope, i, 1); // Optimization: If the Looper is currently sending a message, then we can skip // the call to wake() because the next thing the Looper will do after processing // messages is to decide when the next wakeup time should be. In fact, it does // not even matter whether this code is running on the Looper thread. if (mSendingMessage) { return; } } // release lock // Wake the poll loop only when we enqueue a new message at the head. if (i == 0) { wake(); } }
Native Message只有一個(gè)int型的what字段用來區(qū)分不同的消息,sendMessageAtTime指定了Message,Message要執(zhí)行的時(shí)間when,與處理這個(gè)消息的Handler:MessageHandler,然后用MessageEnvelope封裝了time, MessageHandler與Message,Native層發(fā)的消息都保存到了mMessageEnvelopes中,mMessageEnvelopes是一個(gè)Vector<MessageEnvelope>。Native層消息同樣是按時(shí)間排序,與Java層的消息分別保存在兩個(gè)隊(duì)列里。
消息循環(huán)
消息隊(duì)列初始化好了,也知道怎么發(fā)消息了,下面就是怎么處理消息了,看Handler.loop函數(shù):
public static void loop() { final Looper me = myLooper(); if (me == null) { throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread."); } final MessageQueue queue = me.mQueue; // Make sure the identity of this thread is that of the local process, // and keep track of what that identity token actually is. Binder.clearCallingIdentity(); final long ident = Binder.clearCallingIdentity(); for (;;) { Message msg = queue.next(); // might block if (msg == null) { // No message indicates that the message queue is quitting. return; } // This must be in a local variable, in case a UI event sets the logger Printer logging = me.mLogging; if (logging != null) { logging.println(">>>>> Dispatching to " + msg.target + " " + msg.callback + ": " + msg.what); } msg.target.dispatchMessage(msg); if (logging != null) { logging.println("<<<<< Finished to " + msg.target + " " + msg.callback); } // Make sure that during the course of dispatching the // identity of the thread wasn't corrupted. final long newIdent = Binder.clearCallingIdentity(); if (ident != newIdent) { Log.wtf(TAG, "Thread identity changed from 0x" + Long.toHexString(ident) + " to 0x" + Long.toHexString(newIdent) + " while dispatching to " + msg.target.getClass().getName() + " " + msg.callback + " what=" + msg.what); } msg.recycle(); } }
loop每次從MessageQueue取出一個(gè)Message,調(diào)用msg.target.dispatchMessage(msg),target就是發(fā)送message時(shí)跟message關(guān)聯(lián)的handler,這樣就調(diào)用到了熟悉的dispatchMessage,Message被處理后會被recycle。當(dāng)queue.next返回null時(shí)會退出消息循環(huán),接下來就看一下MessageQueue.next是怎么取出消息的,又會在什么時(shí)候返回null。
final Message next() { int pendingIdleHandlerCount = -1; // -1 only during first iteration int nextPollTimeoutMillis = 0; for (;;) { if (nextPollTimeoutMillis != 0) { Binder.flushPendingCommands(); } nativePollOnce(mPtr, nextPollTimeoutMillis); synchronized (this) { if (mQuiting) { return null; } // Try to retrieve the next message. Return if found. final long now = SystemClock.uptimeMillis(); Message prevMsg = null; Message msg = mMessages; if (msg != null && msg.target == null) { // Stalled by a barrier. Find the next asynchronous message in the queue. do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); } if (msg != null) { if (now < msg.when) { // Next message is not ready. Set a timeout to wake up when it is ready. nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); } else { // Got a message. mBlocked = false; if (prevMsg != null) { prevMsg.next = msg.next; } else { mMessages = msg.next; } msg.next = null; if (false) Log.v("MessageQueue", "Returning message: " + msg); msg.markInUse(); return msg; } } else { // No more messages. nextPollTimeoutMillis = -1; } // If first time idle, then get the number of idlers to run. // Idle handles only run if the queue is empty or if the first message // in the queue (possibly a barrier) is due to be handled in the future. if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) { pendingIdleHandlerCount = mIdleHandlers.size(); } if (pendingIdleHandlerCount <= 0) { // No idle handlers to run. Loop and wait some more. mBlocked = true; continue; } if (mPendingIdleHandlers == null) { mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)]; } mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers); } // Run the idle handlers. // We only ever reach this code block during the first iteration. for (int i = 0; i < pendingIdleHandlerCount; i++) { final IdleHandler idler = mPendingIdleHandlers[i]; mPendingIdleHandlers[i] = null; // release the reference to the handler boolean keep = false; try { keep = idler.queueIdle(); } catch (Throwable t) { Log.wtf("MessageQueue", "IdleHandler threw exception", t); } if (!keep) { synchronized (this) { mIdleHandlers.remove(idler); } } } // Reset the idle handler count to 0 so we do not run them again. pendingIdleHandlerCount = 0; // While calling an idle handler, a new message could have been delivered // so go back and look again for a pending message without waiting. nextPollTimeoutMillis = 0; } }
MessageQueue.next首先會調(diào)用nativePollOnce,然后如果mQuiting為true就返回null,Looper就會退出消息循環(huán)。
接下來取消息隊(duì)列頭部的消息,如果頭部消息是Barrier(target==null)就往后遍歷找到第一個(gè)異步消息,接下來檢測獲取到的消息(消息隊(duì)列頭部的消息或者第一個(gè)異步消息),如果為null表示沒有消息要執(zhí)行,設(shè)置nextPollTimeoutMillis = -1;否則檢測這個(gè)消息要執(zhí)行的時(shí)間,如果到執(zhí)行時(shí)間了就將這個(gè)消息markInUse并從消息隊(duì)列移除,然后從next返回到loop;否則設(shè)置nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE),即距離最近要執(zhí)行的消息還需要多久,無論是當(dāng)前消息隊(duì)列沒有消息可以執(zhí)行(設(shè)置了Barrier并且沒有異步消息或消息隊(duì)列為空)還是隊(duì)列頭部的消息未到執(zhí)行時(shí)間,都會執(zhí)行后面的代碼,看有沒有設(shè)置IdleHandler,如果有就運(yùn)行IdleHandler,當(dāng)IdleHandler被執(zhí)行之后會設(shè)置nextPollTimeoutMillis = 0。
首先看一下nativePollOnce,native方法,調(diào)用JNI,最后調(diào)到了Native Looper::pollOnce,并從Java層傳進(jìn)去了nextPollTimeMillis,即Java層的消息隊(duì)列中執(zhí)行時(shí)間最近的消息還要多久到執(zhí)行時(shí)間。
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { int result = 0; for (;;) { while (mResponseIndex < mResponses.size()) { const Response& response = mResponses.itemAt(mResponseIndex++); int ident = response.request.ident; if (ident >= 0) { int fd = response.request.fd; int events = response.events; void* data = response.request.data; #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - returning signalled identifier %d: " "fd=%d, events=0x%x, data=%p", this, ident, fd, events, data); #endif if (outFd != NULL) *outFd = fd; if (outEvents != NULL) *outEvents = events; if (outData != NULL) *outData = data; return ident; } } if (result != 0) { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - returning result %d", this, result); #endif if (outFd != NULL) *outFd = 0; if (outEvents != NULL) *outEvents = 0; if (outData != NULL) *outData = NULL; return result; } result = pollInner(timeoutMillis); } }
先不看開始的一大串代碼,先看一下pollInner:
int Looper::pollInner(int timeoutMillis) { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis); #endif // Adjust the timeout based on when the next message is due. if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime); if (messageTimeoutMillis >= 0 && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) { timeoutMillis = messageTimeoutMillis; } #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d", this, mNextMessageUptime - now, timeoutMillis); #endif } // Poll. int result = ALOOPER_POLL_WAKE; mResponses.clear(); mResponseIndex = 0; struct epoll_event eventItems[EPOLL_MAX_EVENTS]; int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); // Acquire lock. mLock.lock(); // Check for poll error. if (eventCount < 0) { if (errno == EINTR) { goto Done; } ALOGW("Poll failed with an unexpected error, errno=%d", errno); result = ALOOPER_POLL_ERROR; goto Done; } // Check for poll timeout. if (eventCount == 0) { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - timeout", this); #endif result = ALOOPER_POLL_TIMEOUT; goto Done; } // Handle all events. #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount); #endif for (int i = 0; i < eventCount; i++) { int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; if (fd == mWakeReadPipeFd) { if (epollEvents & EPOLLIN) { awoken(); } else { ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents); } } else { ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex >= 0) { int events = 0; if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT; if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT; if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR; if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP; pushResponse(events, mRequests.valueAt(requestIndex)); } else { ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " "no longer registered.", epollEvents, fd); } } } Done: ; // Invoke pending message callbacks. mNextMessageUptime = LLONG_MAX; while (mMessageEnvelopes.size() != 0) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0); if (messageEnvelope.uptime <= now) { // Remove the envelope from the list. // We keep a strong reference to the handler until the call to handleMessage // finishes. Then we drop it so that the handler can be deleted *before* // we reacquire our lock. { // obtain handler sp<MessageHandler> handler = messageEnvelope.handler; Message message = messageEnvelope.message; mMessageEnvelopes.removeAt(0); mSendingMessage = true; mLock.unlock(); #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d", this, handler.get(), message.what); #endif handler->handleMessage(message); } // release handler mLock.lock(); mSendingMessage = false; result = ALOOPER_POLL_CALLBACK; } else { // The last message left at the head of the queue determines the next wakeup time. mNextMessageUptime = messageEnvelope.uptime; break; } } // Release lock. mLock.unlock(); // Invoke all response callbacks. for (size_t i = 0; i < mResponses.size(); i++) { Response& response = mResponses.editItemAt(i); if (response.request.ident == ALOOPER_POLL_CALLBACK) { int fd = response.request.fd; int events = response.events; void* data = response.request.data; #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p", this, response.request.callback.get(), fd, events, data); #endif int callbackResult = response.request.callback->handleEvent(fd, events, data); if (callbackResult == 0) { removeFd(fd); } // Clear the callback reference in the response structure promptly because we // will not clear the response vector itself until the next poll. response.request.callback.clear(); result = ALOOPER_POLL_CALLBACK; } } return result; }
Java層的消息都保存在了Java層MessageQueue的成員mMessages中,Native層的消息都保存在了Native Looper的mMessageEnvelopes中,這就可以說有兩個(gè)消息隊(duì)列,而且都是按時(shí)間排列的。timeOutMillis表示Java層下個(gè)要執(zhí)行的消息還要多久執(zhí)行,mNextMessageUpdate表示Native層下個(gè)要執(zhí)行的消息還要多久執(zhí)行,如果timeOutMillis為0,epoll_wait不設(shè)置TimeOut直接返回;如果為-1說明Java層無消息直接用Native的time out;否則pollInner取這兩個(gè)中的最小值作為timeOut調(diào)用epoll_wait。當(dāng)epoll_wait返回時(shí)就可能有以下幾種情況:
-
出錯(cuò)返回。
-
Time Out
-
正常返回,描述符上有事件產(chǎn)生。
如果是前兩種情況直接goto DONE。
否則就說明FD上有事件發(fā)生了,如果是mWakeReadPipeFd的EPOLLIN事件就調(diào)用awoken,如果不是mWakeReadPipeFd,那就是通過addFD添加的fd,在addFD中將要監(jiān)聽的fd及其events,callback,data封裝成了Request對象,并以fd為鍵保存到了KeyedVector mRequests中,所以在這里就以fd為鍵獲得在addFD時(shí)關(guān)聯(lián)的Request,并連同events通過pushResonse加入mResonse隊(duì)列(Vector),Resonse僅是對events與Request的封裝。如果是epoll_wait出錯(cuò)或timeout,就沒有描述符上有事件,就不用執(zhí)行這一段代碼,所以直接goto DONE了。
void Looper::pushResponse(int events, const Request& request) { Response response; response.events = events; response.request = request; mResponses.push(response); }
接下來進(jìn)入DONE部分,從mMessageEnvelopes取出頭部的Native消息,如果到達(dá)了執(zhí)行時(shí)間就調(diào)用它內(nèi)部保存的MessageeHandler的handleMessage處理并從Native 消息隊(duì)列移除,設(shè)置result為ALOOPER_POLL_CALLBACK,否則計(jì)算mNextMessageUptime表示Native消息隊(duì)列下一次消息要執(zhí)行的時(shí)間。如果未到頭部消息的執(zhí)行時(shí)間有可能是Java層消息隊(duì)列消息的執(zhí)行時(shí)間小于Native層消息隊(duì)列頭部消息的執(zhí)行時(shí)間,到達(dá)了Java層消息的執(zhí)行時(shí)間epoll_wait TimeOut返回了,或都通過addFd添加的描述符上有事件發(fā)生導(dǎo)致epoll_wait返回,或者epoll_wait是出錯(cuò)返回。Native消息是沒有Barrier與Asynchronous的。
最后,遍歷mResponses(前面剛通過pushResponse存進(jìn)去的),如果response.request.ident == ALOOPER_POLL_CALLBACK,就調(diào)用注冊的callback的handleEvent(fd, events, data)進(jìn)行處理,然后從mResonses隊(duì)列中移除,這次遍歷完之后,mResponses中保留來來的就都是ident>=0并且callback為NULL的了。在NativeMessageQueue初始化Looper時(shí)傳入了mAllowNonCallbacks為false,所以這次處理完后mResponses一定為空。
接下來返回到pollOnce。pollOnce是一個(gè)for循環(huán),pollInner中處理了所有response.request.ident==ALOOPER_POLL_CALLBACK的Response,在第二次進(jìn)入for循環(huán)后如果mResponses不為空就可以找到ident>0的Response,將其ident作為返回值返回由調(diào)用pollOnce的函數(shù)自己處理,在這里我們是在NativeMessageQueue中調(diào)用的Loope的pollOnce,沒對返回值進(jìn)行處理,而且mAllowNonCallbacks為false也就不可能進(jìn)入這個(gè)循環(huán)。pollInner返回值不可能是0,或者說只可能是負(fù)數(shù),所以pollOnce中的for循環(huán)只會執(zhí)行兩次,在第二次就返回了。
Native Looper可以單獨(dú)使用,也有一個(gè)prepare函數(shù),這時(shí)mAllowNonCallbakcs值可能為true,pollOnce中對mResponses的處理就有意義了。
wake與awoken
在Native Looper的構(gòu)造函數(shù)中,通過pipe打開了一個(gè)管道,并用mWakeReadPipeFd與mWakeWritePipeFd分別保存了管道的讀端與寫端,然后用epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd,& eventItem)監(jiān)聽了讀端的EPOLLIN事件,在pollInner中通過epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis)讀取事件,那是在什么時(shí)候往mWakeWritePipeFd寫,又是在什么時(shí)候讀的mWakeReadPipeFd呢?
在Looper.cpp中我們可以發(fā)現(xiàn)如下兩個(gè)函數(shù):
void Looper::wake() { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ wake", this); #endif ssize_t nWrite; do { nWrite = write(mWakeWritePipeFd, "W", 1); } while (nWrite == -1 && errno == EINTR); if (nWrite != 1) { if (errno != EAGAIN) { ALOGW("Could not write wake signal, errno=%d", errno); } } } void Looper::awoken() { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ awoken", this); #endif char buffer[16]; ssize_t nRead; do { nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer)); } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer)); }
wake函數(shù)向mWakeWritePipeFd寫入了一個(gè)“W”字符,awoken從mWakeReadPipeFd讀,往mWakeWritePipeFd寫數(shù)據(jù)只是為了在pollInner中的epoll_wait可以監(jiān)聽到事件返回。在pollInner也可以看到如果是mWakeReadPipeFd的EPOLLIN事件只是調(diào)用了awoken消耗掉了寫入的字符就往后處理了。
那什么時(shí)候調(diào)用wake呢?這個(gè)只要找到調(diào)用的地方分析一下就行了,先看Looper.cpp,在sendMessageAtTime即發(fā)送Native Message的時(shí)候,根據(jù)發(fā)送的Message的執(zhí)行時(shí)間查找mMessageEnvelopes計(jì)算應(yīng)該插入的位置,如果是在頭部插入,就調(diào)用wake喚醒epoll_wait,因?yàn)樵谶M(jìn)入pollInner時(shí)根據(jù)Java層消息隊(duì)列頭部消息的執(zhí)行時(shí)間與Native層消息隊(duì)列頭部消息的執(zhí)行時(shí)間計(jì)算出了一個(gè)timeout,如果這個(gè)新消息是在頭部插入,說明執(zhí)行時(shí)間至少在上述兩個(gè)消息中的一個(gè)之前,所以應(yīng)該喚醒epoll_wait,epoll_wait返回后,檢查Native消息隊(duì)列,看頭部消息即剛插入的消息是否到執(zhí)行時(shí)間了,到了就執(zhí)行,否則就可能需要設(shè)置新的timeout。同樣在Java層的MessageQueue中,有一個(gè)函數(shù)nativeWake也同樣可以通過JNI調(diào)用wake,調(diào)用nativeWake的時(shí)機(jī)與在Native調(diào)用wake的時(shí)機(jī)類似,在消息隊(duì)列頭部插入消息,還有一種情況就是,消息隊(duì)列頭部是一個(gè)Barrier,而且插入的消息是第一個(gè)異步消息。
if (p == null || when == 0 || when < p.when) { // New head, wake up the event queue if blocked. msg.next = p; mMessages = msg; needWake = mBlocked; } else { // Inserted within the middle of the queue. Usually we don't have to wake // up the event queue unless there is a barrier at the head of the queue // and the message is the earliest asynchronous message in the queue. needWake = mBlocked && p.target == null && msg.isAsynchronous();//如果頭部是Barrier并且新消息是異步消息則“有可能”需要喚醒 Message prev; for (;;) { prev = p; p = p.next; if (p == null || when < p.when) { break; } if (needWake && p.isAsynchronous()) { // 消息隊(duì)列中有異步消息并且執(zhí)行時(shí)間在新消息之前,所以不需要喚醒。 needWake = false; } } msg.next = p; // invariant: p == prev.next prev.next = msg; }
在頭部插入消息不一定調(diào)用nativeWake,因?yàn)橹翱赡苷趫?zhí)行IdleHandler,如果執(zhí)行了IdleHandler,就在IdleHandler執(zhí)行后把nextPollTimeoutMillis設(shè)置為0,下次進(jìn)入for循環(huán)就用0調(diào)用nativePollOnce,不需要wake,只有在沒有消息可以執(zhí)行(消息隊(duì)列為空或沒到執(zhí)行時(shí)間)并且沒有設(shè)置IdleHandler時(shí)mBlocked才會為true。
如果Java層的消息隊(duì)列被Barrier Block住了并且當(dāng)前插入的是一個(gè)異步消息有可能需要喚醒Looper,因?yàn)楫惒较⒖梢栽贐arrier下執(zhí)行,但是這個(gè)異步消息一定要是執(zhí)行時(shí)間最早的異步消息。
退出Looper也需要wake,removeSyncBarrier時(shí)也可能需要。

浙公網(wǎng)安備 33010602011771號