
  • 一、MessageQueue 的 Java 层机制
  • 二、MessageQueue 的 native 层阻塞机制
  • 三、MessageQueue 的 native 层解除阻塞机制
  • 三、MessageQueue 的 native 层 JNI 方法动态注册
  • 三、MessageQueue 的 native 层完整代码 android_os_MessageQueue.cpp

一、MessageQueue 的 Java 层机制

之前在 【Android 异步操作】手写 Handler ( 消息队列 MessageQueue | 消息保存到链表 | 从链表中获取消息 ) 中 , 模仿 Android 的 MessageQueue 手写的 MessageQueue , 使用了如下同步机制 ,

从 消息队列 MessageQueue 中取出 消息 Message ,

如果当前链表为空 , 此时会 调用 wait 方法阻塞 , 直到消息入队时 , 链表中有了元素 , 会调用 notify 解除该阻塞 ;

在实际的 Android 中的 消息队列 MessageQueue 的同步机制 是在 native 层实现 的 ;

在创建 消息队列 MessageQueue 时 , 调用了 nativeInit() 方法 , 销毁 MessageQueue 时调用 nativeDestroy 方法 ;

如果调用 next 获取下一个消息时 , 如果当前消息队列 MessageQueue 中没有消息 , 此时需要阻塞 , 调用 nativePollOnce 即可实现在 native 阻塞线程 ;

    // 初始化 MessageQueue 时调用的方法 private native static long nativeInit();// 销毁 MessageQueue 时调用的方法 private native static void nativeDestroy(long ptr);// 线程阻塞方法@UnsupportedAppUsageprivate native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/// 线程唤醒方法 private native static void nativeWake(long ptr);private native static boolean nativeIsPolling(long ptr);private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);// 此处初始化 MessageQueue , 调用了 nativeInit 方法 MessageQueue(boolean quitAllowed) {mQuitAllowed = quitAllowed;mPtr = nativeInit();}// 获取消息队列中的下一个消息 @UnsupportedAppUsageMessage next() {// Return here if the message loop has already quit and been disposed.// This can happen if the application tries to restart a looper after quit// which is not supported.final long ptr = mPtr;if (ptr == 0) {return null;}int pendingIdleHandlerCount = -1; // -1 only during first iterationint nextPollTimeoutMillis = 0;for (;;) {if (nextPollTimeoutMillis != 0) {Binder.flushPendingCommands();}// 此处阻塞线程 nativePollOnce(ptr, nextPollTimeoutMillis);}}

二、MessageQueue 的 native 层阻塞机制

线程阻塞方法 private native void nativePollOnce(long ptr, int timeoutMillis) , 是 native 方法 , 该方法在 frameworks/base/core/jni/android_os_MessageQueue.cpp 中实现 ,

从 Java 层传入 long 类型 , 然后转为 NativeMessageQueue* 类型指针 ,

该 Java 层传入的 long 类型是初始化消息队列时 , 由 nativeInit 方法返回 , 是 消息队列在 Native 层的指针 ,

之后 NativeMessageQueue 指针调用了其本身的 pollOnce 函数 , 该函数中 , 主要调用了 Looper 的 pollOnce 函数 , mLooper->pollOnce(timeoutMillis) ;

frameworks/base/core/jni/android_os_MessageQueue.cpp 中阻塞相关源码 :

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {mPollEnv = env;mPollObj = pollObj;// 这是主要操作 mLooper->pollOnce(timeoutMillis);mPollObj = NULL;mPollEnv = NULL;if (mExceptionObj) {env->Throw(mExceptionObj);env->DeleteLocalRef(mExceptionObj);mExceptionObj = NULL;}
}static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,jlong ptr, jint timeoutMillis) {NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);nativeMessageQueue->pollOnce(env, obj, timeoutMillis);

参考 : frameworks/base/core/jni/android_os_MessageQueue.cpp

继续查看 Native 层 Looper.cpp 的 pollOnce 方法 ,

Looper.cpp 源码路径是 system/core/libutils/Looper.cpp ,

在该方法中 , 最终调用 Looper.cpp 的 pollInner 方法 ,

在 pollInner 方法中 , 调用了 epoll_wait 方法 , 该方法就是等待方法 , 在该方法中会监听 mEpollFd 文件句柄 ,

       #include <sys/epoll.h>int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);int epoll_pwait(int epfd, struct epoll_event *events,int maxevents, int timeout,const sigset_t *sigmask);

参考 : epoll_wait

