开发者

Native层消息机制深入探究实例解析

开发者 https://www.devze.com 2023-01-18 10:20 出处:网络 作者: 大胃粥
目录引言Looper的创建发送消息与监听请求发送消息监听请求Looper 处理消息或请求结束引言
目录
  • 引言
  • Looper的创建
  • 发送消息与监听请求
    • 发送消息
    • 监听请求
  • Looper 处理消息或请求
    • 结束

      引言

      在分析底层源码时,时不时会碰到 Looper::wake() 或者 Looper::pollOnce() 这样的代码,之前大概知道是 Native 层的消息循环机制。为了以后我也能够使用它,我决定还是彻底分析一遍源码。

      本文只涉及一个文件,路径如下

      system/core/libutils/Looper.cpp

      Looper的创建

      在 Java 层,有一个线程的子www.devze.com类 HandlerThread,它可以创建一个线程,并且使用消息机制,其中的关键两步是 Looper::prepare() 和 Looper::loop()。

      Looper::prepare() 会创建一个与线程绑定的 Looper 对象,这个绑定是通过 ThreadLocal实现的,而 Looper::loop() 会让线程进入无限循环来处理消息。

      在 Native 层,也有一个 Looper 类,可以通过 Looper::prepare() 来创建 Looper 对象,代码如下

      sp<Looper> Looper::prepare(int opts) {
          // opts决定Looper::addFd()的参数callback是否可以为空
          bool allowNonCallbacks = opts & PREPARE_ALLOW_NON_CALLBACKS;
          // 通过pthread_getspecific()获取线程本地存储中的Looper对象
          sp<Looper> looper = Looper::getForThread();
          if (looper == nullptr) {
              looper = new Looper(allowNonCallbacks);
              // 通过pthread_setspecific()把Looper对象保存到线程本地存储中
              Looper::setForThread(looper);
          }
          return looper;
      }
      

      Native 层的线程在首次调用 Looper::prepare() 时,会创建 Looper 对象,并通过 pthread_setspecific() 把它保存到线程本地存储中,后面再获取 Looper 对象时,通过 pthread_getspecific() 从线程本地存储中获取。

      这不就是 Java 的 ThreadLocal 的功能吗?

      现在让我们来看下 Looper 对象的创建过程

      Looper::Looper(bool allowNonCallbacks)
          : mAllowNonCallbacks(allowNonCallbacks),
            mSendingMessage(false),
            mPolling(false),
            mEpollRebuildRequired(false),
            mNextRequestSeq(0),
            mResponseIndex(0),
            mNextMessageUptime(LLONG_MAX) {
          // 1. 创建 eventfd 对象,用于唤醒 Looper
          mWakeEventFd.reset(eventfd(0, EFD_NONblock | EFD_CLOEXEC));
          AutoMutex _l(mLock);
          // 2. 创建 epoll 对象,并监听刚才创建的 eventfd 的输入事件
          rebuildEpollLocked();
      }
      

      首先创建了一个eventfd 对php象,由 mWakeEventFd 代表,它用于唤醒 Looper 。如何唤醒呢? 继续往下看。

      然后调用 rebuildEpollLocked() 创建一个 epoll 对象,并监听刚才创建的 mWakeEventFd 的 I/O 事件,代码如开发者_开发学习

      void Looper::rebuildEpollLocked() {
          // ...
          // 创建epoll对象
          mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
          // 监听 mWakeEventFd 输入事件
          struct epoll_event eventItem;
          memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
          eventItem.events = EPOLLIN;
          eventItem.data.fd = mWakeEventFd.get();
          int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);
          for (size_t i = 0; i < mRequests.size(); i++) {
              // ...
          }
      }
      

      epoll 对象监听了 mWakeEventFd 的可读事件。在后面的分析中,我们将看到,当 Looper 开始轮询时,会调用 epoll_wait() 阻塞地等待事件,那么有人向 mWakeEventFd 写入数据时,epoll_wait() 将返回,那么 Looper 就被唤醒。

      发送消息与监听请求

      Native 层的 Looper 可以处理两种类型的事件,一种是消息( Message ),另一种是请求( Request )。下面我们来看看如何向 Looper 发送消息,如何让 Looper 监听请求。

      发送消息

      通过 Looper::sendMessageXXX() 这一类函数,可以向 Looper 发送消息

      void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
              const Message& message) {
          size_t i = 0;
          { // acquire lock
              // 通过锁,可以保存 mMessageEnvelopes 的线程安全
              AutoMutex _l(mLock);
              // 获取消息要插入的位置
              size_t messageCount = mMessageEnvelopes.size();
              while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
                  i += 1;
              }
              // 1. 把信息保存到 mMessageEnvelopes 中
              MessageEnvelope messageEnvelope(uptime, handler, message);
              mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
              // mSendingMessage 表明 Looper 正在处理消息,因此不用唤醒Looper
              if (mSendingMessage) {
                  return;
              }
          } // release lock
          // 2. 如有必要,就唤醒Looper
          if (i == 0) {
              wake();
          }
      }
      

      当 Looper 接收到消息时,它会把消息保存到 mMessageEnvelopes 容器中,并且如果有必要,那么会调用 Looper::wake() 唤醒 Looper 来处理消息。

      前面我们大概地说明了下如何通过 mWakeEventFd 这个 eventfd 对象唤醒 Looper,现在让我们来看下唤醒是如何实现的

      void Looper::wake() {
          uint64_t inc = 1;
          // 向 mWakeFd 中写入数据
          // TEMP_FAILURE_RETRY 是一个重试机制,确保不会因为系统中断而导致数据没有写入
          ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
          // 确保写入的是 unsigned 64-bit 的数据
          if (nWrite != sizeof(uint64_t)) {
              if (errno != EAGAIN) {
                  LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
                                   mWakeEventFd.get(), nWrite, strerror(errno));
              }
          }
      }
      

      原来就是向 mWakeEventFd 写数据。在后面我们将看到,当 Looper 进入轮询时, 当epoll_wait() 检测到 mWakeEventFd 有数据可读时,就会从阻塞中醒来,从而达到 mWakeEventFd 唤醒 Looper 的目的。

      监听请求

      现在我们来看下如何让 Looper 监听请求,它是通过 Looper::addFd() 实现的

      int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
          // 1. 确认 ident 的值
          if (!callback.get()) { // 回调为空
              if (! mAllowNonCallbacks) { // mAllowNonCallbacks是在创建Looper时初始化的
                  ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
                  return -1;
              }
              if (ident < 0) { // 回调为空,ident的值不能小于0
                  ALOGE("Invalid attempt to set NULL callback with ident < 0.");
                  return -1;
              }
          } else { // 回调不为空
              // POLL_CALLBACK值为-2,表示请求通过回调处理请求
              ident = POLL_CALLBACK;
          }
          { // acquire lock
              AutoMutex _l(mLock);
              // 2. 包装成一个 Request
              Request request;
              request.fd = fd;
              request.ident = ident;
              request.events = events;
              request.seq = mNextRequestSeq++;
              request.callback = callback;
              request.data = data;
              if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1
              struct epoll_event eventItem;
              request.initEventItem(&eventItem);
              // 3. 用 epoll 对象监听 fd 的事件,并把请求保存到 mRequests 中
              ssize_t requestIndex = mRequests.indexOfKey(fd);
              if (requestIndex < 0) {
                  int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
                  if (epollResult < 0) {
                      ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
                      return -1;
                  }
                  mRequests.add(fd, request);
              } else {
                  int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem);
                  if (epollResult < 0) {
                      // ...
                  }
                  mRequests.replaceValueAt(requestIndex, request);
              }
          } // release lock
          return 1;
      }
      

      Looper::addFd() 的实质就是用 epoll 对象监听指定 fd 的 I/O 事件。为何我把这一过程称之为 监听请求 呢 ? 因为这个函数把它的参数包装成一个 Request 对象,并保存到 mRequests 容器 中。

      Looper 进行轮询时,epoll_wait() 会阻塞,当 fd 指向的文件有 I/O 事件时,epoll_wait() 将会获取到 fd 的可读事件,因此 Looper 会被唤醒。

      当 Looper 检测到有请求到来时,一般是通过回调处理的,也就是这里的参数 callback。当然,也可以不设置回调,当有请求到来时,交给外部的调用者去处理,我们将会在后面看到。

      根据以上知识,我们来理解 Looper::addFd() 的第一步。当 callback 参数为空时,ident 必须大于或等于0,否则为 POLL_CALLBACK,注意,这的值是 -2。 因此呢,当我们检测到一个请求的 ident 大于或等于0时,这个请求肯定不是通过回调处理的。这一点非常重要,我们将会在后面用到。

      Looper 处理消息或请求

      Native 层的 Looper 是通过 Looper::pollOnce() 或 Looper::pollAll() 来统一处理消息和请求的,我们挑 Looper::pollOnce() 这个函数来分析下

      int Lohttp://www.devze.comoper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
          int result = 0;
          for (;;) { // 无限循环
              // 1. 处理那些不是通过callback处理的请求
              while (mResponseIndex < mResponses.size()) {
                  // ...
              }
              // 2. 处理pollInner()轮询的结果
              if (result != 0) {
                  // ...
              }
              // 3. epoll 等待并处理事件(如果有事件到来)
              result = pollInner(timeoutMillis);
          }
      }
      

      当首次调用 Looper::pollOnce() 时,第一步和第二步肯定不会发生,那么我们先来看下第三步,这个函数比较长,我们一步步解析

      int Looper::pollInner(int timeoutMillis) {
          // 省略计算 timeoutMillis 的代码
          int result = POLL_WAKE;
          mResponses.clear();
          mResponseIndex = 0;
          mPolling = true;
          struct epoll_event eventItems[EPOLL_MAX_EVENTS];
          // 1. 等待事件
          int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
          // ...
      }
      

      第一步,通过 epoll_wait() 阻塞地等待它监听的 fd 的 I/O 就绪, 此时 Looper 进入休眠。

      那么怎么唤醒 Looper 呢? 根据前面的分析,epoll 对象监听了 mWakeEventFd 以及 通过 Looper::addFd() 添加的 fd。 那么向这些被监听的 fd 写入数据,就可以唤醒 Looper。例如,Looper::wake() 就是通过向 mWakeEventFd 写入数据来唤醒 Looper。

      那么现在我们来看下 Looper 被唤醒后的处理流程

      int Looper::pollInner(int timeoutMillis) {
          // ...
          // 1. 等待I/O就绪
          int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
          // ...
          // 2. 处理 epoll 事件
          for (int i = 0; i < eventCount; i++) {
              int fd = eventItems[i].data.fd;
              uint32_t epollEvents = eventItems[i].events;
              if (fd == mWakeEventFd.get()) { 
                  // 2.1 处理被消息唤醒的情况
                  if (epollEvents & EPOLLIN) {
                      // 清理eventfd中的数据
                      awoken();
                  } else {
                      ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
                  }
              } else { 
                  // 2.2 处理被请求唤醒的情况
                  ssize_t requestIndex = mRequests.indexOfKey(fd);
                  if (requestIndex >= 0) {
                      // 根据epoll触发的事件类型,填充events相应的位
                      int events = 0;
                      if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                      if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                      if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                      if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                      // 创建Response对象,保存两个参数,然后把Response对象保存到mResponses中
                      pushResponse(events, mRequests.valueAt(requestIndex));
                  } else {
                      ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                              "no longer registered.", epollEvents, fd);
                  }
              }
          }    
      }
      

      当 mWakeEventFd 的 I/O 就绪,就会走到2.1步,之后会读取 mWakeEventFd 中的数据,读取的数据并没有什么用,只是清理数据而已。而这一步,大部分情况 是由于消息的到来,而极少情况是并不是因为消息的到来,而是因为线程有紧急事情需要处理,所以必须要唤醒。

      当通过Looper::addFd() 添加的 fd 就绪时,就会走到 2.2 步,这一步一定是因为请求到来了。它会创建 Reponse 对象,并保存,代码如下

      void Looper::pushResponse(int events, const Request& request) {
          Response response;
          response.events = events;
          response.request = request;
          mResponses.push(response);
      }
      

      这里我们要注意下,mResponses 容器表示待处理请求的集合,这些请求会在后面处理,让我们接着往下看。

      int Looper::pollInner(int timeoutMillis) {
          // ...
          // 1. 等待I/O就绪
          int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
          // ...
          // 2. 处理 epoll 事件编程客栈
          for (int i = 0; i < eventCount; i++) {
              // ...
          }    
      Done: ;
          mNextMessageUptime = LLONG_MAX;
          // 3. 处理消息
          while (mMessageEnvelopes.size() != 0) { // 循环处理消息
              nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
              // 3.1 取出队头消息
              const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
              if (messageEnvelope.uptime <= now) { 
                  // 3.2 队头消息处理的时间点小于当前时间点,表示要立即处理消息
                  {
                      // 获取handler
                      sp<MessageHandler> handler = messageEnvelope.handler;
                      Message message = messageEnvelope.message;
                      mMessageEnvelopes.removeAt(0);
                      mSendingMessage = true;
                      mLock.unlock();
                      // 消息交给handler处理
                      handler->handleMessage(message);
                  } // release handler
                  mLock.lock();
                  mSendingMessage = false;
                  // POLL_CALLBACK 表示消息被回调处理
                  resjavascriptult = POLL_CALLBACK;
              } else {
                  // 3.2 队头的消息处理的时间点大于当前时间,表示还没有到处理的时间点,就退出处理消息的循环
                  // mNextMessageUptime 表示下一个消息要处理的时间点,当通过break退出循环后,
                  // 在外层的下一次循调用pollInner()时,会通过 mNextMessageUptime 计算 epoll_wait 的超时时间
                  mNextMessageUptime = messageEnvelope.uptime;
                  break;
              }
          }
          // Release lock.
          mLock.unlock();    
      }
      

      根据前面分析的消息发送的过程,消息保存在 mMessageEnvelopes 中。那么这里的第三步,很明显是在处理消息。通过循环,不断取出消息,然后把消息的 messageEnvelope.uptime 与当前时间进行比较,如果小于当前时间,就证明要立马处理消息了,否则这些消息只能在下一次轮询中再处理。

      处理完了消息,现在来处理请求

      int Looper::pollInner(int timeoutMillis) {
          // ...
          // 1. 等待事件
          int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
          // ...
          // 2. 处理 epoll 事件
          for (int i = 0; i < eventCount; i++) {
              // ...
          }
      Done: ;
          // 3. 处理消息
          while (mMessageEnvelopes.size() != 0) { // 循环处理消息
               // ...
          }
          // 4. 循环处理请求
          for (size_t i = 0; i < mResponses.size(); i++) {
              Response& response = mResponses.editItemAt(i);
              // 检测请求是否通过回调处理
              if (response.request.ident == POLL_CALLBACK) {
                  int fd = response.request.fd;
                  int events = response.events;
                  void* data = response.request.data;
                  int callbackResult = response.request.callback->handleEvent(fd, events, data);
                  if (callbackResult == 0) {
                      removeFd(fd, response.request.seq);
                  }
                  response.request.callback.clear();
                  // 表明消息被回调处理了
                  result = POLL_CALLBACK;
              }
          }
          // 返回结果
          return result;
      }
      

      刚刚我们还提到,mResponse 中保存了待处理的请求。现在通过循环,不断取出请求来处理。处理请求有一个条件,那就是请求必须有回调,否则不处理。 再回顾前面分析 监听请求 的代码,当Looper::addFd() 的参数 callback 不为空时,Request.ident 的值为 POLL_CALLBACK,表明请求需要通过回调处理。

      Looper::pollInner() 函数分析完毕,现在再回到 Looper::pollOnce()

      int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
          int result = 0;
          for (;;) { // 无限循环
              // 1. 处理那些不是通过callback处理的请求
              while (mResponseIndex < mResponses.size()) {
                  const Response& response = mResponses.itemAt(mResponseIndex++);
                  int ident = response.request.ident;
                  // 从Looper::addFd()分析可知,只有当callback为空的情况下,ident的值>=0,否则为POLL_CALLBACK(-2)
                  // 因此,这里处理的是那些没有通过callback处理的请求            
                  if (ident >= 0) {
                      int fd = response.request.fd;
                      int events = response.events;
                      void* data = response.request.data;
                      // 因为Looper无法通过callback处理,所以把这些元数据交给调用者处理
                      if (outFd != nullptr) *outFd = fd;
                      if (outEvents != nullptr) *outEvents = events;
                      if (outData != nullptr) *outData = data;
                      // 注意,这里返回的值大于0
                      return ident;
                  }
              }
              // 2. 处理pollInner()轮询的结果
              // result的值有很多种,但是都为负数(注意,上面处理那些不是通过callback处理的请求,返回正值)
              // 1. POLL_WAKE(-1): 表示epoll_wait()是被eventfd唤醒的
              // 1. POLL_ERROR(-4): 表示epoll_wait()出错
              // 2. POLL_TIMEOUT(-3) : 表示epoll_wait()超时
              // 3. POLL_CALLBACK(-2) : 表示消息或请求是通过回调处理的
              if (result != 0) {
                  // 消息或事件无论是否被callback处理,这些传入的参数都没有意义,因此清空
                  if (outFd != nullptr) *outFd = 0;
                  if (outEvents != nullptr) *outEvents = 0;
                  if (outData != nullptr) *outData = nullptr;
                  // 注意,返回的是负值
                  return result;
              }
              // 3. epoll 等待并处理事件(如果有事件到来)
              result = pollInner(timeoutMillis);
          }
      }
      

      从整体看,当 pollInner() 返回后,就会调用第一步和第二步来处理结果。

      首先来看第一步,根据前面 监听请求 的分析,当 Looper::addFd() 的参数 callback 为空时,Request.ident 的值才大于等于0。Looper::pollInner 只通过回调来处理请求,而对于那些没有回调的请求呢?那就是在这里处理。而处理的方式是直接把元数据返回给调用者,那么意思就很明显了,让调用者自己处理。

      再来看第二步,直接返回 Looper::pollInner() 的结果,并把参数清0。因为无论返回的什么结果,这些参数都没有意义了,这一点请大家自己体会。

      关于第一步和第二步,还有一点需要关注,第一步的返回值是正值,而第二步返回值是负值。

      结束

      本文对 Native 的 Looper 的主要函数进行分析,揭开了 Native 层消息机制的核心,但是目前我并不能给一个很好例子来理解本文的内容。需要大家在分析 Native 层源码时慢慢体会。

      可能有人会问,你为何不以 Java 层的消息机制为例来引出 Native 层的消息机制呢? 因为这样废话太多。

      以上就是Native层消息机制深入探究实例解析的详细内容,更多关于Native层消息机制的资料请关注我们其它相关文章!

      0

      精彩评论

      暂无评论...
      验证码 换一张
      取 消

      关注公众号