Android MessageQueue 底层实现(C++)
文章目录
- java 层忽略的 native 函数
- 通过 nativePollOnce 打开 native 世界的大门
- MessageQueue 向 native 层的延伸
- native 层的 Looper
- eventfd 和 epoll 推动消息流转
- 等待超时
- mWakeEventFd 唤醒
- mRequests 列表中文件发生 IO 事件
- 发送一个 Native 消息
- 添加一个文件描述符监控请求
java 层忽略的 native 函数
Looper 在取消息时,我们提到 nativePollOnce 可能会阻塞线程。而在发送消息时,nativeWake 可以唤醒被 nativePollOnce 阻塞的线程。上一节只是说明这两个函数是 native 函数,但是没有进行深入分析。从功能来看很像 wait()/notify() 机制,只不过是用 naitive 方式实现罢了。然而 nativePollOnce/nativeWake 并不像 java 层看到的那么简单。其实它还负责 native 层消息循环。没错,在 native 也有自己的消息循环,并且与 java 使用完全独立的消息队列。这一节的目的就一探 native MessageQueue 究竟。
通过 nativePollOnce 打开 native 世界的大门
nativePollOnce 的 native 实现在 android_os_MessageQueue.cpp 中,对应的 native 函数是 android_os_MessageQueue_nativePollOnce。其代码如下:
[android_os_MessageQueue.cpp -> android_os_MessageQueue_nativePollOnce]
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,jlong ptr, jint timeoutMillis) {NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
MessageQueue 向 native 层的延伸
android_os_MessageQueue_nativePollOnce 的形参 ptr 是 java 层传递过来的指针,其值为 MessageQueue 的 mPtr 域。而 mPtr 在构造函数中初始化:
[MessageQueue.java -> MessageQueue(boolean quitAllowed)]
MessageQueue(boolean quitAllowed) {mQuitAllowed = quitAllowed;mPtr = nativeInit();}
nativeInit 是一个 native 函数,其源码如下:
[android_os_MessageQueue.cpp -> android_os_MessageQueue_nativeInit]
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();if (!nativeMessageQueue) {jniThrowRuntimeException(env, "Unable to allocate native queue");return 0;}nativeMessageQueue->incStrong(env);return reinterpret_cast<jlong>(nativeMessageQueue);
}
可见 MessageQueue 的 mPtr 域是指向 natvie 层的 NativeMessageQueue。即 natvie 层的 NativeMessageQueue 是伴随 java 层的 MessageQueue 而生,它承载了 MessageQueue 在 native 层的全部功能 。
回头再看 android_os_MessageQueue_nativePollOnce 源码,首先获取 java 层引用的 NativeMessageQueue 实例,调用其 pollOnce 函数。
[android_os_MessageQueue.cpp -> pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis)]
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {mPollEnv = env;mPollObj = pollObj;mLooper->pollOnce(timeoutMillis);mPollObj = NULL;mPollEnv = NULL;if (mExceptionObj) {env->Throw(mExceptionObj);env->DeleteLocalRef(mExceptionObj);mExceptionObj = NULL;}
}
native 层的 Looper
mLooper 是 native 层 Looper 类型的实例,在 NativeMessageQueue 的构造函数中初始化:
[android_os_MessageQueue.cpp -> NativeMessageQueue()]
NativeMessageQueue::NativeMessageQueue() :mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {mLooper = Looper::getForThread();if (mLooper == NULL) {mLooper = new Looper(false);Looper::setForThread(mLooper);}
}
与 java 层类似,mLooper 是线程关联的对象,每个线程只允许一个 Looper 实例。NativeMessageQueue 的 pollOnce 会调用 Looper 的 pollOnce。
注意 Looper 类并没有放在 framework 目录下,而是定义在 system/core/libutils/Looper.cpp 中。
[Looper.cpp -> pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData)]
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {int result = 0;for (;;) {while (mResponseIndex < mResponses.size()) {const Response& response = mResponses.itemAt(mResponseIndex++);int ident = response.request.ident;if (ident >= 0) {int fd = response.request.fd;int events = response.events;void* data = response.request.data;if (outFd != NULL) *outFd = fd;if (outEvents != NULL) *outEvents = events;if (outData != NULL) *outData = data;return ident;}}if (result != 0) {#if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - returning result %d", this, result);
#endifif (outFd != NULL) *outFd = 0;if (outEvents != NULL) *outEvents = 0;if (outData != NULL) *outData = NULL;return result;}result = pollInner(timeoutMillis);}
}
while 循环用于处理没有回调函数的文件描述符监控,单看这里的代码会让人难以理解,这是因为 mResponses 的初始化在 pollInner 函数中, 所以先看 pollInner 函数:
[Looper.cpp -> pollInner(int timeoutMillis)]
int Looper::pollInner(int timeoutMillis) {// Adjust the timeout based on when the next message is due.if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);if (messageTimeoutMillis >= 0&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {timeoutMillis = messageTimeoutMillis;}}// Poll.int result = POLL_WAKE;mResponses.clear();mResponseIndex = 0;// We are about to idle.mPolling = true;struct epoll_event eventItems[EPOLL_MAX_EVENTS];int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);// No longer idling.mPolling = false;// Acquire lock.mLock.lock();// Rebuild epoll set if needed.if (mEpollRebuildRequired) {mEpollRebuildRequired = false;rebuildEpollLocked();goto Done;}// Check for poll error.if (eventCount < 0) {if (errno == EINTR) {goto Done;}ALOGW("Poll failed with an unexpected error: %s", strerror(errno));result = POLL_ERROR;goto Done;}// Check for poll timeout.if (eventCount == 0) {#if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - timeout", this);
#endifresult = POLL_TIMEOUT;goto Done;}// Handle all events.
#if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endiffor (int i = 0; i < eventCount; i++) {int fd = eventItems[i].data.fd;uint32_t epollEvents = eventItems[i].events;if (fd == mWakeEventFd) {if (epollEvents & EPOLLIN) {awoken();} else {ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);}} else {ssize_t requestIndex = mRequests.indexOfKey(fd);if (requestIndex >= 0) {int events = 0;if (epollEvents & EPOLLIN) events |= EVENT_INPUT;if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;if (epollEvents & EPOLLERR) events |= EVENT_ERROR;if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;pushResponse(events, mRequests.valueAt(requestIndex));} else {ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is ""no longer registered.", epollEvents, fd);}}}
Done: ;// Invoke pending message callbacks.mNextMessageUptime = LLONG_MAX;while (mMessageEnvelopes.size() != 0) {nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);if (messageEnvelope.uptime <= now) {// Remove the envelope from the list.// We keep a strong reference to the handler until the call to handleMessage// finishes. Then we drop it so that the handler can be deleted *before*// we reacquire our lock.{ // obtain handlersp<MessageHandler> handler = messageEnvelope.handler;Message message = messageEnvelope.message;mMessageEnvelopes.removeAt(0);mSendingMessage = true;mLock.unlock();#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKSALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",this, handler.get(), message.what);
#endifhandler->handleMessage(message);} // release handlermLock.lock();mSendingMessage = false;result = POLL_CALLBACK;} else {// The last message left at the head of the queue determines the next wakeup time.mNextMessageUptime = messageEnvelope.uptime;break;}}// Release lock.mLock.unlock();// Invoke all response callbacks.for (size_t i = 0; i < mResponses.size(); i++) {Response& response = mResponses.editItemAt(i);if (response.request.ident == POLL_CALLBACK) {int fd = response.request.fd;int events = response.events;void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKSALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",this, response.request.callback.get(), fd, events, data);
#endif// Invoke the callback. Note that the file descriptor may be closed by// the callback (and potentially even reused) before the function returns so// we need to be a little careful when removing the file descriptor afterwards.int callbackResult = response.request.callback->handleEvent(fd, events, data);if (callbackResult == 0) {removeFd(fd, response.request.seq);}// Clear the callback reference in the response structure promptly because we// will not clear the response vector itself until the next poll.response.request.callback.clear();result = POLL_CALLBACK;}}return result;
}
mNextMessageUptime 是 native 层最近消息的就绪时间,初始化为 -1,在跳转到 Done 处理消息时会更新。形参 timeoutMillis 是 java 层传递的最近的消息就绪时间。pollInner 通过 timeoutMillis 和 mNextMessageUptime 计算最小的消息就绪时间,重新赋值 timeoutMillis。然后调用 epoll_wait 开始监控,超时时间即 timeoutMillis。epoll 的使用方式参考 epoll
eventfd 和 epoll 推动消息流转
那么 epoll 在哪里初始化呢?答案是 Looper 的构造函数:
Looper::Looper(bool allowNonCallbacks) :mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",strerror(errno));AutoMutex _l(mLock);rebuildEpollLocked();
}
首先初始化 mWakeEventFd,它是 eventfd 类型的文件描述符,在此用作新消息入队时主动唤醒 epoll。然后调用 rebuildEpollLocked 初始化 epoll,其代码如下:
[Looper.cpp -> rebuildEpollLocked()]
void Looper::rebuildEpollLocked() {// Close old epoll instance if we have one.if (mEpollFd >= 0) {close(mEpollFd);}// Allocate the new epoll instance and register the wake pipe.mEpollFd = epoll_create(EPOLL_SIZE_HINT);LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));struct epoll_event eventItem;memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field unioneventItem.events = EPOLLIN;eventItem.data.fd = mWakeEventFd;int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",strerror(errno));for (size_t i = 0; i < mRequests.size(); i++) {const Request& request = mRequests.valueAt(i);struct epoll_event eventItem;request.initEventItem(&eventItem);int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);if (epollResult < 0) {ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",request.fd, strerror(errno));}}
}
先 mWakeEventFd 添加到 epoll 中监控,然后将 mRequests 列表引用的文件描述符添加到 epoll 中间监控。mRequestes 的初始化后续再讨论。
至此通过 mWakeEventFd 和 epoll_wait 已经实现了了 java 层需要的阻塞功能。
继续回到 pollInner 源码,我们来看 epoll_wait 返回有几种情况:
- 等待超时
- mWakeEventFd 唤醒
- mRequests 列表中文件发生 IO 事件
为了方便理解后续程序逻辑,先解释一下相关数据结构的功能:
类名 | 功能 |
---|---|
MessageEnvelope | 信封类,用来描述 native 层的消息,信封包含消息本身,以及消息的回调和就绪时间 |
Request | 文件描述符监控请求,主要域包括:被监控的文件描述符 fd,监控的事件 events,事件回调函数 callback,区分不同请求类型的 ident,保证 Request 唯一性的 seq |
Response | 与 Request 对应,当 Request 对象引用的 fd 发生指定的 IO 事件时,将创建一个 Response 对象引用该 Request 对象,或将触发 Request 对象的 callback 函数被调用 |
变量名 | 功能 |
---|---|
mMessageEnvelopes | 保存 native 消息队列的 Vector 容器 |
mRequestes | 文件描述符监控请求列表,应用程序可以通过 addFd 向列表添加新的请求,加入到列表的 Request 对象同时会加入到 epoll 中监控 |
mResponses | pollOnce 内部使用的变量,与 mRequestes 对应,每次调用 pollInner 会重建,当 mRequestes 列表中的文件发送指定的 IO 事件时,对应的 Requset 将包装成 Response 添加到 mResponses 列表,说明该请求处于待处理状态 |
等待超时
对于第一种情况,说明消息就绪(第一次调用 pollInner 是 java 层消息就绪,后续调用可能是 native 消息或 java 消息就绪),直接跳转到 Done,从 mMessageEnvelopes 取消息,如果有就绪消息,则调用消息回调(MessageHandler) 处理消息,直到所有就绪消息都处理完成,更新mNextMessageUptime。这种情况 mResponses 为空,pollInner 将返回 pollOnce 进行下一次循环,while 依旧不会执行,result 为 POLL_WAKE 或 POLL_CALLBACK,则 for 循环会结束,pollOnce 返回,程序跳转到 java 层处理消息。
mWakeEventFd 唤醒
对于第二种情况,说明有新消息入队,java 层新消息入队,如果 loop 线程处于休眠状态,则会调用 nativeWake 将其唤醒,其 native 实现如下:
[android_os_MessageQueue.cpp -> android_os_MessageQueue_nativeWake]
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);nativeMessageQueue->wake();
}
Looper 的 wake 函数会被调用:
[android_os_MessageQueue.cpp -> wake()]
void NativeMessageQueue::wake() {mLooper->wake();
}
如果是 native 层,在发送消息的最后会直接调用 wake。典型的消息发送函数如下:
[Looper.cpp -> sendMessageAtTime]
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,const Message& message) {size_t i = 0;{ // acquire lockAutoMutex _l(mLock);size_t messageCount = mMessageEnvelopes.size();while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {i += 1;}MessageEnvelope messageEnvelope(uptime, handler, message);mMessageEnvelopes.insertAt(messageEnvelope, i, 1);// Optimization: If the Looper is currently sending a message, then we can skip// the call to wake() because the next thing the Looper will do after processing// messages is to decide when the next wakeup time should be. In fact, it does// not even matter whether this code is running on the Looper thread.if (mSendingMessage) {return;}} // release lock// Wake the poll loop only when we enqueue a new message at the head.if (i == 0) {wake();}
}
发送消息就是:创建一个信封对象 messageEnvelope,投放到 mMessageEnvelopes 容器中。如果 loop 进程处于休眠状态,则调用 wake 函数。
那么 wake 函数是怎么唤醒 loop 线程的呢?
[Looper.cpp -> wake()]
void Looper::wake() {uint64_t inc = 1;ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));if (nWrite != sizeof(uint64_t)) {if (errno != EAGAIN) {LOG_ALWAYS_FATAL("Could not write wake signal to fd %d: %s",mWakeEventFd, strerror(errno));}}
}
其实就是往 mWakeEventFd 写入 1,触发 epoll_wait 返回。
这种情况先通过 awoken(); 清除唤醒标志,如果入队的消息是即时消息,处理流程和第一种情况一样,如果是时延时消息,则只会更新下一次 epoll_wait 的超时时间。
mRequests 列表中文件发生 IO 事件
对于第三种情况说明 mRequestes 列表中的文件状态发生了改变。这种情况在 native 层使用比较广泛,java 层基本不会使用。类似于消息发送函数,应用的程序可以调用 addFd 向 mRequestes 列表中添加文件描述符监控。
[Looper.cpp -> addFd]
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {if (!callback.get()) {if (! mAllowNonCallbacks) {ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");return -1;}if (ident < 0) {ALOGE("Invalid attempt to set NULL callback with ident < 0.");return -1;}} else {ident = POLL_CALLBACK;}{ // acquire lockAutoMutex _l(mLock);Request request;request.fd = fd;request.ident = ident;request.events = events;request.seq = mNextRequestSeq++;request.callback = callback;request.data = data;if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1struct epoll_event eventItem;request.initEventItem(&eventItem);ssize_t requestIndex = mRequests.indexOfKey(fd);if (requestIndex < 0) {int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);if (epollResult < 0) {ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));return -1;}mRequests.add(fd, request);} else {int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);if (epollResult < 0) {if (errno == ENOENT) {epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);if (epollResult < 0) {ALOGE("Error modifying or adding epoll events for fd %d: %s",fd, strerror(errno));return -1;}scheduleEpollRebuildLocked();} else {ALOGE("Error modifying epoll events for fd %d: %s", fd, strerror(errno));return -1;}}mRequests.replaceValueAt(requestIndex, request);}} // release lockreturn 1;
}
Request 通过 ident 区分不同的请求类型:
如果 ident 大于等于 0,则表示该 Request 是不需要回调函数的,当然这种情况需要在 Looper 初始化时设置 mAllowNonCallbacks 为 true;
如果 ident 等于 POLL_CALLBACK,表示 Repuest 引用的文件状态改变,会调用其回调函数。
epoll 监测到 mRequestes 列表中的文件状态发生改变后,将所有状态改变的 Request 包装成 Response push 到 mResponses,可以自行分析 pushResponse 源码。此时再需要处理处理消息(native 消息和 java 消息),而是逐个回调 ident 为 POLL_CALLBACK 的 Request 的回调函数。pollInner 将返回 pollOnce 进行下一次循环,此时 mResponses 不为空,while 中的代码将执行,这里将找到处理 ident > 0 的 Request,其实就是返回 ident,应用程序根据 pollOnce 的返回值可以判断是哪一个 Request 触发,从而进行相应处理。
注意这种情况,下一次调用 pollOnce 会继续处理 mResponses 中的请求,直到所有 ident > 0 的请求处理完,才会调用下一次 pollInner。
java 层也是可以添加文件描述符监控请求,接口函数是 addOnFileDescriptorEventListener,具体的 jni 调用就不再赘述。
发送一个 Native 消息
理论加实战才是掌握一个技术最好方法。下面以一个简单的例子来演示 native 层如何发送消息。
创建消息回调类:
class MyHandler : public MessageHandler {public:enum {MSG1 = 0,MSG2 = 1,MSG3 = 2,};public:virtual void handleMessage(const Message& message);
};void MyHandler::handleMessage(const Message& message) {switch (message.what) {case MyHandler::MSG1:ALOGD("handle MSG1");break;case MyHandler::MSG2:ALOGD("handle MSG2");break;case MyHandler::MSG3:ALOGD("handle MSG3");pthread_exit(NULL);break;}
}
创建线程类
class MyThread : public Thread
{public:explicit MyThread(sp<Looper> looper): mLooper(looper){mHandler = new MyHandler();}protected:virtual bool threadLoop(){ALOGD("threadLoop");Message message1;message1.what = MyHandler::MSG1;ALOGD("send MSG1");mLooper->sendMessageDelayed(4 * DEFAULT_BACKLOG_DELAY_NS, mHandler, message1);Message message2;message2.what = MyHandler::MSG2;ALOGD("send MSG2");mLooper->sendMessageDelayed(2 * DEFAULT_BACKLOG_DELAY_NS, mHandler, message2);Message message3;message3.what = MyHandler::MSG3;ALOGD("send MSG3");mLooper->sendMessageDelayed(4 * DEFAULT_BACKLOG_DELAY_NS, mHandler, message3);return false;}private:sp<Looper> mLooper;sp<MyHandler> mHandler;
};
编写 main 函数,在主线程中实现轮询:
int main(int argc, char *argv[]){(void) argc;(void) argv;sp<Looper> mLooper = Looper::prepare(false);sp<MyHandler> mHandler = new MyHandler();sp<Thread> t = new MyThread(mLooper);t->run("thread-1");int events;do {int32_t ret = mLooper->pollOnce(-1);switch (ret) {case Looper::POLL_WAKE:case Looper::POLL_CALLBACK:continue;case Looper::POLL_ERROR:ALOGE("Looper::POLL_ERROR");continue;case Looper::POLL_TIMEOUT:// timeout (should not happen)continue;default:// should not happenALOGE("Looper::pollOnce() returned unknown status %d", ret);continue;}} while (true);
}
运行 log 如下:
01-02 16:44:26.625 12888 12888 D Looper : 0x7d68453060 ~ pollOnce - waiting: timeoutMillis=-1
01-02 16:44:26.625 12888 12889 D message_queue_test: threadLoop
01-02 16:44:26.626 12888 12889 D message_queue_test: send MSG1
01-02 16:44:26.626 12888 12889 D Looper : 0x7d68453060 ~ sendMessageAtTime - uptime=36547968233177, handler=0x7d684272e0, what=0
01-02 16:44:26.626 12888 12889 D Looper : 0x7d68453060 ~ wake
01-02 16:44:26.626 12888 12889 D message_queue_test: send MSG2
01-02 16:44:26.626 12888 12889 D Looper : 0x7d68453060 ~ sendMessageAtTime - uptime=36545968643100, handler=0x7d684272e0, what=1
01-02 16:44:26.626 12888 12888 D Looper : 0x7d68453060 ~ pollOnce - handling events from 1 fds
01-02 16:44:26.626 12888 12888 D Looper : 0x7d68453060 ~ awoken
01-02 16:44:26.626 12888 12888 D Looper : 0x7d68453060 ~ pollOnce - returning result -1
01-02 16:44:26.626 12888 12889 D Looper : 0x7d68453060 ~ wake
01-02 16:44:26.626 12888 12888 D Looper : 0x7d68453060 ~ pollOnce - waiting: timeoutMillis=-1
01-02 16:44:26.627 12888 12888 D Looper : 0x7d68453060 ~ pollOnce - next message in 3999077462ns, adjusted timeout: timeoutMillis=4000
01-02 16:44:26.627 12888 12889 D message_queue_test: send MSG3
01-02 16:44:26.627 12888 12888 D Looper : 0x7d68453060 ~ pollOnce - handling events from 1 fds
01-02 16:44:26.627 12888 12889 D Looper : 0x7d68453060 ~ sendMessageAtTime - uptime=36547969305100, handler=0x7d684272e0, what=2
01-02 16:44:26.627 12888 12888 D Looper : 0x7d68453060 ~ awoken
01-02 16:44:26.627 12888 12888 D Looper : 0x7d68453060 ~ pollOnce - returning result -1
01-02 16:44:26.627 12888 12888 D Looper : 0x7d68453060 ~ pollOnce - waiting: timeoutMillis=-1
01-02 16:44:26.627 12888 12888 D Looper : 0x7d68453060 ~ pollOnce - next message in 1999024077ns, adjusted timeout: timeoutMillis=2000
01-02 16:44:28.630 12888 12888 D Looper : 0x7d68453060 ~ pollOnce - timeout
01-02 16:44:28.630 12888 12888 D Looper : 0x7d68453060 ~ pollOnce - sending message: handler=0x7d684272e0, what=1
01-02 16:44:28.630 12888 12888 D message_queue_test: handle MSG2
01-02 16:44:28.630 12888 12888 D Looper : 0x7d68453060 ~ pollOnce - returning result -2
01-02 16:44:28.630 12888 12888 D Looper : 0x7d68453060 ~ pollOnce - waiting: timeoutMillis=-1
01-02 16:44:28.631 12888 12888 D Looper : 0x7d68453060 ~ pollOnce - next message in 1995191846ns, adjusted timeout: timeoutMillis=1996
01-02 16:44:30.629 12888 12888 D Looper : 0x7d68453060 ~ pollOnce - timeout
01-02 16:44:30.629 12888 12888 D Looper : 0x7d68453060 ~ pollOnce - sending message: handler=0x7d684272e0, what=0
01-02 16:44:30.629 12888 12888 D message_queue_test: handle MSG1
01-02 16:44:30.629 12888 12888 D Looper : 0x7d68453060 ~ pollOnce - sending message: handler=0x7d684272e0, what=2
01-02 16:44:30.629 12888 12888 D message_queue_test: handle MSG3
12889 线程依次发送 MSG1、MSG2、MSG3,主线程 12888 2s 后收到 MSG2, 4s 后收到 MSG1, 然后收到 MSG3,MSG3 触发主线程退出。
添加一个文件描述符监控请求
native 消息队列的另一个应用是监控文件描述符。下面是示例实现两个文件描述符的监控,第一个文件描述符是 stdin,即监控来之键盘的输入;第二文件描述符是 socketpair 的一端,当 socketpair 的另一端将收到数据。
使用 socketpair 实现本地消息的收发,并抽象成类:
class TestChannel : public RefBase {protected:virtual ~TestChannel();public:TestChannel(const String8& name, int fd);static status_t openTestChannelPair(const char* name,sp<TestChannel>& outServerChannel, sp<TestChannel>& outClientChannel);inline String8 getName() const { return mName; }inline int getFd() const { return mFd; }status_t sendMessage(const String8& msg);status_t receiveMessage(String8& msg);private:String8 mName;int mFd;
};TestChannel::TestChannel(const String8& name, int fd) :mName(name), mFd(fd) {ALOGD("Test channel constructed: name='%s', fd=%d",mName.string(), fd);int result = fcntl(mFd, F_SETFL, O_NONBLOCK);LOG_ALWAYS_FATAL_IF(result != 0, "channel '%s' ~ Could not make socket ""non-blocking. errno=%d", mName.string(), errno);
}TestChannel::~TestChannel() {#if DEBUG_CHANNEL_LIFECYCLEALOGD("Test channel destroyed: name='%s', fd=%d",mName.string(), mFd);
#endif::close(mFd);
}status_t TestChannel::openTestChannelPair(const char* name,sp<TestChannel>& outServerChannel, sp<TestChannel>& outClientChannel) {int sockets[2];if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sockets)) {status_t result = -errno;ALOGE("channel '%s' ~ Could not create socket pair. errno=%d",name, errno);outServerChannel.clear();outClientChannel.clear();return result;}int bufferSize = SOCKET_BUFFER_SIZE;setsockopt(sockets[0], SOL_SOCKET, SO_SNDBUF, &bufferSize, sizeof(bufferSize));setsockopt(sockets[0], SOL_SOCKET, SO_RCVBUF, &bufferSize, sizeof(bufferSize));setsockopt(sockets[1], SOL_SOCKET, SO_SNDBUF, &bufferSize, sizeof(bufferSize));setsockopt(sockets[1], SOL_SOCKET, SO_RCVBUF, &bufferSize, sizeof(bufferSize));String8 serverChannelName;serverChannelName.setTo(name);serverChannelName.append(" (server)");outServerChannel = new TestChannel(serverChannelName, sockets[0]);String8 clientChannelName;clientChannelName.setTo(name);clientChannelName.append(" (client)");outClientChannel = new TestChannel(clientChannelName, sockets[1]);return OK;
}status_t TestChannel::sendMessage(const String8& msg) {ssize_t nWrite;do {int error = errno;nWrite = ::send(mFd, msg.c_str(), msg.length(), MSG_DONTWAIT | MSG_NOSIGNAL);} while (nWrite == -1 && errno == EINTR);if (nWrite < 0) {int error = errno;if (error == EAGAIN || error == EWOULDBLOCK) {return WOULD_BLOCK;}if (error == EPIPE || error == ENOTCONN || error == ECONNREFUSED || error == ECONNRESET) {return DEAD_OBJECT;}return -error;}if (size_t(nWrite) != msg.length()) {return DEAD_OBJECT;}ALOGD("channel '%s' ~ sent message: %s", mName.string(), msg.string());return OK;
}status_t TestChannel::receiveMessage(String8& msg) {ssize_t nRead;char buf[1024];do {nRead = ::recv(mFd, buf, 1024, MSG_DONTWAIT);} while (nRead == -1 && errno == EINTR);if (nRead < 0) {int error = errno;if (error == EAGAIN || error == EWOULDBLOCK) {return WOULD_BLOCK;}if (error == EPIPE || error == ENOTCONN || error == ECONNREFUSED) {return DEAD_OBJECT;}return -error;}if (nRead == 0) { // check for EOFreturn DEAD_OBJECT;}msg.setTo(buf, nRead);ALOGD("channel '%s' ~ received message: %s", mName.string(), msg.string());return OK;
}
创建事件回调类:
class MyLooperCallback : public LooperCallback {public:virtual int handleEvent(int fd, int events, void* data);
};int MyLooperCallback::handleEvent(int fd, int looperEvents, void* data) {char buf[256];memset(buf, 0, sizeof(buf));if (looperEvents & Looper::EVENT_INPUT) {if(fd == 0) {int events = reinterpret_cast<intptr_t>(data);int i = read(fd, buf, sizeof(buf));ALOGD("handleEvent: %d", events);ALOGD("read(%d): %s", i, buf);} else {sp<TestChannel> *serverChannel = reinterpret_cast<sp<TestChannel>*>(data);String8 msg;(*serverChannel)->receiveMessage(msg);ALOGD("receiveMessage(%d): %s", (int)msg.length(), msg.c_str());}}return 1;
}
main 函数中监控 stdin 和 socketpair 的接收端,在主线程中实现轮询。使用 socketpair 发送端发送数据,检验事件是否触发。
int main(int argc, char *argv[]){(void) argc;(void) argv;sp<Looper> mLooper = Looper::prepare(false);sp<MyLooperCallback> myCallback = new MyLooperCallback;sp<TestChannel> serverChannel;sp<TestChannel> clientChannel;TestChannel::openTestChannelPair("Test", serverChannel, clientChannel);int events = 128;ALOGD("addFd: 0");mLooper->addFd(0, Looper::POLL_CALLBACK, Looper::EVENT_INPUT, myCallback,reinterpret_cast<void*>(events));ALOGD("addFd: %d", serverChannel->getFd());mLooper->addFd(serverChannel->getFd(), Looper::POLL_CALLBACK, Looper::EVENT_INPUT, myCallback,reinterpret_cast<void*>(&serverChannel));String8 msg;msg.setTo("Hello World!");clientChannel->sendMessage(msg);do {int32_t ret = mLooper->pollOnce(-1);switch (ret) {case Looper::POLL_WAKE:case Looper::POLL_CALLBACK:continue;case Looper::POLL_ERROR:ALOGE("Looper::POLL_ERROR");continue;case Looper::POLL_TIMEOUT:// timeout (should not happen)continue;default:// should not happenALOGE("Looper::pollOnce() returned unknown status %d", ret);continue;}} while (true);}
01-05 10:11:27.401 10896 10896 D message_queue_test: Test channel constructed: name=‘Test (server)’, fd=5
01-05 10:11:27.401 10896 10896 D message_queue_test: Test channel constructed: name=‘Test (client)’, fd=6
01-05 10:11:27.401 10896 10896 D message_queue_test: addFd: 0
01-05 10:11:27.401 10896 10896 D Looper : 0x70c9853060 ~ addFd - fd=0, ident=-2, events=0x1, callback=0x70c98270e0, data=0x80
01-05 10:11:27.401 10896 10896 D message_queue_test: addFd: 5
01-05 10:11:27.401 10896 10896 D Looper : 0x70c9853060 ~ addFd - fd=5, ident=-2, events=0x1, callback=0x70c98270e0, data=0x7fc6d79a48
01-05 10:11:27.402 10896 10896 D message_queue_test: channel ‘Test (client)’ ~ sent message: Hello World!
01-05 10:11:27.402 10896 10896 D Looper : 0x70c9853060 ~ pollOnce - waiting: timeoutMillis=-1
01-05 10:11:27.402 10896 10896 D Looper : 0x70c9853060 ~ pollOnce - handling events from 1 fds
01-05 10:11:27.402 10896 10896 D Looper : 0x70c9853060 ~ pollOnce - invoking fd event callback 0x70c98270e0: fd=5, events=0x1, data=0x7fc6d79a48
01-05 10:11:27.402 10896 10896 D message_queue_test: channel ‘Test (server)’ ~ received message: Hello World!
01-05 10:11:27.402 10896 10896 D message_queue_test: receiveMessage(12): Hello World!
01-05 10:11:27.402 10896 10896 D Looper : 0x70c9853060 ~ pollOnce - returning result -2
01-05 10:11:27.402 10896 10896 D Looper : 0x70c9853060 ~ pollOnce - waiting: timeoutMillis=-1
01-05 10:11:31.108 10896 10896 D Looper : 0x70c9853060 ~ pollOnce - handling events from 1 fds
01-05 10:11:31.108 10896 10896 D Looper : 0x70c9853060 ~ pollOnce - invoking fd event callback 0x70c98270e0: fd=0, events=0x1, data=0x80
01-05 10:11:31.108 10896 10896 D message_queue_test: handleEvent: 128
01-05 10:11:31.108 10896 10896 D message_queue_test: read(5): 1234
01-05 10:11:31.108 10896 10896 D Looper : 0x70c9853060 ~ pollOnce - returning result -2
01-05 10:11:31.109 10896 10896 D Looper : 0x70c9853060 ~ pollOnce - waiting: timeoutMillis=-1
总结:
- MessageQueue 分为 java 层和 native 层两部分实现。java 部分负责管理 java 消息队列,处理 IdleHandler。native 部分负责 native 消息管理 java 消息队列,监控文件描述符。它们通过 nativePollOnce 关联。
- java 层消息和 native 层消息是相互独立的两个系统。两个系统同时依赖 epoll 驱动。
- NativeMessageQueue 只实现了 jni 接口,Looper 类实现 native 消息队列的所有功能。
- Native 没有类似 java 层 Handler 类。发送消息通过 Looper 的 sendMessageAtTime 等函数完成,处理消息通过 MessageHandler 完成。
Android MessageQueue 底层实现(C++)相关推荐
- fiyme android底层,魅族首批Android 10底层Flyme于今日正式推送
原标题:魅族首批 Android 10 内测版今日起开始推送 7月21日消息 魅族官方曾于 6 月 25 日宣布为旗下 10 款机型的 Flyme 适配 Android 10 系统底层.从 6 月 2 ...
- android底层库包含哪些,Android的底层库libutils
第一部分 libutils 概述 libutils 是 Android 的底层库,这个库以 C++ 实现,它提供的 API 也是 C++ 的. Android 的层次的 C 语言 程序和库,大都基于 ...
- Android FrameWork 底层开发
1. FrameWork 底层开发 Android: 底层开发 开发核心配置 软件工程师学习与Android 底层与硬件层次的开发,主要学习Android 的OS开发使用, Android底层固化的 ...
- Android FrameWork底层开发视频全套
我的百度云连接 链接: https://pan.baidu.com/s/1i64xvjJ 密码: 5ptb 我的百度云连接 链接: https://pan.baidu.com/s/1i64xvjJ 密 ...
- android嵌入式底层开发教程
android嵌入式底层开发教程 课程针对人群 熟悉.NET,J2EE应用开发,希望往嵌入式底层学习的工程师 熟悉Android应用和框架开发,希望从上到下走通Android系统的工程师 不希望局限在 ...
- android FrameWork底层开发视频
android FrameWork底层开发视频,可配合 <深入理解android.pdf>(这个资源 很多),<Android-Framework框架分析> 学习 https: ...
- Android FrameWork底层开发视频
=========================== 链接: https://pan.baidu.com/s/1iZ1SXS4Oy8drjaQOJ2kR4g 资源提取码获取方式,关注下面微信公众号, ...
- 【 karle 专栏 】Android 初探底层知识系列
这一系列底层知识基于Android 6.0.1版本. 概述 在我还是菜鸟的时候,有很多技术都不明白,也找不到答案,比如,apk是如何安装的?资源是怎么加载的?再比如,AIDL,只听未用过.四大组件也是 ...
- Google宣布支持Vulkan作为Android的底层图形API
很多人在刚听到底层图形API的时候似乎都不怎么激动,但是苹果已经在去年的iOS 8上推出了Metal,并计划于今秋将之推送至Mac OS X EI Capitan.此外,刚刚发布的Windows 10 ...
最新文章
- dx9 lock unlock效率太低_巧用“动作经济原则”,员工不累,效率加倍!
- Java基础:IO流之File类
- linux下的geany源码安装,linux 下php开发工具geany-0.16的安装。
- 【讨论贴】关于父实子虚的疑问???
- EduCoder Linux文件/目录高级管理三
- sql server 2008 身份验证失败 18456
- 前端学习(1568):封装一个面包屑导航
- guid mysql_关于MySQL:MySQL-如何搜索GUID
- 实战课堂:一则CPU 100%的故障分析处理知识和警示
- mysql 编码utfmb4
- Atitit.eclipse git使用
- 中国金属复合开关设备市场趋势报告、技术动态创新及市场预测
- 代码高亮插件——wangHightLighter.js——demo演示
- mysql lower case_mysql lower-case-table-names参数
- 计算机专业个人职业规划范文200字,计算机专业的职业生涯规划范文
- 视频分辨率QCIF、CIF、2CIF、4CIF,D1~D5
- Python实现基于负熵最大判据的FastICA胎心信号分离
- 假如让你从 0 到 1 实现一个直播弹幕系统
- mysql忘记密码win10_win10 mysql8.0.12 忘记root密码如何重置密码
- 智慧城市在物联网技术与行业应用
热门文章
- python语言编写的DLL注入工具
- 线性插值——如何根据两点确定一条线段
- 【Unity3D】关于 InputManager 以及改键功能的制作
- hilbert谱 matlab,MATLAB实现EMD分解及希尔伯特谱分析
- transforms_list = [c_vision.Decode(), c_vision.Rescale(1.0 / 127.5, -1.0)]这个是图像标准化【-1,1】是吗?
- iMeta | 华科宁康等打通中药成分鉴定和网络药理分析,助力中药标准化数字化国际化...
- python量化选股策略 源码_常见的十大量化投资策略(附源码)
- 牛逼!用 AI 实现 C++、Java、Python 代码互译!
- 一台服务器上如何创建多个网站
- 私服服务器显示关闭怎么办,《魔兽世界》私服关闭