<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Android消息處理機(jī)制(Handler、Looper、MessageQueue與Message)

      Android是消息驅(qū)動的,實(shí)現(xiàn)消息驅(qū)動有幾個(gè)要素:

      1. 消息的表示:Message
      2. 消息隊(duì)列:MessageQueue
      3. 消息循環(huán),用于循環(huán)取出消息進(jìn)行處理:Looper
      4. 消息處理,消息循環(huán)從消息隊(duì)列中取出消息后要對消息進(jìn)行處理:Handler

      平時(shí)我們最常使用的就是Message與Handler了,如果使用過HandlerThread或者自己實(shí)現(xiàn)類似HandlerThread的東西可能還會接觸到Looper,而MessageQueue是Looper內(nèi)部使用的,對于標(biāo)準(zhǔn)的SDK,我們是無法實(shí)例化并使用的(構(gòu)造函數(shù)是包可見性)。

      我們平時(shí)接觸到的Looper、Message、Handler都是用JAVA實(shí)現(xiàn)的,Android做為基于Linux的系統(tǒng),底層用C、C++實(shí)現(xiàn)的,而且還有NDK的存在,消息驅(qū)動的模型怎么可能只存在于JAVA層,實(shí)際上,在Native層存在與Java層對應(yīng)的類如Looper、MessageQueue等。

       初始化消息隊(duì)列

      首先來看一下如果一個(gè)線程想實(shí)現(xiàn)消息循環(huán)應(yīng)該怎么做,以HandlerThread為例:

      public void run() {
          mTid = Process.myTid();
          Looper.prepare();
          synchronized (this) {
              mLooper = Looper.myLooper();
              notifyAll();
          }
          Process.setThreadPriority(mPriority);
          onLooperPrepared();
          Looper.loop();
          mTid = -1;
      } 

      主要是紅色標(biāo)明的兩句,首先調(diào)用prepare初始化MessageQueue與Looper,然后調(diào)用loop進(jìn)入消息循環(huán)。先看一下Looper.prepare。

      public static void prepare() {
          prepare(true);
      }
      
      private static void prepare(boolean quitAllowed) {
          if (sThreadLocal.get() != null) {
              throw new RuntimeException("Only one Looper may be created per thread");
          }
          sThreadLocal.set(new Looper(quitAllowed));
      }

      重載函數(shù),quitAllowed默認(rèn)為true,從名字可以看出來就是消息循環(huán)是否可以退出,默認(rèn)是可退出的,Main線程(UI線程)初始化消息循環(huán)時(shí)會調(diào)用prepareMainLooper,傳進(jìn)去的是false。使用了ThreadLocal,每個(gè)線程可以初始化一個(gè)Looper。

      再來看一下Looper在初始化時(shí)都做了什么:

      private Looper(boolean quitAllowed) {
          mQueue = new MessageQueue(quitAllowed);
          mRun = true;
          mThread = Thread.currentThread();
      }
      
      MessageQueue(boolean quitAllowed) {
          mQuitAllowed = quitAllowed;
          nativeInit();
      } 

      在Looper初始化時(shí),新建了一個(gè)MessageQueue的對象保存了在成員mQueue中。MessageQueue的構(gòu)造函數(shù)是包可見性,所以我們是無法直接使用的,在MessageQueue初始化的時(shí)候調(diào)用了nativeInit,這是一個(gè)Native方法:

      static void android_os_MessageQueue_nativeInit(JNIEnv* env, jobject obj) {
          NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
          if (!nativeMessageQueue) {
              jniThrowRuntimeException(env, "Unable to allocate native queue");
              return;
          }
      
          nativeMessageQueue->incStrong(env);
          android_os_MessageQueue_setNativeMessageQueue(env, obj, nativeMessageQueue);
      }
      
      static void android_os_MessageQueue_setNativeMessageQueue(JNIEnv* env, jobject messageQueueObj,
              NativeMessageQueue* nativeMessageQueue) {
          env->SetIntField(messageQueueObj, gMessageQueueClassInfo.mPtr,
                   reinterpret_cast<jint>(nativeMessageQueue));
      }

      在nativeInit中,new了一個(gè)Native層的MessageQueue的對象,并將其地址保存在了Java層MessageQueue的成員mPtr中,Android中有好多這樣的實(shí)現(xiàn),一個(gè)類在Java層與Native層都有實(shí)現(xiàn),通過JNI的GetFieldID與SetIntField把Native層的類的實(shí)例地址保存到Java層類的實(shí)例的mPtr成員中,比如Parcel。

      再看NativeMessageQueue的實(shí)現(xiàn):

      NativeMessageQueue::NativeMessageQueue() : mInCallback(false), mExceptionObj(NULL) {
          mLooper = Looper::getForThread();
          if (mLooper == NULL) {
              mLooper = new Looper(false);
              Looper::setForThread(mLooper);
          }
      }

      在NativeMessageQueue的構(gòu)造函數(shù)中獲得了一個(gè)Native層的Looper對象,Native層的Looper也使用了線程本地存儲,注意new Looper時(shí)傳入了參數(shù)false。

      Looper::Looper(bool allowNonCallbacks) :
              mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
              mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
          int wakeFds[2];
          int result = pipe(wakeFds);
          LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe.  errno=%d", errno);
      
          mWakeReadPipeFd = wakeFds[0];
          mWakeWritePipeFd = wakeFds[1];
      
          result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
          LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking.  errno=%d",
                  errno);
      
          result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
          LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking.  errno=%d",
                  errno);
      
          // Allocate the epoll instance and register the wake pipe.
          mEpollFd = epoll_create(EPOLL_SIZE_HINT);
          LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance.  errno=%d", errno);
      
          struct epoll_event eventItem;
          memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
          eventItem.events = EPOLLIN;
          eventItem.data.fd = mWakeReadPipeFd;
          result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
          LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance.  errno=%d",
                  errno);
      }

      Native層的Looper使用了epoll。初始化了一個(gè)管道,用mWakeWritePipeFd與mWakeReadPipeFd分別保存了管道的寫端與讀端,并監(jiān)聽了讀端的EPOLLIN事件。注意下初始化列表的值,mAllowNonCallbacks的值為false。

      mAllowNonCallback是做什么的?使用epoll僅為了監(jiān)聽mWakeReadPipeFd的事件?其實(shí)Native Looper不僅可以監(jiān)聽這一個(gè)描述符,Looper還提供了addFd方法:

      int addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data);
      int addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data);

      fd表示要監(jiān)聽的描述符。ident表示要監(jiān)聽的事件的標(biāo)識,值必須>=0或者為ALOOPER_POLL_CALLBACK(-2),event表示要監(jiān)聽的事件,callback是事件發(fā)生時(shí)的回調(diào)函數(shù),mAllowNonCallbacks的作用就在于此,當(dāng)mAllowNonCallbacks為true時(shí)允許callback為NULL,在pollOnce中ident作為結(jié)果返回,否則不允許callback為空,當(dāng)callback不為NULL時(shí),ident的值會被忽略。還是直接看代碼方便理解:

      int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
      #if DEBUG_CALLBACKS
          ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
                  events, callback.get(), data);
      #endif
          if (!callback.get()) {
              if (! mAllowNonCallbacks) {
                  ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
                  return -1;
              }
              if (ident < 0) {
                  ALOGE("Invalid attempt to set NULL callback with ident < 0.");
                  return -1;
              }
          } else {
              ident = ALOOPER_POLL_CALLBACK;
          }
      
          int epollEvents = 0;
          if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
          if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;
      
          { // acquire lock
              AutoMutex _l(mLock);
      
              Request request;
              request.fd = fd;
              request.ident = ident;
              request.callback = callback;
              request.data = data;
      
              struct epoll_event eventItem;
              memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
              eventItem.events = epollEvents;
              eventItem.data.fd = fd;
      
              ssize_t requestIndex = mRequests.indexOfKey(fd);
              if (requestIndex < 0) {
                  int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
                  if (epollResult < 0) {
                      ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
                      return -1;
                  }
                  mRequests.add(fd, request);
              } else {
                  int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
                  if (epollResult < 0) {
                      ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
                      return -1;
                  }
                  mRequests.replaceValueAt(requestIndex, request);
              }
          } // release lock
          return 1;
      }

      如果callback為空會檢查mAllowNonCallbacks看是否允許callback為空,如果允許callback為空還會檢測ident是否>=0。如果callback不為空會把ident的值賦值為ALOOPER_POLL_CALLBACK,不管傳進(jìn)來的是什么值。

      接下來把傳進(jìn)來的參數(shù)值封裝到一個(gè)Request結(jié)構(gòu)體中,并以描述符為鍵保存到一個(gè)KeyedVector mRequests中,然后通過epoll_ctl添加或替換(如果這個(gè)描述符之前有調(diào)用addFD添加監(jiān)聽)對這個(gè)描述符事件的監(jiān)聽。

      類圖:

        

      發(fā)送消息

      通過Looper.prepare初始化好消息隊(duì)列后就可以調(diào)用Looper.loop進(jìn)入消息循環(huán)了,然后我們就可以向消息隊(duì)列發(fā)送消息,消息循環(huán)就會取出消息進(jìn)行處理,在看消息處理之前,先看一下消息是怎么被添加到消息隊(duì)列的。

      在Java層,Message類表示一個(gè)消息對象,要發(fā)送消息首先就要先獲得一個(gè)消息對象,Message類的構(gòu)造函數(shù)是public的,但是不建議直接new Message,Message內(nèi)部保存了一個(gè)緩存的消息池,我們可以用obtain從緩存池獲得一個(gè)消息,Message使用完后系統(tǒng)會調(diào)用recycle回收,如果自己new很多Message,每次使用完后系統(tǒng)放入緩存池,會占用很多內(nèi)存的,如下所示:

          public static Message obtain() {
              synchronized (sPoolSync) {
                  if (sPool != null) {
                      Message m = sPool;
                      sPool = m.next;
                      m.next = null;
                      sPoolSize--;
                      return m;
                  }
              }
              return new Message();
          }
      
          public void recycle() {
              clearForRecycle();
      
              synchronized (sPoolSync) {
                  if (sPoolSize < MAX_POOL_SIZE) {
                      next = sPool;
                      sPool = this;
                      sPoolSize++;
                  }
              }
          }

      Message內(nèi)部通過next成員實(shí)現(xiàn)了一個(gè)鏈表,這樣sPool就了為了一個(gè)Messages的緩存鏈表。

      消息對象獲取到了怎么發(fā)送呢,大家都知道是通過Handler的post、sendMessage等方法,其實(shí)這些方法最終都是調(diào)用的同一個(gè)方法sendMessageAtTime:

          public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
              MessageQueue queue = mQueue;
              if (queue == null) {
                  RuntimeException e = new RuntimeException(
                          this + " sendMessageAtTime() called with no mQueue");
                  Log.w("Looper", e.getMessage(), e);
                  return false;
              }
              return enqueueMessage(queue, msg, uptimeMillis);
          } 

      sendMessageAtTime獲取到消息隊(duì)列然后調(diào)用enqueueMessage方法,消息隊(duì)列mQueue是從與Handler關(guān)聯(lián)的Looper獲得的。

          private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
              msg.target = this;
              if (mAsynchronous) {
                  msg.setAsynchronous(true);
              }
              return queue.enqueueMessage(msg, uptimeMillis);
          }

      enqueueMessage將message的target設(shè)置為當(dāng)前的handler,然后調(diào)用MessageQueue的enqueueMessage,在調(diào)用queue.enqueueMessage之前判斷了mAsynchronous,從名字看是異步消息的意思,要明白Asynchronous的作用,需要先了解一個(gè)概念Barrier。

      Barrier與Asynchronous Message

      Barrier是什么意思呢,從名字看是一個(gè)攔截器,在這個(gè)攔截器后面的消息都暫時(shí)無法執(zhí)行,直到這個(gè)攔截器被移除了,MessageQueue有一個(gè)函數(shù)叫enqueueSyncBarier可以添加一個(gè)Barrier。

          int enqueueSyncBarrier(long when) {
              // Enqueue a new sync barrier token.
              // We don't need to wake the queue because the purpose of a barrier is to stall it.
              synchronized (this) {
                  final int token = mNextBarrierToken++;
                  final Message msg = Message.obtain();
                  msg.arg1 = token;
      
                  Message prev = null;
                  Message p = mMessages;
                  if (when != 0) {
                      while (p != null && p.when <= when) {
                          prev = p;
                          p = p.next;
                      }
                  }
                  if (prev != null) { // invariant: p == prev.next
                      msg.next = p;
                      prev.next = msg;
                  } else {
                      msg.next = p;
                      mMessages = msg;
                  }
                  return token;
              }
          }

      在enqueueSyncBarrier中,obtain了一個(gè)Message,并設(shè)置msg.arg1=token,token僅是一個(gè)每次調(diào)用enqueueSyncBarrier時(shí)自增的int值,目的是每次調(diào)用enqueueSyncBarrier時(shí)返回唯一的一個(gè)token,這個(gè)Message同樣需要設(shè)置執(zhí)行時(shí)間,然后插入到消息隊(duì)列,特殊的是這個(gè)Message沒有設(shè)置target,即msg.target為null。

      進(jìn)入消息循環(huán)后會不停地從MessageQueue中取消息執(zhí)行,調(diào)用的是MessageQueue的next函數(shù),其中有這么一段:

      Message msg = mMessages;
      if (msg != null && msg.target == null) {
          // Stalled by a barrier.  Find the next asynchronous message in the queue.
          do {
              prevMsg = msg;
              msg = msg.next;
          } while (msg != null && !msg.isAsynchronous());
      }

      如果隊(duì)列頭部的消息的target為null就表示它是個(gè)Barrier,因?yàn)橹挥袃煞N方法往mMessages中添加消息,一種是enqueueMessage,另一種是enqueueBarrier,而enqueueMessage中如果mst.target為null是直接拋異常的,后面會看到。

      所謂的異步消息其實(shí)就是這樣的,我們可以通過enqueueBarrier往消息隊(duì)列中插入一個(gè)Barrier,那么隊(duì)列中執(zhí)行時(shí)間在這個(gè)Barrier以后的同步消息都會被這個(gè)Barrier攔截住無法執(zhí)行,直到我們調(diào)用removeBarrier移除了這個(gè)Barrier,而異步消息則沒有影響,消息默認(rèn)就是同步消息,除非我們調(diào)用了Message的setAsynchronous,這個(gè)方法是隱藏的。只有在初始化Handler時(shí)通過參數(shù)指定往這個(gè)Handler發(fā)送的消息都是異步的,這樣在Handler的enqueueMessage中就會調(diào)用Message的setAsynchronous設(shè)置消息是異步的,從上面Handler.enqueueMessage的代碼中可以看到。

       所謂異步消息,其實(shí)只有一個(gè)作用,就是在設(shè)置Barrier時(shí)仍可以不受Barrier的影響被正常處理,如果沒有設(shè)置Barrier,異步消息就與同步消息沒有區(qū)別,可以通過removeSyncBarrier移除Barrier:

      void removeSyncBarrier(int token) {
          // Remove a sync barrier token from the queue.
          // If the queue is no longer stalled by a barrier then wake it.
          final boolean needWake;
          synchronized (this) {
              Message prev = null;
              Message p = mMessages;
              while (p != null && (p.target != null || p.arg1 != token)) {
                  prev = p;
                  p = p.next;
              }
              if (p == null) {
                  throw new IllegalStateException("The specified message queue synchronization "
                          + " barrier token has not been posted or has already been removed.");
              }
              if (prev != null) {
                  prev.next = p.next;
                  needWake = false;
              } else {
                  mMessages = p.next;
                  needWake = mMessages == null || mMessages.target != null;
              }
              p.recycle();
          }
          if (needWake) {
              nativeWake(mPtr);
          }
      }

      參數(shù)token就是enqueueSyncBarrier的返回值,如果沒有調(diào)用指定的token不存在是會拋異常的。

      enqueueMessage

      接下來看一下是怎么MessageQueue的enqueueMessage。

          final boolean enqueueMessage(Message msg, long when) {
              if (msg.isInUse()) {
                  throw new AndroidRuntimeException(msg + " This message is already in use.");
              }
              if (msg.target == null) {
                  throw new AndroidRuntimeException("Message must have a target.");
              }
      
              boolean needWake;
              synchronized (this) {
                  if (mQuiting) {
                      RuntimeException e = new RuntimeException(
                              msg.target + " sending message to a Handler on a dead thread");
                      Log.w("MessageQueue", e.getMessage(), e);
                      return false;
                  }
      
                  msg.when = when;
                  Message p = mMessages;
                  if (p == null || when == 0 || when < p.when) {
                      // New head, wake up the event queue if blocked.
                      msg.next = p;
                      mMessages = msg;
                      needWake = mBlocked;
                  } else {
                      // Inserted within the middle of the queue.  Usually we don't have to wake
                      // up the event queue unless there is a barrier at the head of the queue
                      // and the message is the earliest asynchronous message in the queue.
                      needWake = mBlocked && p.target == null && msg.isAsynchronous();
                      Message prev;
                      for (;;) {
                          prev = p;
                          p = p.next;
                          if (p == null || when < p.when) {
                              break;
                          }
                          if (needWake && p.isAsynchronous()) {
                              needWake = false;
                          }
                      }
                      msg.next = p; // invariant: p == prev.next
                      prev.next = msg;
                  }
              }
              if (needWake) {
                  nativeWake(mPtr);
              }
              return true;
          }

      注意上面代碼紅色的部分,當(dāng)msg.target為null時(shí)是直接拋異常的。

      在enqueueMessage中首先判斷,如果當(dāng)前的消息隊(duì)列為空,或者新添加的消息的執(zhí)行時(shí)間when是0,或者新添加的消息的執(zhí)行時(shí)間比消息隊(duì)列頭的消息的執(zhí)行時(shí)間還早,就把消息添加到消息隊(duì)列頭(消息隊(duì)列按時(shí)間排序),否則就要找到合適的位置將當(dāng)前消息添加到消息隊(duì)列。

      Native發(fā)送消息

      消息模型不只是Java層用的,Native層也可以用,前面也看到了消息隊(duì)列初始化時(shí)也同時(shí)初始化了Native層的Looper與NativeMessageQueue,所以Native層應(yīng)該也是可以發(fā)送消息的。與Java層不同的是,Native層是通過Looper發(fā)消息的,同樣所有的發(fā)送方法最終是調(diào)用sendMessageAtTime:

      void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
              const Message& message) {
      #if DEBUG_CALLBACKS
          ALOGD("%p ~ sendMessageAtTime - uptime=%lld, handler=%p, what=%d",
                  this, uptime, handler.get(), message.what);
      #endif
      
          size_t i = 0;
          { // acquire lock
              AutoMutex _l(mLock);
      
              size_t messageCount = mMessageEnvelopes.size();
              while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
                  i += 1;
              }
      
              MessageEnvelope messageEnvelope(uptime, handler, message);
              mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
      
              // Optimization: If the Looper is currently sending a message, then we can skip
              // the call to wake() because the next thing the Looper will do after processing
              // messages is to decide when the next wakeup time should be.  In fact, it does
              // not even matter whether this code is running on the Looper thread.
              if (mSendingMessage) {
                  return;
              }
          } // release lock
      
          // Wake the poll loop only when we enqueue a new message at the head.
          if (i == 0) {
              wake();
          }
      }

       Native Message只有一個(gè)int型的what字段用來區(qū)分不同的消息,sendMessageAtTime指定了Message,Message要執(zhí)行的時(shí)間when,與處理這個(gè)消息的Handler:MessageHandler,然后用MessageEnvelope封裝了time, MessageHandler與Message,Native層發(fā)的消息都保存到了mMessageEnvelopes中,mMessageEnvelopes是一個(gè)Vector<MessageEnvelope>。Native層消息同樣是按時(shí)間排序,與Java層的消息分別保存在兩個(gè)隊(duì)列里。

      消息循環(huán)

      消息隊(duì)列初始化好了,也知道怎么發(fā)消息了,下面就是怎么處理消息了,看Handler.loop函數(shù):

          public static void loop() {
              final Looper me = myLooper();
              if (me == null) {
                  throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
              }
              final MessageQueue queue = me.mQueue;
      
              // Make sure the identity of this thread is that of the local process,
              // and keep track of what that identity token actually is.
              Binder.clearCallingIdentity();
              final long ident = Binder.clearCallingIdentity();
      
              for (;;) {
                  Message msg = queue.next(); // might block
                  if (msg == null) {
                      // No message indicates that the message queue is quitting.
                      return;
                  }
      
                  // This must be in a local variable, in case a UI event sets the logger
                  Printer logging = me.mLogging;
                  if (logging != null) {
                      logging.println(">>>>> Dispatching to " + msg.target + " " +
                              msg.callback + ": " + msg.what);
                  }
      
                  msg.target.dispatchMessage(msg);
      
                  if (logging != null) {
                      logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
                  }
      
                  // Make sure that during the course of dispatching the
                  // identity of the thread wasn't corrupted.
                  final long newIdent = Binder.clearCallingIdentity();
                  if (ident != newIdent) {
                      Log.wtf(TAG, "Thread identity changed from 0x"
                              + Long.toHexString(ident) + " to 0x"
                              + Long.toHexString(newIdent) + " while dispatching to "
                              + msg.target.getClass().getName() + " "
                              + msg.callback + " what=" + msg.what);
                  }
      
                  msg.recycle();
              }
          }

      loop每次從MessageQueue取出一個(gè)Message,調(diào)用msg.target.dispatchMessage(msg),target就是發(fā)送message時(shí)跟message關(guān)聯(lián)的handler,這樣就調(diào)用到了熟悉的dispatchMessage,Message被處理后會被recycle。當(dāng)queue.next返回null時(shí)會退出消息循環(huán),接下來就看一下MessageQueue.next是怎么取出消息的,又會在什么時(shí)候返回null。

      final Message next() {
              int pendingIdleHandlerCount = -1; // -1 only during first iteration
              int nextPollTimeoutMillis = 0;
      
              for (;;) {
                  if (nextPollTimeoutMillis != 0) {
                      Binder.flushPendingCommands();
                  }
                  nativePollOnce(mPtr, nextPollTimeoutMillis);
      
                  synchronized (this) {
                      if (mQuiting) {
                          return null;
                      }
      
                      // Try to retrieve the next message.  Return if found.
                      final long now = SystemClock.uptimeMillis();
                      Message prevMsg = null;
                      Message msg = mMessages;
                      if (msg != null && msg.target == null) {
                          // Stalled by a barrier.  Find the next asynchronous message in the queue.
                          do {
                              prevMsg = msg;
                              msg = msg.next;
                          } while (msg != null && !msg.isAsynchronous());
                      }
                      if (msg != null) {
                          if (now < msg.when) {
                              // Next message is not ready.  Set a timeout to wake up when it is ready.
                              nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                          } else {
                              // Got a message.
                              mBlocked = false;
                              if (prevMsg != null) {
                                  prevMsg.next = msg.next;
                              } else {
                                  mMessages = msg.next;
                              }
                              msg.next = null;
                              if (false) Log.v("MessageQueue", "Returning message: " + msg);
                              msg.markInUse();
                              return msg;
                          }
                      } else {
                          // No more messages.
                          nextPollTimeoutMillis = -1;
                      }
      
                      // If first time idle, then get the number of idlers to run.
                      // Idle handles only run if the queue is empty or if the first message
                      // in the queue (possibly a barrier) is due to be handled in the future.
                      if (pendingIdleHandlerCount < 0
                              && (mMessages == null || now < mMessages.when)) {
                          pendingIdleHandlerCount = mIdleHandlers.size();
                      }
                      if (pendingIdleHandlerCount <= 0) {
                          // No idle handlers to run.  Loop and wait some more.
                          mBlocked = true;
                          continue;
                      }
      
                      if (mPendingIdleHandlers == null) {
                          mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                      }
                      mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
                  }
      
                  // Run the idle handlers.
                  // We only ever reach this code block during the first iteration.
                  for (int i = 0; i < pendingIdleHandlerCount; i++) {
                      final IdleHandler idler = mPendingIdleHandlers[i];
                      mPendingIdleHandlers[i] = null; // release the reference to the handler
      
                      boolean keep = false;
                      try {
                          keep = idler.queueIdle();
                      } catch (Throwable t) {
                          Log.wtf("MessageQueue", "IdleHandler threw exception", t);
                      }
      
                      if (!keep) {
                          synchronized (this) {
                              mIdleHandlers.remove(idler);
                          }
                      }
                  }
      
                  // Reset the idle handler count to 0 so we do not run them again.
                  pendingIdleHandlerCount = 0;
      
                  // While calling an idle handler, a new message could have been delivered
                  // so go back and look again for a pending message without waiting.
                  nextPollTimeoutMillis = 0;
              }
          }

      MessageQueue.next首先會調(diào)用nativePollOnce,然后如果mQuiting為true就返回null,Looper就會退出消息循環(huán)。

      接下來取消息隊(duì)列頭部的消息,如果頭部消息是Barrier(target==null)就往后遍歷找到第一個(gè)異步消息,接下來檢測獲取到的消息(消息隊(duì)列頭部的消息或者第一個(gè)異步消息),如果為null表示沒有消息要執(zhí)行,設(shè)置nextPollTimeoutMillis = -1;否則檢測這個(gè)消息要執(zhí)行的時(shí)間,如果到執(zhí)行時(shí)間了就將這個(gè)消息markInUse并從消息隊(duì)列移除,然后從next返回到loop;否則設(shè)置nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE),即距離最近要執(zhí)行的消息還需要多久,無論是當(dāng)前消息隊(duì)列沒有消息可以執(zhí)行(設(shè)置了Barrier并且沒有異步消息或消息隊(duì)列為空)還是隊(duì)列頭部的消息未到執(zhí)行時(shí)間,都會執(zhí)行后面的代碼,看有沒有設(shè)置IdleHandler,如果有就運(yùn)行IdleHandler,當(dāng)IdleHandler被執(zhí)行之后會設(shè)置nextPollTimeoutMillis = 0。

      首先看一下nativePollOnce,native方法,調(diào)用JNI,最后調(diào)到了Native Looper::pollOnce,并從Java層傳進(jìn)去了nextPollTimeMillis,即Java層的消息隊(duì)列中執(zhí)行時(shí)間最近的消息還要多久到執(zhí)行時(shí)間。

      int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
          int result = 0;
          for (;;) {
              while (mResponseIndex < mResponses.size()) {
                  const Response& response = mResponses.itemAt(mResponseIndex++);
                  int ident = response.request.ident;
                  if (ident >= 0) {
                      int fd = response.request.fd;
                      int events = response.events;
                      void* data = response.request.data;
      #if DEBUG_POLL_AND_WAKE
                      ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
                              "fd=%d, events=0x%x, data=%p",
                              this, ident, fd, events, data);
      #endif
                      if (outFd != NULL) *outFd = fd;
                      if (outEvents != NULL) *outEvents = events;
                      if (outData != NULL) *outData = data;
                      return ident;
                  }
              }
      
              if (result != 0) {
      #if DEBUG_POLL_AND_WAKE
                  ALOGD("%p ~ pollOnce - returning result %d", this, result);
      #endif
                  if (outFd != NULL) *outFd = 0;
                  if (outEvents != NULL) *outEvents = 0;
                  if (outData != NULL) *outData = NULL;
                  return result;
              }
      
              result = pollInner(timeoutMillis);
          }
      }

      先不看開始的一大串代碼,先看一下pollInner:

      int Looper::pollInner(int timeoutMillis) {
      #if DEBUG_POLL_AND_WAKE
          ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
      #endif
      
          // Adjust the timeout based on when the next message is due.
          if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
              nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
              int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
              if (messageTimeoutMillis >= 0
                      && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
                  timeoutMillis = messageTimeoutMillis;
              }
      #if DEBUG_POLL_AND_WAKE
              ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d",
                      this, mNextMessageUptime - now, timeoutMillis);
      #endif
          }
      
          // Poll.
          int result = ALOOPER_POLL_WAKE;
          mResponses.clear();
          mResponseIndex = 0;
      
          struct epoll_event eventItems[EPOLL_MAX_EVENTS];
          int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
      
          // Acquire lock.
          mLock.lock();
      
          // Check for poll error.
          if (eventCount < 0) {
              if (errno == EINTR) {
                  goto Done;
              }
              ALOGW("Poll failed with an unexpected error, errno=%d", errno);
              result = ALOOPER_POLL_ERROR;
              goto Done;
          }
      
          // Check for poll timeout.
          if (eventCount == 0) {
      #if DEBUG_POLL_AND_WAKE
              ALOGD("%p ~ pollOnce - timeout", this);
      #endif
              result = ALOOPER_POLL_TIMEOUT;
              goto Done;
          }
      
          // Handle all events.
      #if DEBUG_POLL_AND_WAKE
          ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
      #endif
      
          for (int i = 0; i < eventCount; i++) {
              int fd = eventItems[i].data.fd;
              uint32_t epollEvents = eventItems[i].events;
              if (fd == mWakeReadPipeFd) {
                  if (epollEvents & EPOLLIN) {
                      awoken();
                  } else {
                      ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
                  }
              } else {
                  ssize_t requestIndex = mRequests.indexOfKey(fd);
                  if (requestIndex >= 0) {
                      int events = 0;
                      if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
                      if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
                      if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
                      if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
                      pushResponse(events, mRequests.valueAt(requestIndex));
                  } else {
                      ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                              "no longer registered.", epollEvents, fd);
                  }
              }
          }
      Done: ;
      
          // Invoke pending message callbacks.
          mNextMessageUptime = LLONG_MAX;
          while (mMessageEnvelopes.size() != 0) {
              nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
              const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
              if (messageEnvelope.uptime <= now) {
                  // Remove the envelope from the list.
                  // We keep a strong reference to the handler until the call to handleMessage
                  // finishes.  Then we drop it so that the handler can be deleted *before*
                  // we reacquire our lock.
                  { // obtain handler
                      sp<MessageHandler> handler = messageEnvelope.handler;
                      Message message = messageEnvelope.message;
                      mMessageEnvelopes.removeAt(0);
                      mSendingMessage = true;
                      mLock.unlock();
      
      #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
                      ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
                              this, handler.get(), message.what);
      #endif
                      handler->handleMessage(message);
                  } // release handler
      
                  mLock.lock();
                  mSendingMessage = false;
                  result = ALOOPER_POLL_CALLBACK;
              } else {
                  // The last message left at the head of the queue determines the next wakeup time.
                  mNextMessageUptime = messageEnvelope.uptime;
                  break;
              }
          }
      
          // Release lock.
          mLock.unlock();
      
          // Invoke all response callbacks.
          for (size_t i = 0; i < mResponses.size(); i++) {
              Response& response = mResponses.editItemAt(i);
              if (response.request.ident == ALOOPER_POLL_CALLBACK) {
                  int fd = response.request.fd;
                  int events = response.events;
                  void* data = response.request.data;
      #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
                  ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
                          this, response.request.callback.get(), fd, events, data);
      #endif
                  int callbackResult = response.request.callback->handleEvent(fd, events, data);
                  if (callbackResult == 0) {
                      removeFd(fd);
                  }
                  // Clear the callback reference in the response structure promptly because we
                  // will not clear the response vector itself until the next poll.
                  response.request.callback.clear();
                  result = ALOOPER_POLL_CALLBACK;
              }
          }
          return result;
      }

      Java層的消息都保存在了Java層MessageQueue的成員mMessages中,Native層的消息都保存在了Native Looper的mMessageEnvelopes中,這就可以說有兩個(gè)消息隊(duì)列,而且都是按時(shí)間排列的。timeOutMillis表示Java層下個(gè)要執(zhí)行的消息還要多久執(zhí)行,mNextMessageUpdate表示Native層下個(gè)要執(zhí)行的消息還要多久執(zhí)行,如果timeOutMillis為0,epoll_wait不設(shè)置TimeOut直接返回;如果為-1說明Java層無消息直接用Native的time out;否則pollInner取這兩個(gè)中的最小值作為timeOut調(diào)用epoll_wait。當(dāng)epoll_wait返回時(shí)就可能有以下幾種情況:

      1. 出錯(cuò)返回。

      2. Time Out

      3. 正常返回,描述符上有事件產(chǎn)生。

      如果是前兩種情況直接goto DONE。

      否則就說明FD上有事件發(fā)生了,如果是mWakeReadPipeFd的EPOLLIN事件就調(diào)用awoken,如果不是mWakeReadPipeFd,那就是通過addFD添加的fd,在addFD中將要監(jiān)聽的fd及其events,callback,data封裝成了Request對象,并以fd為鍵保存到了KeyedVector mRequests中,所以在這里就以fd為鍵獲得在addFD時(shí)關(guān)聯(lián)的Request,并連同events通過pushResonse加入mResonse隊(duì)列(Vector),Resonse僅是對events與Request的封裝。如果是epoll_wait出錯(cuò)或timeout,就沒有描述符上有事件,就不用執(zhí)行這一段代碼,所以直接goto DONE了。

      void Looper::pushResponse(int events, const Request& request) {
          Response response;
          response.events = events;
          response.request = request;
          mResponses.push(response);
      }

      接下來進(jìn)入DONE部分,從mMessageEnvelopes取出頭部的Native消息,如果到達(dá)了執(zhí)行時(shí)間就調(diào)用它內(nèi)部保存的MessageeHandler的handleMessage處理并從Native 消息隊(duì)列移除,設(shè)置result為ALOOPER_POLL_CALLBACK,否則計(jì)算mNextMessageUptime表示Native消息隊(duì)列下一次消息要執(zhí)行的時(shí)間。如果未到頭部消息的執(zhí)行時(shí)間有可能是Java層消息隊(duì)列消息的執(zhí)行時(shí)間小于Native層消息隊(duì)列頭部消息的執(zhí)行時(shí)間,到達(dá)了Java層消息的執(zhí)行時(shí)間epoll_wait TimeOut返回了,或都通過addFd添加的描述符上有事件發(fā)生導(dǎo)致epoll_wait返回,或者epoll_wait是出錯(cuò)返回。Native消息是沒有Barrier與Asynchronous的。

      最后,遍歷mResponses(前面剛通過pushResponse存進(jìn)去的),如果response.request.ident == ALOOPER_POLL_CALLBACK,就調(diào)用注冊的callback的handleEvent(fd, events, data)進(jìn)行處理,然后從mResonses隊(duì)列中移除,這次遍歷完之后,mResponses中保留來來的就都是ident>=0并且callback為NULL的了。在NativeMessageQueue初始化Looper時(shí)傳入了mAllowNonCallbacks為false,所以這次處理完后mResponses一定為空。

      接下來返回到pollOnce。pollOnce是一個(gè)for循環(huán),pollInner中處理了所有response.request.ident==ALOOPER_POLL_CALLBACK的Response,在第二次進(jìn)入for循環(huán)后如果mResponses不為空就可以找到ident>0的Response,將其ident作為返回值返回由調(diào)用pollOnce的函數(shù)自己處理,在這里我們是在NativeMessageQueue中調(diào)用的Loope的pollOnce,沒對返回值進(jìn)行處理,而且mAllowNonCallbacks為false也就不可能進(jìn)入這個(gè)循環(huán)。pollInner返回值不可能是0,或者說只可能是負(fù)數(shù),所以pollOnce中的for循環(huán)只會執(zhí)行兩次,在第二次就返回了。

      Native Looper可以單獨(dú)使用,也有一個(gè)prepare函數(shù),這時(shí)mAllowNonCallbakcs值可能為true,pollOnce中對mResponses的處理就有意義了。

       wake與awoken

      在Native Looper的構(gòu)造函數(shù)中,通過pipe打開了一個(gè)管道,并用mWakeReadPipeFd與mWakeWritePipeFd分別保存了管道的讀端與寫端,然后用epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd,& eventItem)監(jiān)聽了讀端的EPOLLIN事件,在pollInner中通過epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis)讀取事件,那是在什么時(shí)候往mWakeWritePipeFd寫,又是在什么時(shí)候讀的mWakeReadPipeFd呢?

      在Looper.cpp中我們可以發(fā)現(xiàn)如下兩個(gè)函數(shù):

      void Looper::wake() {
      #if DEBUG_POLL_AND_WAKE
          ALOGD("%p ~ wake", this);
      #endif
      
          ssize_t nWrite;
          do {
              nWrite = write(mWakeWritePipeFd, "W", 1);
          } while (nWrite == -1 && errno == EINTR);
      
          if (nWrite != 1) {
              if (errno != EAGAIN) {
                  ALOGW("Could not write wake signal, errno=%d", errno);
              }
          }
      }
      
      void Looper::awoken() {
      #if DEBUG_POLL_AND_WAKE
          ALOGD("%p ~ awoken", this);
      #endif
      
          char buffer[16];
          ssize_t nRead;
          do {
              nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
          } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
      }

      wake函數(shù)向mWakeWritePipeFd寫入了一個(gè)“W”字符,awoken從mWakeReadPipeFd讀,往mWakeWritePipeFd寫數(shù)據(jù)只是為了在pollInner中的epoll_wait可以監(jiān)聽到事件返回。在pollInner也可以看到如果是mWakeReadPipeFd的EPOLLIN事件只是調(diào)用了awoken消耗掉了寫入的字符就往后處理了。

      那什么時(shí)候調(diào)用wake呢?這個(gè)只要找到調(diào)用的地方分析一下就行了,先看Looper.cpp,在sendMessageAtTime即發(fā)送Native Message的時(shí)候,根據(jù)發(fā)送的Message的執(zhí)行時(shí)間查找mMessageEnvelopes計(jì)算應(yīng)該插入的位置,如果是在頭部插入,就調(diào)用wake喚醒epoll_wait,因?yàn)樵谶M(jìn)入pollInner時(shí)根據(jù)Java層消息隊(duì)列頭部消息的執(zhí)行時(shí)間與Native層消息隊(duì)列頭部消息的執(zhí)行時(shí)間計(jì)算出了一個(gè)timeout,如果這個(gè)新消息是在頭部插入,說明執(zhí)行時(shí)間至少在上述兩個(gè)消息中的一個(gè)之前,所以應(yīng)該喚醒epoll_wait,epoll_wait返回后,檢查Native消息隊(duì)列,看頭部消息即剛插入的消息是否到執(zhí)行時(shí)間了,到了就執(zhí)行,否則就可能需要設(shè)置新的timeout。同樣在Java層的MessageQueue中,有一個(gè)函數(shù)nativeWake也同樣可以通過JNI調(diào)用wake,調(diào)用nativeWake的時(shí)機(jī)與在Native調(diào)用wake的時(shí)機(jī)類似,在消息隊(duì)列頭部插入消息,還有一種情況就是,消息隊(duì)列頭部是一個(gè)Barrier,而且插入的消息是第一個(gè)異步消息。

      if (p == null || when == 0 || when < p.when) {
          // New head, wake up the event queue if blocked.
          msg.next = p;
          mMessages = msg;
          needWake = mBlocked;
      } else {
          // Inserted within the middle of the queue.  Usually we don't have to wake
          // up the event queue unless there is a barrier at the head of the queue
          // and the message is the earliest asynchronous message in the queue.
          needWake = mBlocked && p.target == null && msg.isAsynchronous();//如果頭部是Barrier并且新消息是異步消息則“有可能”需要喚醒
          Message prev;
          for (;;) {
              prev = p;
              p = p.next;
              if (p == null || when < p.when) {
                  break;
              }
              if (needWake && p.isAsynchronous()) { // 消息隊(duì)列中有異步消息并且執(zhí)行時(shí)間在新消息之前,所以不需要喚醒。
                  needWake = false;
              }
          }
          msg.next = p; // invariant: p == prev.next
          prev.next = msg;
      }

      在頭部插入消息不一定調(diào)用nativeWake,因?yàn)橹翱赡苷趫?zhí)行IdleHandler,如果執(zhí)行了IdleHandler,就在IdleHandler執(zhí)行后把nextPollTimeoutMillis設(shè)置為0,下次進(jìn)入for循環(huán)就用0調(diào)用nativePollOnce,不需要wake,只有在沒有消息可以執(zhí)行(消息隊(duì)列為空或沒到執(zhí)行時(shí)間)并且沒有設(shè)置IdleHandler時(shí)mBlocked才會為true。

      如果Java層的消息隊(duì)列被Barrier Block住了并且當(dāng)前插入的是一個(gè)異步消息有可能需要喚醒Looper,因?yàn)楫惒较⒖梢栽贐arrier下執(zhí)行,但是這個(gè)異步消息一定要是執(zhí)行時(shí)間最早的異步消息。

      退出Looper也需要wake,removeSyncBarrier時(shí)也可能需要。

      posted @ 2013-09-29 16:00  AngelDevil  閱讀(88359)  評論(11)    收藏  舉報(bào)
      主站蜘蛛池模板: 免费无码黄十八禁网站| 少妇久久久被弄到高潮| 亚洲综合无码久久精品综合| 亚洲永久精品一区二区三区| 亚洲av永久无码精品网站 | 日本欧美大码a在线观看| 日韩av高清在线看片| 国产精品沙发午睡系列990531 | 五月丁香色综合久久4438| 久久夜色国产噜噜亚洲av| 久久综合伊人77777| 中文字幕亚洲制服在线看| 老色鬼永久精品网站| 国产二区三区不卡免费| 99久久婷婷国产综合精品青草漫画| 凹凸国产熟女精品视频| 中文字幕人妻色偷偷久久| 国产线播放免费人成视频播放| 天天拍夜夜添久久精品大| 国产精品第一二三区久久| 久久婷婷综合色丁香五月| 一个色综合国产色综合| 德州市| 免费无码AV一区二区波多野结衣| 亚洲国产在一区二区三区| 曰韩亚洲AV人人夜夜澡人人爽| 亚洲日韩国产一区二区三区在线| 成人欧美一区二区三区在线观看| 激情综合色综合久久综合| 二连浩特市| 国产精品久久国产精麻豆99网站| 深夜福利啪啪片| 亚洲经典在线中文字幕| 久久精品中文字幕少妇| 都市激情 在线 亚洲 国产| 国产午夜亚洲精品福利| 亚洲嫩模一区二区三区| 久久人人97超碰人人澡爱香蕉 | 成年女人免费碰碰视频| 人妻久久久一区二区三区| 成 人 免费 在线电影|