This page looks best with JavaScript enabled

Android 中 MessageQueue 的 nativePollOnce

 ·  ☕ 7 min read

Android SDK 中的事件循环已经是一个老生常谈的问题了, 像 Handler Looper MessageQueue 这几个类也是被大家研究透彻了.
但是再回头看以前自己的分析, 总感觉差点什么, 不够透彻. 心里隐隐感觉自己没有把事情完全吃透, 于是今日又回顾 Android 中的事件循环机制, 注意到
MessageQueue 中获取下一条消息时会执行一个 native 调用 nativePollOnce, 翻看 Android 系统源码发现有内容.

来自高手的解释

首先, 先梭哈一把 stackoverflow 上高手对这个问题(android - what is message queue native poll once in android?)的回答原文:

Short answer:

The nativePollOnce method is used to “wait” till the next Message becomes available. If the time spent during this call is long, your main (UI) thread has no real work to do and waits for next events to process. There’s no need to worry about that.

Explanation:

Because the “main” thread is responsible for drawing UI and handling various events, it’s Runnable has a loop which processes all these events. The loop is managed by a Looper and its job is quite straightforward: it processes all Messages in the MessageQueue.

A Message is added to the queue for example in response to input events, as frame rendering callback or even your own Handler.post calls. Sometimes the main thread has no work to do (that is, no messages in the queue), which may happen e.g. just after finishing rendering single frame (the thread has just drawn one frame and is ready for the next one, just waits for a proper time). Two Java methods in the MessageQueue class are interesting to us: Message next() and boolean enqueueMessage(Message, long). Message next(), as its name suggest, takes and returns the next Message from the queue. If the queue is empty (and there’s nothing to return), the method calls native void nativePollOnce(long, int) which blocks until a new message is added. At this point you might ask how does nativePollOnce know when to wake up. That’s a very good question. When a Message is added to the queue, the framework calls the enqueueMessage method, which not only inserts the message into the queue, but also calls native static void nativeWake(long), if there’s need to wake up the queue. The core magic of nativePollOnce and nativeWake happens in the native (actually, C++) code. Native MessageQueue utilizes a Linux system call named epoll, which allows to monitor a file descriptor for IO events. nativePollOnce calls epoll_wait on a certain file descriptor, whereas nativeWake writes to the descriptor, which is one of the IO operations, epoll_wait waits for. The kernel then takes out the epoll-waiting thread from the waiting state and the thread proceeds with handling the new message. If you’re familiar with Java’s Object.wait() and Object.notify() methods, you can imagine that nativePollOnce is a rough equivalent for Object.wait() and nativeWake for Object.notify(), except they’re implemented completely differently: nativePollOnce uses epoll and Object.wait() uses futex Linux call. It’s worth noticing that neither nativePollOnce nor Object.wait() waste CPU cycles, as when a thread enters either method, it becomes disabled for thread scheduling purposes (quoting the javadoc for the Object class). However, some profilers may mistakenly recognize epoll-waiting (or even Object-waiting) threads as running and consuming CPU time, which is incorrect. If those methods actually wasted CPU cycles, all idle apps would use 100% of the CPU, heating and slowing down the device.

Conclusion:

You shouldn’t worry about nativePollOnce. It just indicates that processing of all Messages has been finished and the thread waits for the next one. Well, that simply means you don’t give too much work to your main thread ;)


translate.google 一下 😬:

简短答案:

nativePollOnce 方法用于“等待”, 直到下一条消息可用为止. 如果在此调用期间花费的时间很长, 则您的主线程没有实际工作要做, 而是等待下一个事件处理.无需担心.

说明:

因为主线程负责绘制 UI 和处理各种事件, 所以主线程拥有一个处理所有这些事件的循环. 该循环由 L​​ooper 管理, 其工作非常简单: 它处理 MessageQueue 中的所有 Message.
例如, 响应于输入事件, 将消息添加到队列, 帧渲染回调, 甚至您的 Handler.post 调用. 有时主线程无事可做(即队列中没有消息), 例如在完成渲染单帧之后(线程刚绘制了一帧, 并准备好下一帧, 等待适当的时间). MessageQueue 类中的两个 Java 方法对我们很有趣: Message next()boolean enqueueMessage(Message, long). 顾名思义, Message next() 从队列中获取并返回下一个消息. 如果队列为空(无返回值), 则该方法将调用 native void nativePollOnce(long, int), 该方法将一直阻塞直到添加新消息为止. 此时,您可能会问nativePollOnce 如何知道何时醒来. 这是一个很好的问题. 当将 Message 添加到队列时, 框架调用 enqueueMessage 方法, 该方法不仅将消息插入队列, 而且还会调用native static void nativeWake(long). nativePollOncenativeWake 的核心魔术发生在 native 代码中. native MessageQueue 利用名为 epoll 的 Linux 系统调用, 该系统调用可以监视文件描述符中的 IO 事件. nativePollOnce 在某个文件描述符上调用 epoll_wait, 而 nativeWake 写入一个 IO 操作到描述符, epoll_wait 等待. 然后, 内核从等待状态中取出 epoll 等待线程, 并且该线程继续处理新消息. 如果您熟悉 Java 的 Object.wait()Object.notify()方法,可以想象一下 nativePollOnce 大致等同于 Object.wait(), nativeWake 等同于 Object.notify(),但它们的实现完全不同: nativePollOnce 使用 epoll, 而 Object.wait 使用 futex Linux 调用. 值得注意的是, nativePollOnceObject.wait 都不会浪费 CPU 周期, 因为当线程进入任一方法时, 出于线程调度的目的, 该线程将被禁用(引用Object类的javadoc). 但是, 某些事件探查器可能会错误地将等待 epoll 等待(甚至是 Object.wait)的线程识别为正在运行并消耗 CPU 时间, 这是不正确的. 如果这些方法实际上浪费了 CPU 周期, 则所有空闲的应用程序都将使用 100% 的 CPU, 从而加热并降低设备速度.

