  • Handler message mechanism: a source code walkthrough

    When a Handler object is initialized, the no-argument constructor is used:

    public Handler() {
        this(null, false);
    }
    As you can see, it delegates to another constructor that takes two arguments:

    public Handler(Callback callback, boolean async) {
        if (FIND_POTENTIAL_LEAKS) {
            final Class<? extends Handler> klass = getClass();
            if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
                    (klass.getModifiers() & Modifier.STATIC) == 0) {
                Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
                    klass.getCanonicalName());
            }
        }
        // 1
        mLooper = Looper.myLooper();
        if (mLooper == null) {
            throw new RuntimeException(
                "Can't create handler inside thread " + Thread.currentThread()
                        + " that has not called Looper.prepare()");
        }
        // 2
        mQueue = mLooper.mQueue;
        mCallback = callback;
        mAsynchronous = async;
    }
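    The main job of this constructor is to obtain the mLooper and mQueue objects: mLooper comes from Looper.myLooper() (step 1), and mQueue is read from that Looper (step 2).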

    public static @Nullable Looper myLooper() {
        return sThreadLocal.get();
    }
    In myLooper(), the Looper object is fetched from sThreadLocal.

    The ThreadLocal object is created like this:

    static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
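    ThreadLocal gives every thread its own Looper slot; combined with the check in prepare() shown below, this guarantees that the current thread has exactly one Looper object. In several years of software development, the Looper source is the only place where I have run into ThreadLocal. In a recent interview at 正航软件, ThreadLocal came up and I could only explain what it is for, not how it works. I plan to study its internals and write a separate article about it.

    But I digress; back to the topic.

    Let's see how the Looper object gets put into the ThreadLocal: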

    public static void prepare() {
        prepare(true);
    }

    private static void prepare(boolean quitAllowed) {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new Looper(quitAllowed));
    }
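    The public prepare() delegates to the private static prepare(boolean quitAllowed). If the ThreadLocal already holds a Looper for the current thread, an exception is thrown; a new Looper is created only when none exists yet.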
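
    The per-thread guarantee described above can be reproduced in plain Java without any Android classes. The following is only a minimal sketch: MiniLooper and its observeOnNewThread() helper are hypothetical names invented for illustration and are not part of the Android source. It shows that a second prepare() on the same thread fails, while a new thread starts with an empty slot and gets its own instance.

```java
/** Hypothetical stand-in for Looper: at most one instance per thread, kept in a ThreadLocal. */
final class MiniLooper {
    private static final ThreadLocal<MiniLooper> sThreadLocal = new ThreadLocal<>();

    private MiniLooper() {}

    /** Creates the calling thread's instance; a second call on the same thread throws. */
    static void prepare() {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one MiniLooper may be created per thread");
        }
        sThreadLocal.set(new MiniLooper());
    }

    /** Returns the calling thread's instance, or null if prepare() has not run on it. */
    static MiniLooper myLooper() {
        return sThreadLocal.get();
    }

    /** Demo helper (not in the real Looper): records what a brand-new thread sees. */
    static MiniLooper[] observeOnNewThread() {
        final MiniLooper[] seen = new MiniLooper[2];
        Thread t = new Thread(() -> {
            seen[0] = myLooper(); // this thread has not called prepare() yet
            prepare();
            seen[1] = myLooper(); // this thread's own instance
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```

    This is also why creating a Handler on a thread that never called Looper.prepare() fails: that thread's slot is still empty, so myLooper() returns null.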

    Now look at the Looper constructor:

    private Looper(boolean quitAllowed) {
        mQueue = new MessageQueue(quitAllowed);
        mThread = Thread.currentThread();
    }
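    In this constructor:

    a MessageQueue object is created, which is why the Handler constructor can obtain the MessageQueue from the Looper.

    So when is the Looper object used by this Handler created?

    It is created in the main() method of ActivityThread, on the main thread:

    The main thread calls: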

    Looper.prepareMainLooper();

    Looper.loop();

    The code in detail:

    The prepareMainLooper() method:

    public static void prepareMainLooper() {
        prepare(false);
        synchronized (Looper.class) {
            if (sMainLooper != null) {
                throw new IllegalStateException("The main Looper has already been prepared.");
            }
            sMainLooper = myLooper();
        }
    }
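    This method calls prepare(false), so the main thread's Looper is created with quitAllowed set to false, and then stores it in sMainLooper. The main thread therefore has exactly one Looper object, and preparing it a second time throws.

    From the analysis so far, Handler, Looper, and MessageQueue are all in place. So how do they pass messages along?

    Delivering a message starts with sending it, which is done by calling Handler's sendMessage() method.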

    public final boolean sendMessage(Message msg)
    {
        return sendMessageDelayed(msg, 0);
    }

    public final boolean sendMessageDelayed(Message msg, long delayMillis)
    {
        if (delayMillis < 0) {
            delayMillis = 0;
        }
        return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
    }

    public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
        MessageQueue queue = mQueue;
        if (queue == null) {
            RuntimeException e = new RuntimeException(
                    this + " sendMessageAtTime() called with no mQueue");
            Log.w("Looper", e.getMessage(), e);
            return false;
        }
        return enqueueMessage(queue, msg, uptimeMillis);
    }
    The call flow of the code above is: sendMessage() → sendMessageDelayed() → sendMessageAtTime() → enqueueMessage()

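    Along the way, sendMessageDelayed() clamps a negative delay to 0 and converts it into an absolute time based on SystemClock.uptimeMillis(). sendMessageAtTime() then takes the mQueue obtained in the constructor and passes the message (msg) and the message queue (mQueue) to enqueueMessage().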
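
    The delay handling in sendMessageDelayed() is small but easy to overlook: the queue never stores a relative delay, only an absolute delivery time. Here is a minimal sketch of that arithmetic. DeliveryTime and fromDelay() are hypothetical names, and the clock value is passed in as a parameter so the example runs without Android's SystemClock.

```java
/** Hypothetical helper mirroring how sendMessageDelayed() derives the delivery time. */
final class DeliveryTime {
    private DeliveryTime() {}

    /**
     * @param nowMillis   current clock reading (SystemClock.uptimeMillis() on Android)
     * @param delayMillis requested delay; negative values are treated as 0
     * @return the absolute time at which the message becomes due
     */
    static long fromDelay(long nowMillis, long delayMillis) {
        if (delayMillis < 0) {
            delayMillis = 0;
        }
        return nowMillis + delayMillis;
    }
}
```

    With a clock reading of 1000, a delay of 500 gives a delivery time of 1500, and a delay of -5 gives 1000, so such a message is due immediately.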

    Now look at enqueueMessage():

    private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
        msg.target = this;
        if (mAsynchronous) {
            msg.setAsynchronous(true);
        }
        return queue.enqueueMessage(msg, uptimeMillis);
    }
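    Handler's enqueueMessage() sets msg.target to the Handler itself and then calls queue.enqueueMessage() to put the message into the message queue.

    enqueueMessage() in the MessageQueue class: this method inserts the message into a linked list ordered by when and, if the queue needs to be woken up, makes a JNI call to the native method nativeWake. We won't trace the native code any further.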

    boolean enqueueMessage(Message msg, long when) {
        if (msg.target == null) {
            throw new IllegalArgumentException("Message must have a target.");
        }
        if (msg.isInUse()) {
            throw new IllegalStateException(msg + " This message is already in use.");
        }

        synchronized (this) {
            if (mQuitting) {
                IllegalStateException e = new IllegalStateException(
                        msg.target + " sending message to a Handler on a dead thread");
                Log.w(TAG, e.getMessage(), e);
                msg.recycle();
                return false;
            }

            msg.markInUse();
            msg.when = when;
            Message p = mMessages;
            boolean needWake;
            if (p == null || when == 0 || when < p.when) {
                // New head, wake up the event queue if blocked.
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;
            } else {
                // Inserted within the middle of the queue. Usually we don't have to wake
                // up the event queue unless there is a barrier at the head of the queue
                // and the message is the earliest asynchronous message in the queue.
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }

            // We can assume mPtr != 0 because mQuitting is false.
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }
    With the steps above, the Handler has put the message into the message queue.
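
    The ordering logic in MessageQueue.enqueueMessage() is easier to follow once the locking, barrier, and wake-up details are stripped away. The sketch below keeps only the sorted insert into a singly linked list. MiniQueue, Node, and labels() are hypothetical names, the class is single-threaded, and the boolean return value (whether the message became the new head) is my simplification of the needWake decision, not what the real method returns.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, single-threaded sketch of the ordering logic in MessageQueue.enqueueMessage(). */
final class MiniQueue {
    /** Linked-list node; "when" is the absolute delivery time. */
    static final class Node {
        final String label;
        final long when;
        Node next;

        Node(String label, long when) {
            this.label = label;
            this.when = when;
        }
    }

    private Node head;

    /** Inserts so the list stays sorted by "when"; returns true if the node became the new head. */
    boolean enqueue(String label, long when) {
        Node msg = new Node(label, when);
        Node p = head;
        if (p == null || when == 0 || when < p.when) {
            // New head: empty queue, "run now", or earlier than the current head.
            msg.next = p;
            head = msg;
            return true;
        }
        // Walk until the next node is strictly later; equal times keep insertion order.
        Node prev;
        for (;;) {
            prev = p;
            p = p.next;
            if (p == null || when < p.when) {
                break;
            }
        }
        msg.next = p;
        prev.next = msg;
        return false;
    }

    /** Returns the labels in queue order, head first. */
    List<String> labels() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.label);
        }
        return out;
    }
}
```

    Enqueuing b at 200, d at 400, a at 100, and c at 200 leaves the queue in the order a, b, c, d. Because the comparison is strict (when < p.when), c lands behind b even though both are due at the same time.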

    Now look back at the main() method of ActivityThread, which calls Looper's loop() method.

    The loop() method:

    /**
     * Run the message queue in this thread. Be sure to call
     * {@link #quit()} to end the loop.
     */
    public static void loop() {
        final Looper me = myLooper();
        if (me == null) {
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
        final MessageQueue queue = me.mQueue;

        // Make sure the identity of this thread is that of the local process,
        // and keep track of what that identity token actually is.
        Binder.clearCallingIdentity();
        final long ident = Binder.clearCallingIdentity();

        // Allow overriding a threshold with a system prop. e.g.
        // adb shell 'setprop log.looper.1000.main.slow 1 && stop && start'
        final int thresholdOverride =
                SystemProperties.getInt("log.looper."
                        + Process.myUid() + "."
                        + Thread.currentThread().getName()
                        + ".slow", 0);

        boolean slowDeliveryDetected = false;

        for (;;) {
            Message msg = queue.next(); // might block
            if (msg == null) {
                // No message indicates that the message queue is quitting.
                return;
            }

            // This must be in a local variable, in case a UI event sets the logger
            final Printer logging = me.mLogging;
            if (logging != null) {
                logging.println(">>>>> Dispatching to " + msg.target + " " +
                        msg.callback + ": " + msg.what);
            }

            final long traceTag = me.mTraceTag;
            long slowDispatchThresholdMs = me.mSlowDispatchThresholdMs;
            long slowDeliveryThresholdMs = me.mSlowDeliveryThresholdMs;
            if (thresholdOverride > 0) {
                slowDispatchThresholdMs = thresholdOverride;
                slowDeliveryThresholdMs = thresholdOverride;
            }
            final boolean logSlowDelivery = (slowDeliveryThresholdMs > 0) && (msg.when > 0);
            final boolean logSlowDispatch = (slowDispatchThresholdMs > 0);

            final boolean needStartTime = logSlowDelivery || logSlowDispatch;
            final boolean needEndTime = logSlowDispatch;

            if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {
                Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
            }

            final long dispatchStart = needStartTime ? SystemClock.uptimeMillis() : 0;
            final long dispatchEnd;
            try {
                msg.target.dispatchMessage(msg);
                dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
            } finally {
                if (traceTag != 0) {
                    Trace.traceEnd(traceTag);
                }
            }
            if (logSlowDelivery) {
                if (slowDeliveryDetected) {
                    if ((dispatchStart - msg.when) <= 10) {
                        Slog.w(TAG, "Drained");
                        slowDeliveryDetected = false;
                    }
                } else {
                    if (showSlowLog(slowDeliveryThresholdMs, msg.when, dispatchStart, "delivery",
                            msg)) {
                        // Once we write a slow delivery log, suppress until the queue drains.
                        slowDeliveryDetected = true;
                    }
                }
            }
            if (logSlowDispatch) {
                showSlowLog(slowDispatchThresholdMs, dispatchStart, dispatchEnd, "dispatch", msg);
            }

            if (logging != null) {
                logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
            }

            // Make sure that during the course of dispatching the
            // identity of the thread wasn't corrupted.
            final long newIdent = Binder.clearCallingIdentity();
            if (ident != newIdent) {
                Log.wtf(TAG, "Thread identity changed from 0x"
                        + Long.toHexString(ident) + " to 0x"
                        + Long.toHexString(newIdent) + " while dispatching to "
                        + msg.target.getClass().getName() + " "
                        + msg.callback + " what=" + msg.what);
            }

            msg.recycleUnchecked();
        }
    }
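    This method runs an infinite loop that keeps calling Message msg = queue.next(); and it returns only when next() returns null, which means the message queue is quitting.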
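
    Stripped of logging and tracing, loop() is a classic consumer loop: block until something is available, run it, repeat until told to quit. The sketch below reproduces that shape in plain Java. It is not how Android implements it: MiniLoop is a hypothetical class, a java.util.concurrent.LinkedBlockingQueue stands in for MessageQueue and nativePollOnce, and a QUIT sentinel stands in for the null that next() returns. Unlike Looper.quit(), this sketch still runs everything posted before quit().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of a looper thread: block for the next task, run it, repeat until quit. */
final class MiniLoop {
    // Sentinel standing in for the null that MessageQueue.next() returns when quitting.
    private static final Runnable QUIT = () -> { };

    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    void post(Runnable task) {
        queue.add(task);
    }

    void quit() {
        queue.add(QUIT);
    }

    /** Runs on the consumer thread; returns only after the quit sentinel is taken. */
    void loop() throws InterruptedException {
        for (;;) {
            Runnable task = queue.take(); // might block, like queue.next()
            if (task == QUIT) {
                return;
            }
            task.run(); // plays the role of msg.target.dispatchMessage(msg)
        }
    }

    /** Demo: posts three tasks from the calling thread and returns the order the worker ran them in. */
    static List<String> demo() {
        MiniLoop looper = new MiniLoop();
        List<String> ran = Collections.synchronizedList(new ArrayList<>());
        Thread worker = new Thread(() -> {
            try {
                looper.loop();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        looper.post(() -> ran.add("first"));
        looper.post(() -> ran.add("second"));
        looper.post(() -> ran.add("third"));
        looper.quit();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(ran);
    }
}
```

    The point to take away is that the work always runs on the thread that called loop(), no matter which thread posted it. That is exactly what makes a Handler bound to the main Looper suitable for handing results back to the main thread.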

    The next() method in the MessageQueue class:

    Message next() {
        // Return here if the message loop has already quit and been disposed.
        // This can happen if the application tries to restart a looper after quit
        // which is not supported.
        final long ptr = mPtr;
        if (ptr == 0) {
            return null;
        }

        int pendingIdleHandlerCount = -1; // -1 only during first iteration
        int nextPollTimeoutMillis = 0;
        for (;;) {
            if (nextPollTimeoutMillis != 0) {
                Binder.flushPendingCommands();
            }

            nativePollOnce(ptr, nextPollTimeoutMillis);

            synchronized (this) {
                // Try to retrieve the next message. Return if found.
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;
                if (msg != null && msg.target == null) {
                    // Stalled by a barrier. Find the next asynchronous message in the queue.
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                if (msg != null) {
                    if (now < msg.when) {
                        // Next message is not ready. Set a timeout to wake up when it is ready.
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        // Got a message.
                        mBlocked = false;
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                        msg.markInUse();
                        return msg;
                    }
                } else {
                    // No more messages.
                    nextPollTimeoutMillis = -1;
                }

                // Process the quit message now that all pending messages have been handled.
                if (mQuitting) {
                    dispose();
                    return null;
                }

                // If first time idle, then get the number of idlers to run.
                // Idle handles only run if the queue is empty or if the first message
                // in the queue (possibly a barrier) is due to be handled in the future.
                if (pendingIdleHandlerCount < 0
                        && (mMessages == null || now < mMessages.when)) {
                    pendingIdleHandlerCount = mIdleHandlers.size();
                }
                if (pendingIdleHandlerCount <= 0) {
                    // No idle handlers to run. Loop and wait some more.
                    mBlocked = true;
                    continue;
                }

                if (mPendingIdleHandlers == null) {
                    mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                }
                mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
            }

            // Run the idle handlers.
            // We only ever reach this code block during the first iteration.
            for (int i = 0; i < pendingIdleHandlerCount; i++) {
                final IdleHandler idler = mPendingIdleHandlers[i];
                mPendingIdleHandlers[i] = null; // release the reference to the handler

                boolean keep = false;
                try {
                    keep = idler.queueIdle();
                } catch (Throwable t) {
                    Log.wtf(TAG, "IdleHandler threw exception", t);
                }

                if (!keep) {
                    synchronized (this) {
                        mIdleHandlers.remove(idler);
                    }
                }
            }

            // Reset the idle handler count to 0 so we do not run them again.
            pendingIdleHandlerCount = 0;

            // While calling an idle handler, a new message could have been delivered
            // so go back and look again for a pending message without waiting.
            nextPollTimeoutMillis = 0;
        }
    }
    This is how the message is taken back out of the queue. loop() then calls msg.target.dispatchMessage(msg); and this target is actually a Handler object.

    The dispatchMessage() method in the Handler class:

    public void dispatchMessage(Message msg) {
        if (msg.callback != null) {
            handleCallback(msg);
        } else {
            if (mCallback != null) {
                if (mCallback.handleMessage(msg)) {
                    return;
                }
            }
            handleMessage(msg);
        }
    }
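    Here we finally see the very familiar handleMessage() method. It runs when the message carries no callback of its own and mCallback is either absent or returns false. The handleMessage() we override is where we do the work we actually want done.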
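
    The three-level priority in dispatchMessage() (the message's own callback first, then the Handler's Callback, then handleMessage()) can be checked with a small plain-Java model. Everything here is a hypothetical stand-in written for this article: MiniHandler, Msg, and Callback only imitate the shape of Handler, Message, and Handler.Callback, and the log list exists purely so the order can be observed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical plain-Java model of the three-level dispatch in Handler.dispatchMessage(). */
class MiniHandler {
    /** Stand-in for android.os.Message: a "what" code plus an optional Runnable. */
    static final class Msg {
        final int what;
        final Runnable callback;

        Msg(int what, Runnable callback) {
            this.what = what;
            this.callback = callback;
        }
    }

    /** Stand-in for Handler.Callback; returning true means "handled, stop here". */
    interface Callback {
        boolean handleMessage(Msg msg);
    }

    final List<String> log = new ArrayList<>();
    private final Callback callback;

    MiniHandler(Callback callback) {
        this.callback = callback;
    }

    /** Lowest priority; subclasses would override this, as with Handler.handleMessage(). */
    void handleMessage(Msg msg) {
        log.add("handleMessage:" + msg.what);
    }

    final void dispatchMessage(Msg msg) {
        if (msg.callback != null) {
            msg.callback.run();   // 1. the message's own Runnable wins
            return;
        }
        if (callback != null && callback.handleMessage(msg)) {
            return;               // 2. the Callback consumed the message
        }
        handleMessage(msg);       // 3. fall back to handleMessage()
    }
}
```

    With a Callback that returns true only for what == 2, a message with what == 2 never reaches handleMessage(), a message with what == 3 falls through to it, and a message that carries a Runnable bypasses both.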

  • Original article: https://www.cnblogs.com/hyhy904/p/11513045.html