zoukankan      html  css  js  c++  java
  • Android多线程分析之四:MessageQueue的实现

    Android多线程分析之四:MessageQueue的实现

    CC 许可,转载请注明出处

    在前面两篇文章《Android多线程分析之二:Thread的实现》,《Android多线程分析之三:Handler,Looper的实现》中分别介绍了 Thread 的创建,运行,销毁的过程以及 Thread与 Handler,Looper 之间的关联:Thread 在其 run() 方法中创建和运行消息处理循环 Looper,而 Looper::loop() 方法不断地从 MessageQueue 中获取消息,并由 Handler 分发处理该消息。接下来就来介绍 MessageQueue 的运作机制,MessageQueue。

    参考源码:

    android/framework/base/core/java/android/os/MessageQueue.java
    android/framework/base/core/java/android/os/Message.java
    android/frameworks/base/core/jni/android_os_MessageQueue.h
    android/frameworks/base/core/jni/android_os_MessageQueue.cpp
    

    先来看 MessageQueue 的构造函数以及重要的成员变量:

        // True if the message queue can be quit.
        private final boolean mQuitAllowed;
        private int mPtr; // used by native code
        Message mMessages;
        private boolean mQuiting;
        // Indicates whether next() is blocked waiting in pollOnce() with a non-zero timeout.
        private boolean mBlocked;
    

    mQuitAllowed: 其含义与 Looper.prepare(boolean quitAllowed) 中参数含义一直,是否允许中止;
    mPtr:Android MessageQueue 是通过调用 C++ native MessageQueue 实现的,这个 mPtr 就是指向 native MessageQueue;
    mMessages:Message 是链表结构的,因此这个变量就代表 Message 链表;
    mQuiting:是否终止了;
    mBlocked:是否正在等待被激活以获取消息;

    MessageQueue 的构造函数很简单:

        MessageQueue(boolean quitAllowed) {
            mQuitAllowed = quitAllowed;
            nativeInit();
        }
    

    它通过转调 native 方法 nativeInit() 实现的,后者是定义在 android_os_MessageQueue.cpp 中:

    static void android_os_MessageQueue_nativeInit(JNIEnv* env, jobject obj) {
        NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
        if (!nativeMessageQueue) {
            jniThrowRuntimeException(env, "Unable to allocate native queue");
            return;
        }
    
        nativeMessageQueue->incStrong(env);
        android_os_MessageQueue_setNativeMessageQueue(env, obj, nativeMessageQueue);
    }
    static void android_os_MessageQueue_setNativeMessageQueue(JNIEnv* env, jobject messageQueueObj,
            NativeMessageQueue* nativeMessageQueue) {
        env->SetIntField(messageQueueObj, gMessageQueueClassInfo.mPtr,
                 reinterpret_cast<jint>(nativeMessageQueue));
    }
    

    nativeInit() 方法创建 NativeMessageQueue 对象,并将这个对象的指针复制给 Android MessageQueue 的 mPtr。NativeMessageQueue 的定义如下:

    class MessageQueue : public RefBase {
    public:
        /* Gets the message queue's looper. */
        inline sp<Looper> getLooper() const {
            return mLooper;
        }
    
        bool raiseAndClearException(JNIEnv* env, const char* msg);
        virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj) = 0;
    protected:
        MessageQueue();
        virtual ~MessageQueue();
    protected:
        sp<Looper> mLooper;
    };
    
    class NativeMessageQueue : public MessageQueue {
    public:
        NativeMessageQueue();
        virtual ~NativeMessageQueue();
        virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj);
        void pollOnce(JNIEnv* env, int timeoutMillis);
        void wake();
    
    private:
        bool mInCallback;
        jthrowable mExceptionObj;
    };
    

    其中值得关注的是 NativeMessageQueue 的构造以及pollOnce,wake 两个方法,它们是Java MessageQueue 中 nativePollOnce 和 nativeWake 的 native 方法:

    NativeMessageQueue::NativeMessageQueue() : mInCallback(false), mExceptionObj(NULL) {
        mLooper = Looper::getForThread();
        if (mLooper == NULL) {
            mLooper = new Looper(false);
            Looper::setForThread(mLooper);
        }
    }
    
    void NativeMessageQueue::pollOnce(JNIEnv* env, int timeoutMillis) {
        mInCallback = true;
        mLooper->pollOnce(timeoutMillis);
        mInCallback = false;
    }
    
    void NativeMessageQueue::wake() {
        mLooper->wake();
    }

    在 NativeMessageQueue 的构造函数中,会获取当前线程的 Looper(注意这是 C++ Looper,定义在frameworks/native/libs/utils/Looper.h 中),如果当前线程还没有 Looper,就创建一个,并保存在线程的 TLS 中。pollOnce 和 wake 最终都是通过 Linux 的 epoll 模型来实现的。pollOnce() 通过等待被激活,然后从消息队列中获取消息;wake() 则是激活处于等待状态的消息队列,通知它有消息到达了。这是典型的生产者-消费者模型。

    对于Android MessageQueue 来说,其主要的工作就是:接收投递进来的消息,获取下一个需要处理的消息。这两个功能是通过 enqueueMessage() 和 next() 方法实现的。next() 在前一篇文章介绍 Looper.loop() 时提到过。

    在分析这两个函数之前,先来介绍一下 Message:前面说过 Message 是完备的,即它同时带有消息内容和处理消息的 Handler 或 callback。下面列出它的主要成员变量:

    public int what;     // 消息 id
    public int arg1;     // 消息参数
    public int arg2;     // 消息参数
    public Object obj;   // 消息参数
    long when;           // 处理延迟时间,由 Handler 的 sendMessageDelayed/postDelayed 设置
    Handler target;    // 处理消息的 Handler
    Runnable callback;   // 处理消息的回调
    Message next;    // 链表结构,指向下一个消息
    

    Message 有一些名为 obtain 的静态方法用于创建 Message,通常我们都是通过 Handler 的 obtain 静态方法转调 Message 的静态方法来创建新的 Message。

    接下来分析 enqueueMessage:

        final boolean enqueueMessage(Message msg, long when) {
            if (msg.isInUse()) {
                throw new AndroidRuntimeException(msg + " This message is already in use.");
            }
            if (msg.target == null) {
                throw new AndroidRuntimeException("Message must have a target.");
            }
    
            boolean needWake;
            synchronized (this) {
                if (mQuiting) {
                    return false;
                }
    
                msg.when = when;
                Message p = mMessages;
                if (p == null || when == 0 || when < p.when) {
                    // New head, wake up the event queue if blocked.
                    msg.next = p;
                    mMessages = msg;
                    needWake = mBlocked;
                } else {
                    // Inserted within the middle of the queue.  Usually we don't have to wake
                    // up the event queue unless there is a barrier at the head of the queue
                    // and the message is the earliest asynchronous message in the queue.
                    needWake = mBlocked && p.target == null && msg.isAsynchronous();
                    Message prev;
                    for (;;) {
                        prev = p;
                        p = p.next;
                        if (p == null || when < p.when) {
                            break;
                        }
                        if (needWake && p.isAsynchronous()) {
                            needWake = false;
                        }
                    }
                    msg.next = p; // invariant: p == prev.next
                    prev.next = msg;
                }
            }
            if (needWake) {
                nativeWake(mPtr);
            }
            return true;
        }
    

    首先检测消息的合法性:是否已经在处理中和是否有处理它的Handler,然后判断 mQuiting 是否中止了,如果没有则根据消息处理时间排序将消息插入链表中的合适位置。在这其中作了一些减少同步操作的优化,即使当前消息队列已经处于 Blocked 状态,且队首是一个消息屏障(和内存屏障的理念一样,这里是通过 p.target == null 来判断队首是否是消息屏障),并且要插入的消息是所有异步消息中最早要处理的才会 needwake 激活消息队列去获取下一个消息。Handler 的 post/sendMessage 系列方法最后都是通过转调 MessageQueue 的 enqueueMessage 来实现的,比如:

        public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
            MessageQueue queue = mQueue;
            if (queue == null) {
                RuntimeException e = new RuntimeException(
                        this + " sendMessageAtTime() called with no mQueue");
                Log.w("Looper", e.getMessage(), e);
                return false;
            }
            return enqueueMessage(queue, msg, uptimeMillis);
        }
    
        private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
            msg.target = this;
            if (mAsynchronous) {
                msg.setAsynchronous(true);
            }
            return queue.enqueueMessage(msg, uptimeMillis);
        }
    

    其实 Handler 中与Message 相关的静态方法都是通过 MessageQueue 的对应的静态方法实现的,比如 removeMessages, hasMessages, hasCallbacks 等等,这里就不一一详述了。至此,已经完整地分析了如何通过 Handler 提交消息到 MessageQueue 中了。

    下面来分析如何从 MessageQueue 中获取合适的消息, 这是 next() 要做的最主要的事情,next() 方法还做了其他一些事情,这些其它事情是为了提高系统效果,利用消息队列在空闲时通过 idle handler 做一些事情,比如 gc 等等。但它们和获取消息关系不大,所以这部分将从略介绍。

       final Message next() {
            int pendingIdleHandlerCount = -1; // -1 only during first iteration
            int nextPollTimeoutMillis = 0;
    
            for (;;) {
                if (nextPollTimeoutMillis != 0) {
                    Binder.flushPendingCommands();
                }
                nativePollOnce(mPtr, nextPollTimeoutMillis);
    
                synchronized (this) {
                    if (mQuiting) {
                        return null;
                    }
    
                    // Try to retrieve the next message.  Return if found.
                    final long now = SystemClock.uptimeMillis();
                    Message prevMsg = null;
                    Message msg = mMessages;
                    if (msg != null && msg.target == null) {
                        // Stalled by a barrier.  Find the next asynchronous message in the queue.
                        do {
                            prevMsg = msg;
                            msg = msg.next;
                        } while (msg != null && !msg.isAsynchronous());
                    }
                    if (msg != null) {
                        if (now < msg.when) {
                            // Next message is not ready.  Set a timeout to wake up when it is ready.
                            nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                        } else {
                            // Got a message.
                            mBlocked = false;
                            if (prevMsg != null) {
                                prevMsg.next = msg.next;
                            } else {
                                mMessages = msg.next;
                            }
                            msg.next = null;
                            if (false) Log.v("MessageQueue", "Returning message: " + msg);
                            msg.markInUse();
                            return msg;
                        }
                    } else {
                        // No more messages.
                        nextPollTimeoutMillis = -1;
                    }
    
                    // If first time idle, then get the number of idlers to run.
                    // Idle handles only run if the queue is empty or if the first message
                    // in the queue (possibly a barrier) is due to be handled in the future.
                    if (pendingIdleHandlerCount < 0
                            && (mMessages == null || now < mMessages.when)) {
                        pendingIdleHandlerCount = mIdleHandlers.size();
                    }
                    if (pendingIdleHandlerCount <= 0) {
                        // No idle handlers to run.  Loop and wait some more.
                        mBlocked = true;
                        continue;
                    }
    
                    if (mPendingIdleHandlers == null) {
                        mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                    }
                    mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
                }
    
                // Run the idle handlers.
                // We only ever reach this code block during the first iteration.
                for (int i = 0; i < pendingIdleHandlerCount; i++) {
                    final IdleHandler idler = mPendingIdleHandlers[i];
                    mPendingIdleHandlers[i] = null; // release the reference to the handler
    
                    boolean keep = false;
                    try {
                        keep = idler.queueIdle();
                    } catch (Throwable t) {
                        Log.wtf("MessageQueue", "IdleHandler threw exception", t);
                    }
    
                    if (!keep) {
                        synchronized (this) {
                            mIdleHandlers.remove(idler);
                        }
                    }
                }
    
                // Reset the idle handler count to 0 so we do not run them again.
                pendingIdleHandlerCount = 0;
    
                // While calling an idle handler, a new message could have been delivered
                // so go back and look again for a pending message without waiting.
                nextPollTimeoutMillis = 0;
            }
        }
    

    队列被激活之后,首先判断队首是不是消息屏障,如果是则跳过所有的同步消息,查找最先要处理的异步消息。如果第一个待处理的消息还没有到要处理的时机则设置激活等待时间;否则这个消息就是需要处理的消息,将该消息设置为 inuse,并将队列设置为非 blocked 状态,然后返回该消息。next() 方法是在 Looper.loop() 中被调用的,Looper 在获得要处理的消息之后就会调用和消息关联的 Handler 来分发消息,这里再回顾一下:

      public static void loop() {
            final Looper me = myLooper();
            if (me == null) {
                throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
            }
            final MessageQueue queue = me.mQueue;
            ...
            for (;;) {
                Message msg = queue.next(); // might block
                if (msg == null) {
                    // No message indicates that the message queue is quitting.
                    return;
                }
    
                msg.target.dispatchMessage(msg);
    
                msg.recycle();
            }
        }
    

    如果队列中没有消息或者第一个待处理的消息时机未到,且也没有其他利用队列空闲要处理的事务,则将队列设置为设置 blocked 状态,进入等待状态;否则就利用队列空闲处理其它事务。

    至此,已经对 Android 多线程相关的主要概念 Thread, HandlerThread, Handler, Looper, Message, MessageQueue 作了一番介绍,下一篇就要讲讲 AsyncTask,这是为了简化 UI 多线程编程为提供的一个便利工具类。

  • 相关阅读:
    异常介绍
    docker 命令
    acm
    Openfiler能把标准x86/64架构的系统变成一个强大的NAS、SAN存储和IP存储网关
    docker 图解学习
    基于Docker的TensorFlow机器学习框架搭建和实例源码解读
    菜鸟打印控件
    Oracle 12c on Solaris 10 安装文档
    内存对齐小解
    安装oracle 11gr2 rac on solaris
  • 原文地址:https://www.cnblogs.com/kesalin/p/android_messagequeue.html
Copyright © 2011-2022 走看看