结论:

nativePollOnce. 它只是表明所有消息的处理已完成, 线程正在等待下一个消息.

我以前的理解分享

Looper

linux io model

Linux 有多个 IO 模型:

  • 阻塞 IO
  • 非阻塞 IO
  • IO 复用, 对应 select poll epoll 都属于基于 IO 复用模式的调用
  • 信号驱动 IO
  • 异步 IO

io multiplexing

看下源码

Java 这边

enqueueMessage:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
boolean enqueueMessage(Message msg, long when) {
    if (msg.target == null) {
        throw new IllegalArgumentException("Message must have a target.");
    }
    if (msg.isInUse()) {
        throw new IllegalStateException(msg + " This message is already in use.");
    }
    synchronized (this) {
        msg.markInUse();
        msg.when = when;
        Message p = mMessages;
        boolean needWake;
        if (p == null || when == 0 || when < p.when) {
            // New head, wake up the event queue if blocked.
            msg.next = p;
            mMessages = msg;
            needWake = mBlocked;
        } else {
            // Inserted within the middle of the queue.  Usually we don't have to wake
            // up the event queue unless there is a barrier at the head of the queue
            // and the message is the earliest asynchronous message in the queue.
            needWake = mBlocked && p.target == null && msg.isAsynchronous();
            Message prev;
            for (;;) {
                prev = p;
                p = p.next;
                if (p == null || when < p.when) {
                    break;
                }
                if (needWake && p.isAsynchronous()) {
                    needWake = false;
                }
            }
            msg.next = p; // invariant: p == prev.next
            prev.next = msg;
        }
        // We can assume mPtr != 0 because mQuitting is false.
        if (needWake) {
            // 这里唤醒 nativePollOnce 的沉睡
            nativeWake(mPtr);
        }
    }
    return true;
}

next:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
Message next() {
    //...
    int pendingIdleHandlerCount = -1; // -1 only during first iteration
    int nextPollTimeoutMillis = 0;
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            Binder.flushPendingCommands();
        }
        // nativePollOnce 这里陷入沉睡, 等待唤醒
        nativePollOnce(ptr, nextPollTimeoutMillis);
        synchronized (this) {
            // Try to retrieve the next message.  Return if found.
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            if (msg != null && msg.target == null) {
                // Stalled by a barrier.  Find the next asynchronous message in the queue.
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
            if (msg != null) {
                if (now < msg.when) {
                    // Next message is not ready.  Set a timeout to wake up when it is ready.
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                    // Got a message.
                    mBlocked = false;
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    }
                    msg.next = null;
                    if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                    msg.markInUse();
                    return msg;
                }
            } else {
                // No more messages.
                nextPollTimeoutMillis = -1;
            }
            //...
        }
        //...
    }
}

CPP 那边

nativeWake

1
2
3
void NativeMessageQueue::wake() {
    mLooper->wake();
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
void Looper::wake() {
    uint64_t inc = 1;
    // 调用 write 向 mWakeEventFd 描述符写入一个事件
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
            LOG_ALWAYS_FATAL("Could not write wake signal to fd %d: %s",
                    mWakeEventFd, strerror(errno));
        }
    }
}

nativePollOnce:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;

    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }

        if (result != 0) {
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }

        result = pollInner(timeoutMillis);
    }
}
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
int Looper::pollInner(int timeoutMillis) {
    // Adjust the timeout based on when the next message is due.
    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
    }

    // Poll.
    int result = POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

    // We are about to idle.
    mPolling = true;

    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    // 这里重点
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // No longer idling.
    mPolling = false;

    // Acquire lock.
    mLock.lock();

    // Rebuild epoll set if needed.
    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        rebuildEpollLocked();
        goto Done;
    }

    // Check for poll error.
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
        result = POLL_ERROR;
        goto Done;
    }

    // Check for poll timeout.
    if (eventCount == 0) {
        result = POLL_TIMEOUT;
        goto Done;
    }

    // Handle all events.
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd) {
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
Done: ;

    // Invoke pending message callbacks.
    mNextMessageUptime = LLONG_MAX;
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            // Remove the envelope from the list.
            // We keep a strong reference to the handler until the call to handleMessage
            // finishes.  Then we drop it so that the handler can be deleted *before*
            // we reacquire our lock.
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();
                handler->handleMessage(message);
            } // release handler

            mLock.lock();
            mSendingMessage = false;
            result = POLL_CALLBACK;
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    // Release lock.
    mLock.unlock();

    // Invoke all response callbacks.
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
            // Invoke the callback.  Note that the file descriptor may be closed by
            // the callback (and potentially even reused) before the function returns so
            // we need to be a little careful when removing the file descriptor afterwards.
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }

            // Clear the callback reference in the response structure promptly because we
            // will not clear the response vector itself until the next poll.
            response.request.callback.clear();
            result = POLL_CALLBACK;
        }
    }
    return result;
}
Support the author with
alipay QR Code
wechat QR Code

Yang
WRITTEN BY
Yang
Developer