system/core/libutils/Looper.cpp 中阻塞相关源码 :

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {int result = 0;for (;;) {while (mResponseIndex < mResponses.size()) {const Response& response = mResponses.itemAt(mResponseIndex++);int ident = response.request.ident;if (ident >= 0) {int fd = response.request.fd;int events = response.events;void* data = response.request.data;
#if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - returning signalled identifier %d: ""fd=%d, events=0x%x, data=%p",this, ident, fd, events, data);
#endifif (outFd != NULL) *outFd = fd;if (outEvents != NULL) *outEvents = events;if (outData != NULL) *outData = data;return ident;}}if (result != 0) {#if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - returning result %d", this, result);
#endifif (outFd != NULL) *outFd = 0;if (outEvents != NULL) *outEvents = 0;if (outData != NULL) *outData = NULL;return result;}result = pollInner(timeoutMillis);}
}int Looper::pollInner(int timeoutMillis) {// ... int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

参考 : system/core/libutils/Looper.cpp

三、MessageQueue 的 native 层解除阻塞机制

在 MessageQueue 消息队列的 Java 层 , 将 Message 消息插入到链表表头后 , 调用了 nativeWake 方法 , 唤醒了线程 , 即解除了阻塞 ;

public final class MessageQueue {boolean enqueueMessage(Message msg, long when) {if (msg.target == null) {throw new IllegalArgumentException("Message must have a target.");}if (msg.isInUse()) {throw new IllegalStateException(msg + " This message is already in use.");}synchronized (this) {if (mQuitting) {IllegalStateException e = new IllegalStateException(msg.target + " sending message to a Handler on a dead thread");Log.w(TAG, e.getMessage(), e);msg.recycle();return false;}msg.markInUse();msg.when = when;Message p = mMessages;boolean needWake;if (p == null || when == 0 || when < p.when) {// New head, wake up the event queue if blocked.msg.next = p;mMessages = msg;needWake = mBlocked;} else {// Inserted within the middle of the queue.  Usually we don't have to wake// up the event queue unless there is a barrier at the head of the queue// and the message is the earliest asynchronous message in the queue.needWake = mBlocked && p.target == null && msg.isAsynchronous();Message prev;for (;;) {prev = p;p = p.next;if (p == null || when < p.when) {break;}if (needWake && p.isAsynchronous()) {needWake = false;}}msg.next = p; // invariant: p == prev.nextprev.next = msg;}// We can assume mPtr != 0 because mQuitting is false.if (needWake) {nativeWake(mPtr);}}return true;}

在 native 层的 frameworks/base/core/jni/android_os_MessageQueue.cpp 实现了上述

Java 层定义的 private native static void nativeWake(long ptr) 方法 ,

注册 JNI 方法方式是动态注册 , 注册的参数如下 , Java 层的 nativeWake 对应的 native 层方法是 android_os_MessageQueue_nativeWake 方法 ,

static const JNINativeMethod gMessageQueueMethods[] = {/* name, signature, funcPtr */{ "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },{ "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy },{ "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },{ "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },{ "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling },{ "nativeSetFileDescriptorEvents", "(JII)V",(void*)android_os_MessageQueue_nativeSetFileDescriptorEvents },

参考 : frameworks/base/core/jni/android_os_MessageQueue.cpp

下面是 frameworks/base/core/jni/android_os_MessageQueue.cpp 中的相关方法实现 ,

在 android_os_MessageQueue_nativeWake 方法中调用了 本身的 wake 方法 ,

在 wake 方法中调用了 system/core/libutils/Looper.cpp 中的 wake 方法 ;

void NativeMessageQueue::wake() {// 此处调用了 Looper 的 wake 函数 mLooper->wake();
}static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);nativeMessageQueue->wake();

参考 : frameworks/base/core/jni/android_os_MessageQueue.cpp

查看 system/core/libutils/Looper.cpp 中的 wake 方法 , 在该方法中 ,

ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t))) 代码说明 ,

向 mWakeEventFd 文件句柄写入了数据 ;

void Looper::wake() {#if DEBUG_POLL_AND_WAKEALOGD("%p ~ wake", this);
#endifuint64_t inc = 1;ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));if (nWrite != sizeof(uint64_t)) {if (errno != EAGAIN) {LOG_ALWAYS_FATAL("Could not write wake signal to fd %d: %s",mWakeEventFd, strerror(errno));}}

参考 : system/core/libutils/Looper.cpp

阻塞的时候使用的是 mEpollFd 文件句柄 ,

唤醒的时候使用的是 mWakeEventFd 文件句柄 ,

下面分析这两个文件句柄之间的联系 ;

Looper 构造函数 , 调用了 rebuildEpollLocked() 方法 ,

在 rebuildEpollLocked 方法 中调用 mEpollFd = epoll_create(EPOLL_SIZE_HINT) , 创建了一个句柄 ,

调用 int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem) 注册事件监听 ,

注册 mEpollFd 句柄 , 监听 mWakeEventFd 句柄的 eventItem 事件 ,

监听的事件是 eventItem.events = EPOLLIN 事件 ,

该事件代表 , 向 mWakeEventFd 文件句柄写入数据 , 此时就对应解除 epoll_wait 阻塞 ;

system/core/libutils/Looper.cpp 中 Looper 构造函数 , rebuildEpollLocked 方法 :

Looper::Looper(bool allowNonCallbacks) :mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",strerror(errno));AutoMutex _l(mLock);rebuildEpollLocked();
}void Looper::rebuildEpollLocked() {// Close old epoll instance if we have one.if (mEpollFd >= 0) {#if DEBUG_CALLBACKSALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endifclose(mEpollFd);}// Allocate the new epoll instance and register the wake pipe.// 创建句柄 mEpollFd = epoll_create(EPOLL_SIZE_HINT);LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));struct epoll_event eventItem;memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union// 注册监听的事件eventItem.events = EPOLLIN;eventItem.data.fd = mWakeEventFd;// 注册事件监听 int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",strerror(errno));for (size_t i = 0; i < mRequests.size(); i++) {const Request& request = mRequests.valueAt(i);struct epoll_event eventItem;request.initEventItem(&eventItem);int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);if (epollResult < 0) {ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",request.fd, strerror(errno));}}

MessageQueue 消息队列是通过 Linux 的 epoll 机制实现的阻塞 ;

三、MessageQueue 的 native 层 JNI 方法动态注册

JNI 动态注册 , 消息队列 MessageQueue 中的注册方法 , 使用的是动态注册 ,

static const JNINativeMethod gMessageQueueMethods[] = {/* name, signature, funcPtr */{ "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },{ "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy },{ "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },{ "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },{ "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling },{ "nativeSetFileDescriptorEvents", "(JII)V",(void*)android_os_MessageQueue_nativeSetFileDescriptorEvents },
};// 动态注册 JNI 函数
int register_android_os_MessageQueue(JNIEnv* env) {int res = RegisterMethodsOrDie(env, "android/os/MessageQueue", gMessageQueueMethods,NELEM(gMessageQueueMethods));jclass clazz = FindClassOrDie(env, "android/os/MessageQueue");gMessageQueueClassInfo.mPtr = GetFieldIDOrDie(env, clazz, "mPtr", "J");gMessageQueueClassInfo.dispatchEvents = GetMethodIDOrDie(env, clazz,"dispatchEvents", "(II)I");return res;

详情参考 : 【Android NDK 开发】JNI 动态注册 ( 动态注册流程 | JNI_OnLoad 方法 | JNINativeMethod 结构体 | GetEnv | RegisterNatives )

三、MessageQueue 的 native 层完整代码 android_os_MessageQueue.cpp

完整的 android_os_MessageQueue.cpp 代码 :

/** Copyright (C) 2010 The Android Open Source Project** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at**      http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/#define LOG_TAG "MessageQueue-JNI"#include <nativehelper/JNIHelp.h>
#include <android_runtime/AndroidRuntime.h>#include <utils/Looper.h>
#include <utils/Log.h>
#include "android_os_MessageQueue.h"#include "core_jni_helpers.h"namespace android {static struct {jfieldID mPtr;   // native object attached to the DVM MessageQueuejmethodID dispatchEvents;
} gMessageQueueClassInfo;// Must be kept in sync with the constants in Looper.FileDescriptorCallback
static const int CALLBACK_EVENT_INPUT = 1 << 0;
static const int CALLBACK_EVENT_OUTPUT = 1 << 1;
static const int CALLBACK_EVENT_ERROR = 1 << 2;class NativeMessageQueue : public MessageQueue, public LooperCallback {public:NativeMessageQueue();virtual ~NativeMessageQueue();virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj);void pollOnce(JNIEnv* env, jobject obj, int timeoutMillis);void wake();void setFileDescriptorEvents(int fd, int events);virtual int handleEvent(int fd, int events, void* data);private:JNIEnv* mPollEnv;jobject mPollObj;jthrowable mExceptionObj;
};MessageQueue::MessageQueue() {}MessageQueue::~MessageQueue() {}bool MessageQueue::raiseAndClearException(JNIEnv* env, const char* msg) {if (env->ExceptionCheck()) {jthrowable exceptionObj = env->ExceptionOccurred();env->ExceptionClear();raiseException(env, msg, exceptionObj);env->DeleteLocalRef(exceptionObj);return true;}return false;
}NativeMessageQueue::NativeMessageQueue() :mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {mLooper = Looper::getForThread();if (mLooper == NULL) {mLooper = new Looper(false);Looper::setForThread(mLooper);}
}NativeMessageQueue::~NativeMessageQueue() {}void NativeMessageQueue::raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj) {if (exceptionObj) {if (mPollEnv == env) {if (mExceptionObj) {env->DeleteLocalRef(mExceptionObj);}mExceptionObj = jthrowable(env->NewLocalRef(exceptionObj));ALOGE("Exception in MessageQueue callback: %s", msg);jniLogException(env, ANDROID_LOG_ERROR, LOG_TAG, exceptionObj);} else {ALOGE("Exception: %s", msg);jniLogException(env, ANDROID_LOG_ERROR, LOG_TAG, exceptionObj);LOG_ALWAYS_FATAL("raiseException() was called when not in a callback, exiting.");}}
}void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {mPollEnv = env;mPollObj = pollObj;mLooper->pollOnce(timeoutMillis);mPollObj = NULL;mPollEnv = NULL;if (mExceptionObj) {env->Throw(mExceptionObj);env->DeleteLocalRef(mExceptionObj);mExceptionObj = NULL;}
}void NativeMessageQueue::wake() {mLooper->wake();
}void NativeMessageQueue::setFileDescriptorEvents(int fd, int events) {if (events) {int looperEvents = 0;if (events & CALLBACK_EVENT_INPUT) {looperEvents |= Looper::EVENT_INPUT;}if (events & CALLBACK_EVENT_OUTPUT) {looperEvents |= Looper::EVENT_OUTPUT;}mLooper->addFd(fd, Looper::POLL_CALLBACK, looperEvents, this,reinterpret_cast<void*>(events));} else {mLooper->removeFd(fd);}
}int NativeMessageQueue::handleEvent(int fd, int looperEvents, void* data) {int events = 0;if (looperEvents & Looper::EVENT_INPUT) {events |= CALLBACK_EVENT_INPUT;}if (looperEvents & Looper::EVENT_OUTPUT) {events |= CALLBACK_EVENT_OUTPUT;}if (looperEvents & (Looper::EVENT_ERROR | Looper::EVENT_HANGUP | Looper::EVENT_INVALID)) {events |= CALLBACK_EVENT_ERROR;}int oldWatchedEvents = reinterpret_cast<intptr_t>(data);int newWatchedEvents = mPollEnv->CallIntMethod(mPollObj,gMessageQueueClassInfo.dispatchEvents, fd, events);if (!newWatchedEvents) {return 0; // unregister the fd}if (newWatchedEvents != oldWatchedEvents) {setFileDescriptorEvents(fd, newWatchedEvents);}return 1;
}// ----------------------------------------------------------------------------sp<MessageQueue> android_os_MessageQueue_getMessageQueue(JNIEnv* env, jobject messageQueueObj) {jlong ptr = env->GetLongField(messageQueueObj, gMessageQueueClassInfo.mPtr);return reinterpret_cast<NativeMessageQueue*>(ptr);
}static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();if (!nativeMessageQueue) {jniThrowRuntimeException(env, "Unable to allocate native queue");return 0;}nativeMessageQueue->incStrong(env);return reinterpret_cast<jlong>(nativeMessageQueue);
}static void android_os_MessageQueue_nativeDestroy(JNIEnv* env, jclass clazz, jlong ptr) {NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);nativeMessageQueue->decStrong(env);
}static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,jlong ptr, jint timeoutMillis) {NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);nativeMessageQueue->wake();
}static jboolean android_os_MessageQueue_nativeIsPolling(JNIEnv* env, jclass clazz, jlong ptr) {NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);return nativeMessageQueue->getLooper()->isPolling();
}static void android_os_MessageQueue_nativeSetFileDescriptorEvents(JNIEnv* env, jclass clazz,jlong ptr, jint fd, jint events) {NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);nativeMessageQueue->setFileDescriptorEvents(fd, events);
}// ----------------------------------------------------------------------------// 动态注册 JNI 函数的结构体
// 每个结构体中的元素是 Java 方法名称 , 方法签名 , C++ 中的方法指针
static const JNINativeMethod gMessageQueueMethods[] = {/* name, signature, funcPtr */{ "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },{ "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy },{ "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },{ "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },{ "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling },{ "nativeSetFileDescriptorEvents", "(JII)V",(void*)android_os_MessageQueue_nativeSetFileDescriptorEvents },
};// 动态注册 JNI 函数
int register_android_os_MessageQueue(JNIEnv* env) {int res = RegisterMethodsOrDie(env, "android/os/MessageQueue", gMessageQueueMethods,NELEM(gMessageQueueMethods));jclass clazz = FindClassOrDie(env, "android/os/MessageQueue");gMessageQueueClassInfo.mPtr = GetFieldIDOrDie(env, clazz, "mPtr", "J");gMessageQueueClassInfo.dispatchEvents = GetMethodIDOrDie(env, clazz,"dispatchEvents", "(II)I");return res;
}} // namespace android

参考 : frameworks/base/core/jni/android_os_MessageQueue.cpp

