zoukankan      html  css  js  c++  java
  • Android消息机制不完全解析(下)

        接着上一篇文章Android消息机制不完全解析(上),接着看C++部分的实现。


        首先,看看在/frameworks/base/core/jni/android_os_MessageQueue.cpp文件中看看android.os.MessageQueue类中的四个原生函数的实现:

    static void android_os_MessageQueue_nativeInit(JNIEnv* env, jobject obj) {
        NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();//构造NativeMessageQueue实例
        if (!nativeMessageQueue) {
            jniThrowRuntimeException(env, "Unable to allocate native queue");
            return;
        }
        nativeMessageQueue->incStrong(env);//强引用+1
        android_os_MessageQueue_setNativeMessageQueue(env, obj, nativeMessageQueue);
    }
    
    static void android_os_MessageQueue_nativeDestroy(JNIEnv* env, jobject obj) {
        NativeMessageQueue* nativeMessageQueue =
                android_os_MessageQueue_getNativeMessageQueue(env, obj);
        if (nativeMessageQueue) {
            android_os_MessageQueue_setNativeMessageQueue(env, obj, NULL);
            nativeMessageQueue->decStrong(env);//强引用-1,实际上会导致释放NativeMessageQueue实例
        }
    }
    
    static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
            jint ptr, jint timeoutMillis) {
        NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);//指针强制转换
        nativeMessageQueue->pollOnce(env, timeoutMillis);//调用nativeMessageQueue的pollonce函数
    }
    
    static void android_os_MessageQueue_nativeWake(JNIEnv* env, jobject obj, jint ptr) {
        NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
        return nativeMessageQueue->wake();//调用nativeMessageQueue的wake函数
    }


        从代码中,可以看到这四个函数的实现都是依赖于NativeMessageQueue类。不过,在开始解析NativeMessageQueue之前,我们再看一些有意思的代码:

    static void android_os_MessageQueue_setNativeMessageQueue(JNIEnv* env, jobject messageQueueObj,
            NativeMessageQueue* nativeMessageQueue) {
        env->SetIntField(messageQueueObj, gMessageQueueClassInfo.mPtr,
                 reinterpret_cast<jint>(nativeMessageQueue));//把nativeMessageQueue的实例地址强转为java的int类型并保存到gMessageQueueClassInfo.mPtr中
    }

        那么gMessageQueueClassInfo.mPtr是什么呢?

    static JNINativeMethod gMessageQueueMethods[] = {
        /* name, signature, funcPtr */
        { "nativeInit", "()V", (void*)android_os_MessageQueue_nativeInit },
        { "nativeDestroy", "()V", (void*)android_os_MessageQueue_nativeDestroy },
        { "nativePollOnce", "(II)V", (void*)android_os_MessageQueue_nativePollOnce },
        { "nativeWake", "(I)V", (void*)android_os_MessageQueue_nativeWake }
    };
    
    #define FIND_CLASS(var, className) 
            var = env->FindClass(className); 
            LOG_FATAL_IF(! var, "Unable to find class " className);
    
    #define GET_FIELD_ID(var, clazz, fieldName, fieldDescriptor) 
            var = env->GetFieldID(clazz, fieldName, fieldDescriptor); 
            LOG_FATAL_IF(! var, "Unable to find field " fieldName);
    //这个函数在Android启动的时候,会被系统调用
    int register_android_os_MessageQueue(JNIEnv* env) {
        int res = jniRegisterNativeMethods(env, "android/os/MessageQueue",
                gMessageQueueMethods, NELEM(gMessageQueueMethods));//关联MessageQueueQueue的原生函数
        LOG_FATAL_IF(res < 0, "Unable to register native methods.");
    
        jclass clazz;
        FIND_CLASS(clazz, "android/os/MessageQueue");//获取MessageQueue的class
    
        GET_FIELD_ID(gMessageQueueClassInfo.mPtr, clazz,
                "mPtr", "I");//获取MessageQueue class的mPtr field的Id
        
        return 0;
    }

        上面的代码很像java的反射有木有?

        Class cls = Class.forName("android.os.MessageQueue");
        Field feild = cls.getField("mPtr");

        到这里,我们就明白了android_os_MessageQueue_setNativeMessageQueue函数实际上把android.os.MessageQueue实例的mPtr值设置为nativeMessageQueue实例的地址。虽然Java语言没有指针的说法,但是,这里的mPtr却的的确确是作为一个指针使用的。现在,我们也就理解了,为什么mPtr可以被强制转换为nativeMessageQueue了。

        小结:

    1. android_os_MessageQueue_nativeInit和android_os_MessageQueue_nativeDestory两个函数做了些什么:
      • android_os_MessageQueue_nativeInit:构造NativeMessageQueue实例
      • android_os_MessageQueue_nativeDestory:销毁NativeMessageQeue实例

    NativeMessageQueue


        先来看看NativeMessageQueue的声明:
    class NativeMessageQueue : public MessageQueue {
    public:
        NativeMessageQueue();
        virtual ~NativeMessageQueue();
    
        virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj);
    
        void pollOnce(JNIEnv* env, int timeoutMillis);
    
        void wake();
    
    private:
        bool mInCallback;
        jthrowable mExceptionObj;
    };

        NativeMessageQueue继承自MessageQueue(不是java中的android.os.MessageQueue哦),关于MessageQueue我们只需要了解,它包含了一个成员mLooper即可(有兴趣的同学可以查看/frameworks/base/core/jni/android_os_MessageQueue.h)    

    class MessageQueue  {
    
        ......
    
    protected:
        sp<Looper> mLooper;
    };

        继续看NativeMessageQueue的代码:

    NativeMessageQueue::NativeMessageQueue() : mInCallback(false), mExceptionObj(NULL) {
        mLooper = Looper::getForThread();
        if (mLooper == NULL) {
            mLooper = new Looper(false);//实例化mLooper
            Looper::setForThread(mLooper);
        }
    }

        NativeMessageQueue构造实例的时候,会实例化mLooper。

    void NativeMessageQueue::pollOnce(JNIEnv* env, int timeoutMillis) {
        mInCallback = true;
        mLooper->pollOnce(timeoutMillis);
        mInCallback = false;
        if (mExceptionObj) {
            env->Throw(mExceptionObj);
            env->DeleteLocalRef(mExceptionObj);
            mExceptionObj = NULL;
        }
    }
    
    void NativeMessageQueue::wake() {
        mLooper->wake();
    }

        小结:

    1. NativeMessageQueue的函数pollonce和wake实现相当简单,交给mLooper的同名函数。

    Looper


        先来看看Looper的声明/frameworks/native/include/utils/Looper.h:

    class Looper : public ALooper, public RefBase {
    protected:
        virtual ~Looper();
    
    public:
        Looper(bool allowNonCallbacks);
    
        bool getAllowNonCallbacks() const;
    
        int pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData);
        inline int pollOnce(int timeoutMillis) {
            return pollOnce(timeoutMillis, NULL, NULL, NULL);
        }
    
        void wake();
    
    private:
    
        const bool mAllowNonCallbacks; // immutable
    
        int mWakeReadPipeFd;  // immutable
        int mWakeWritePipeFd; // immutable
        Mutex mLock;
    
        int mEpollFd; // immutable
    
        int pollInner(int timeoutMillis);
        void awoken();
    };

        因为代码有点多,所以上面的声明,已经被我精简了大部分,现在我们只关注我们关心的:pollonce和wake函数。

        还是从构造函数开始(frameworks/native/utils/Looper.cpp):

    Looper::Looper(bool allowNonCallbacks) :
            mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
            mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
        int wakeFds[2];
        int result = pipe(wakeFds);//创建命名管道
        LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe.  errno=%d", errno);
        // 保存命名管道
        mWakeReadPipeFd = wakeFds[0];
        mWakeWritePipeFd = wakeFds[1];
    
        result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);//设置为非阻塞模式
        LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking.  errno=%d",
                errno);
    
        result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);//设置为非阻塞模式
        LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking.  errno=%d",
                errno);
        // 开始使用epoll API,实现轮询
        // Allocate the epoll instance and register the wake pipe.
        mEpollFd = epoll_create(EPOLL_SIZE_HINT);//创建epoll文件描述符
        LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance.  errno=%d", errno);
    
        struct epoll_event eventItem;
        memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
        eventItem.events = EPOLLIN;
        eventItem.data.fd = mWakeReadPipeFd;
        result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem); // 把刚才创建的命名管道的读端加入的到epoll的监听队列中
        LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance.  errno=%d",
                errno);
    }
    
    Looper::~Looper() {
        close(mWakeReadPipeFd);//释放命名管道
        close(mWakeWritePipeFd);
        close(mEpollFd);//释放epoll文件描述符
    }

        Looper的构造函数中,出现了命名管道和epoll相关的代码,这是为什么呢?别急,接着看下去就知道了:

    int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
        int result = 0;
        for (;;) {
           // 这段代码暂时无视
           while (mResponseIndex < mResponses.size()) {
                const Response& response = mResponses.itemAt(mResponseIndex++);
                int ident = response.request.ident;
                if (ident >= 0) {//ident > 0, 即此response为noncallback,需要返回event,data等数据给调用者处理
                    int fd = response.request.fd;
                    int events = response.events;
                    void* data = response.request.data;
    #if DEBUG_POLL_AND_WAKE
                    ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
                            "fd=%d, events=0x%x, data=%p",
                            this, ident, fd, events, data);
    #endif
                    if (outFd != NULL) *outFd = fd;
                    if (outEvents != NULL) *outEvents = events;
                    if (outData != NULL) *outData = data;
                    return ident;
                }
            }
    
            if (result != 0) {
    #if DEBUG_POLL_AND_WAKE
                ALOGD("%p ~ pollOnce - returning result %d", this, result);
    #endif
                if (outFd != NULL) *outFd = 0;
                if (outEvents != NULL) *outEvents = 0;
                if (outData != NULL) *outData = NULL;
                return result;
            }
    
            result = pollInner(timeoutMillis);//这一行才是重点!
        }
    }

        接着往下看,代码有些长,但请仔细看:

    int Looper::pollInner(int timeoutMillis) {
        
    #if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
    #endif
    
        // 设置timeoutMillis的值为Math.min(timeoutMills, mNextMessageUptime)
        // Adjust the timeout based on when the next message is due.
        if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
            nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
            int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
            if (messageTimeoutMillis >= 0
                    && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
                timeoutMillis = messageTimeoutMillis;
            }
    #if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d",
                    this, mNextMessageUptime - now, timeoutMillis);
    #endif
        }
    
    
        // Poll.
        int result = ALOOPER_POLL_WAKE;
        mResponses.clear();
        mResponseIndex = 0;
    
        // 开始轮询
        struct epoll_event eventItems[EPOLL_MAX_EVENTS];
        int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    
        // Acquire lock.
        mLock.lock();
    
        // Check for poll error.
        if (eventCount < 0) { //处理error
            if (errno == EINTR) {
                goto Done;
            }
            ALOGW("Poll failed with an unexpected error, errno=%d", errno);
            result = ALOOPER_POLL_ERROR;
            goto Done;
        }
    
        // Check for poll timeout.
        if (eventCount == 0) { // 未能等到event,故timeout
    #if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - timeout", this);
    #endif
            result = ALOOPER_POLL_TIMEOUT;
            goto Done;
        }
    
        // Handle all events.
    #if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
    #endif
    
        for (int i = 0; i < eventCount; i++) {//有event,则处理
            int fd = eventItems[i].data.fd;
            uint32_t epollEvents = eventItems[i].events;
            if (fd == mWakeReadPipeFd) {
                if (epollEvents & EPOLLIN) {
                    awoken();//读取命名管道内的数据
                } else {
                    ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
                }
            } else {
                ssize_t requestIndex = mRequests.indexOfKey(fd);
                if (requestIndex >= 0) {
                    int events = 0;
                    if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
                    if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
                    if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
                    if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
                    pushResponse(events, mRequests.valueAt(requestIndex));//把event添加到mResponses中,等待后续处理
                } else {
                    ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                            "no longer registered.", epollEvents, fd);
                }
            }
        }
    Done: ;
        // 处理C++层的Message
        // Invoke pending message callbacks.
        mNextMessageUptime = LLONG_MAX;
        while (mMessageEnvelopes.size() != 0) {
            nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
            const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
            if (messageEnvelope.uptime <= now) {
                // Remove the envelope from the list.
                // We keep a strong reference to the handler until the call to handleMessage
                // finishes.  Then we drop it so that the handler can be deleted *before*
                // we reacquire our lock.
                { // obtain handler
                    sp<MessageHandler> handler = messageEnvelope.handler;
                    Message message = messageEnvelope.message;
                    mMessageEnvelopes.removeAt(0);
                    mSendingMessage = true;
                    mLock.unlock();
    
    #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
                    ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
                            this, handler.get(), message.what);
    #endif
                    handler->handleMessage(message);//调用handler->handleMessage
                } // release handler
    
                mLock.lock();
                mSendingMessage = false;
                result = ALOOPER_POLL_CALLBACK;
            } else {
                // The last message left at the head of the queue determines the next wakeup time.
                mNextMessageUptime = messageEnvelope.uptime;//更新mNextMessageUptime
                break;
            }
        }
    
        // Release lock.
        mLock.unlock();
        // 处理mResponses
        // Invoke all response callbacks.
        for (size_t i = 0; i < mResponses.size(); i++) {
            Response& response = mResponses.editItemAt(i);
            if (response.request.ident == ALOOPER_POLL_CALLBACK) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
    #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
                ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
                        this, response.request.callback.get(), fd, events, data);
    #endif
                int callbackResult = response.request.callback->handleEvent(fd, events, data);//调用callback->handleEvent
                if (callbackResult == 0) {
                    removeFd(fd);
                }
                // Clear the callback reference in the response structure promptly because we
                // will not clear the response vector itself until the next poll.
                response.request.callback.clear();
                result = ALOOPER_POLL_CALLBACK;
            }
        }
        return result;
    }

        代码比较长,所以分段分析:

        // 设置timeoutMillis的值为Math.min(timeoutMillis, mNextMessageUptime)
        // Adjust the timeout based on when the next message is due.
        if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
            nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
            int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
            if (messageTimeoutMillis >= 0
                    && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
                timeoutMillis = messageTimeoutMillis;
            }#if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d",
                    this, mNextMessageUptime - now, timeoutMillis);
    #endif
        }
    

        为了解析这段代码,需要先补充一些C++层的Message相关的代码:

    struct Message {
        Message() : what(0) { }
        Message(int what) : what(what) { }
    
        /* The message type. (interpretation is left up to the handler) */
        int what;
    };
    
    
    class MessageHandler : public virtual RefBase {
        protected:
            virtual ~MessageHandler() { }
    
        public:
            /**
             * Handles a message.
             */
            virtual void handleMessage(const Message& message) = 0;
    }
    
    struct MessageEnvelope {
        MessageEnvelope() : uptime(0) { }
        MessageEnvelope(nsecs_t uptime, const sp<MessageHandler> handler,const Message& message) : uptime(uptime), handler(handler), message(message) {}
    
        nsecs_t uptime;
        MessageHandler> handler;
        Message message;
    };
    
    void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        sendMessageAtTime(now, handler, message);
    }
    
    void Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler,
            const Message& message) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        sendMessageAtTime(now + uptimeDelay, handler, message);
    }
    
    void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
            const Message& message) {
    #if DEBUG_CALLBACKS
        ALOGD("%p ~ sendMessageAtTime - uptime=%lld, handler=%p, what=%d",
                this, uptime, handler.get(), message.what);
    #endif
    
        size_t i = 0;
        { // acquire lock
            AutoMutex _l(mLock);
    
            size_t messageCount = mMessageEnvelopes.size();
            while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
                i += 1;
            }
    
            MessageEnvelope messageEnvelope(uptime, handler, message);
            mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
    
            // Optimization: If the Looper is currently sending a message, then we can skip
            // the call to wake() because the next thing the Looper will do after processing
            // messages is to decide when the next wakeup time should be.  In fact, it does
            // not even matter whether this code is running on the Looper thread.
            if (mSendingMessage) {
                return;
            }
        } // release lock
    
        // Wake the poll loop only when we enqueue a new message at the head.
        if (i == 0) {
            wake();
        }
    }
    
    void Looper::removeMessages(const sp<MessageHandler>& handler) {
    #if DEBUG_CALLBACKS
        ALOGD("%p ~ removeMessages - handler=%p", this, handler.get());
    #endif
    
        { // acquire lock
            AutoMutex _l(mLock);
    
            for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
                const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
                if (messageEnvelope.handler == handler) {
                    mMessageEnvelopes.removeAt(i);
                }
            }
        } // release lock
    }
    
    void Looper::removeMessages(const sp<MessageHandler>& handler, int what) {
    #if DEBUG_CALLBACKS
        ALOGD("%p ~ removeMessages - handler=%p, what=%d", this, handler.get(), what);
    #endif
    
        { // acquire lock
            AutoMutex _l(mLock);
    
            for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
                const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
                if (messageEnvelope.handler == handler
                        && messageEnvelope.message.what == what) {
                    mMessageEnvelopes.removeAt(i);
                }
            }
        } // release lock
    }

        是不是觉得上面的代码似曾相识?和Java层的MessageQueue很像似有木有?和java层一样,C++层存在消息队列和消息处理机制,消息被保存到成员mMessageEvelopes中,并在pollInner函数中处理消息(调用MesageHandler的handleMesage函数处理)。

        现在回到Looper::pollonceh函数,我们就应该能够理解,pollOnce函数到timeOutMillis参数仅仅代表了Java层下一个Message的触发延迟,所以,我们还需要考虑C++层下一个Message的触发延迟,所以,代码设置timeoutMillis为timeoutMillis和mNextMessageUpTime中的较小值。


        继续下一段代码:

        int result = ALOOPER_POLL_WAKE;
        mResponses.clear();
        mResponseIndex = 0;
        // 开始轮询
        struct epoll_event eventItems[EPOLL_MAX_EVENTS];
        int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    
        // Acquire lock.
        mLock.lock();

        调用epoll函数,等待event发生,epoll_wait函数的返回值有三种可能:失败出错、没有event、有一个或多个event。

    1. 失败的处理:

        // Check for poll error.
        if (eventCount < 0) { //处理error
            if (errno == EINTR) {
                goto Done;
            }
            ALOGW("Poll failed with an unexpected error, errno=%d", errno);
            result = ALOOPER_POLL_ERROR;
            goto Done;
        }

    2. 没有event发生:

        // Check for poll timeout.
        if (eventCount == 0) { // 未能等到event,故timeout
    #if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - timeout", this);
    #endif
            result = ALOOPER_POLL_TIMEOUT;
            goto Done;
        }

    3. 有event发生:

        // Handle all events.
    #if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
    #endif
    
        for (int i = 0; i < eventCount; i++) {//有event,则处理
            int fd = eventItems[i].data.fd;
            uint32_t epollEvents = eventItems[i].events;
            if (fd == mWakeReadPipeFd) {//说明java层或者C++层有新的Message
                if (epollEvents & EPOLLIN) {
                    awoken();//读取命名管道内的数据
                } else {
                    ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
                }
            } else {
                ssize_t requestIndex = mRequests.indexOfKey(fd);
                if (requestIndex >= 0) {
                    int events = 0;
                    if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
                    if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
                    if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
                    if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
                    pushResponse(events, mRequests.valueAt(requestIndex));//把event添加到mResponses中,等待后续处理
                } else {
                    ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                            "no longer registered.", epollEvents, fd);
                }
            }
        }

        处理event的时候,需要分为两类:fd==mWakeReadPipeFd和fd!=mWakeReadPipeFd

    fd==mWakeReadPipeFd:说明C++层,或者java层有新的Message出现,需要处理。这种情况下,只需要读mWakeReadPipeFd内的数据即可

    void Looper::awoken() {
    #if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ awoken", this);
    #endif
    
        char buffer[16];
        ssize_t nRead;
        do {
            nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
        } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
    }

    fd!=mWakeReadPipeFd:首先需要搞明白fd是哪来的:

        struct Request {
            int fd;
            int ident;
            sp<LooperCallback> callback;
            void* data;
        };
    
    
        struct Response {
            int events;
            Request request;
        };
    /**
     * A looper callback.
     */
    class LooperCallback : public virtual RefBase {
    protected:
        virtual ~LooperCallback() { }
    
    
    public:
        /**
         * Handles a poll event for the given file descriptor.
         * It is given the file descriptor it is associated with,
         * a bitmask of the poll events that were triggered (typically ALOOPER_EVENT_INPUT),
         * and the data pointer that was originally supplied.
         *
         * Implementations should return 1 to continue receiving callbacks, or 0
         * to have this file descriptor and callback unregistered from the looper.
         */
        virtual int handleEvent(int fd, int events, void* data) = 0;
    };
    int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
    #if DEBUG_CALLBACKS
        ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
                events, callback.get(), data);
    #endif
    
        if (!callback.get()) {
            if (! mAllowNonCallbacks) {
                ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
                return -1;
            }
    
            if (ident < 0) {//仅当Looper支持NonCallbacks,并且ident大于0时,允许添加callback为null的Fd
                ALOGE("Invalid attempt to set NULL callback with ident < 0.");
                return -1;
            }
        } else {
            ident = ALOOPER_POLL_CALLBACK;
        }
    
        int epollEvents = 0;
        if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
        if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;
    
        { // acquire lock
            AutoMutex _l(mLock);
    
            Request request;
            request.fd = fd;
            request.ident = ident;
            request.callback = callback;
            request.data = data;
    
            struct epoll_event eventItem;
            memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
            eventItem.events = epollEvents;
            eventItem.data.fd = fd;
    
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex < 0) {
                int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
                if (epollResult < 0) {
                    ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
                    return -1;
                }
                mRequests.add(fd, request);
            } else {//存在则替换
                int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
                if (epollResult < 0) {
                    ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
                    return -1;
                }
                mRequests.replaceValueAt(requestIndex, request);
            }
        } // release lock
        return 1;
    }

        从上面的代码,我们可知,Looper还支持添加Fd和自定义的callback,类似java层的Message.callback。 

       通过addFd函数,可以向Looper的mEpollFd添加指定的Fd,当Fd触发指定的event .e.i  EPOLLIN or EPOLLOUT时,指定的相应的自定义callback就会得到执行。

       另外,当Looper支持noncallback时,还可以向Looper添加callback为null的Fd,因为没有callback,所以Fd添加者需要调用int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) 通过outFd、outEvent、outData参数获取数据并进行处理。

        所以当Fd触发消息时,需要生成对应到reponse并添加到meResponses中,等待后续的处理。

    void Looper::pushResponse(int events, const Request& request) {
        Response response;
        response.events = events;
        response.request = request;
        mResponses.push(response);
    }

        到这里为止,epoll_wait函数返回的三种结果的不同处理已经解析完毕,接下来代码进入共同的Done环节。

    处理C++层的Message:

    Done: ;
        // 处理C++层的Message
        // Invoke pending message callbacks.
        mNextMessageUptime = LLONG_MAX;
        while (mMessageEnvelopes.size() != 0) {
            nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
            const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
            if (messageEnvelope.uptime <= now) {
                // Remove the envelope from the list.
                // We keep a strong reference to the handler until the call to handleMessage
                // finishes.  Then we drop it so that the handler can be deleted *before*
                // we reacquire our lock.
                { // obtain handler
                    sp<MessageHandler> handler = messageEnvelope.handler;
                    Message message = messageEnvelope.message;
                    mMessageEnvelopes.removeAt(0);
                    mSendingMessage = true;
                    mLock.unlock();
    
    #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
                    ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
                            this, handler.get(), message.what);
    #endif
                    handler->handleMessage(message);//调用handler->handleMessage
                } // release handler
    
                mLock.lock();
                mSendingMessage = false;
                result = ALOOPER_POLL_CALLBACK;
            } else {
                // The last message left at the head of the queue determines the next wakeup time.
                mNextMessageUptime = messageEnvelope.uptime;//更新mNextMessageUptime
                break;
            }
        }

    处理Response:

        // 处理mResponses
        // Invoke all response callbacks.
        for (size_t i = 0; i < mResponses.size(); i++) {
            Response& response = mResponses.editItemAt(i);
            if (response.request.ident == ALOOPER_POLL_CALLBACK) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
    #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
                ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
                        this, response.request.callback.get(), fd, events, data);
    #endif
                int callbackResult = response.request.callback->handleEvent(fd, events, data);//调用callback->handleEvent
                if (callbackResult == 0) {
                    removeFd(fd);
                }
                // Clear the callback reference in the response structure promptly because we
                // will not clear the response vector itself until the next poll.
                response.request.callback.clear();
                result = ALOOPER_POLL_CALLBACK;
            }
        }

        搞懂了上面的代码,我们就很容易明白wake函数做了些什么:

    void Looper::wake() {
    #if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ wake", this);
    #endif
    
        ssize_t nWrite;
        do {
            nWrite = write(mWakeWritePipeFd, "W", 1);//向mWakeWritePipeFd中写入一个字符,所以mWakeReadPipeFd就会触发EPOLLIN event    } while (nWrite == -1 && errno == EINTR);
    
        if (nWrite != 1) {
            if (errno != EAGAIN) {
                ALOGW("Could not write wake signal, errno=%d", errno);
            }
        }
    }

        小结:

    1. Looper通过epoll函数组实现了一个可以支持随时唤醒的阻塞机制
    2. Looper支持两种不同的方式处理消息:Message + MessageHandler 和 LooperCallback。
    3. Looper的阻塞在如下四种条件下会被唤醒:
      • 发生错误
      • 等待超时
      • 出现需要处理的新Message(包括C++层和Java层)
      • 由addFd函数添加的Fd触发event

     

    总结:


    1. 在哪个线程调用JAVA层的Looper.loop(),Mesage和callback(包括Java层和C++层)就在哪个线程被处理,上图为Looper.loop函数的时序图。
    2. C++层的NativeMesasgeQueue不应该是Java层的MesageQueue的内部实现,而更接近于“栾生兄弟”的关系。MessageQueue负责处理java层上到消息,NativeMessageQueue负责处理C++层上的消息。其中Java层是在android.os.Looper.looper函数中调用android.os.Handler.dispatchMessage处理,而C++层是在android::Looper::pollInner函数中调用android::MessageHandler::handleMessage & android:LooperCallback::handleEvent函数处理。
    3. NativeMessageQueue利用Looper类实现了一个基于epoll函数和文件描述符(Fd)的可唤醒的阻塞机制。    
  • 相关阅读:
    社交网站后端项目开发日记(一)
    如何快速实现一个虚拟 DOM 系统
    你真的懂 export default 吗?
    vue项目中生产环境禁用debugger,关闭console
    css响应式设计
    浏览器对象-BOM
    js中的this关键字
    js类型转换
    console对象
    html DOM事件
  • 原文地址:https://www.cnblogs.com/james1207/p/3290232.html
Copyright © 2011-2022 走看看