美文网首页程序员Android开发
Handler、Looper、MessageQueue分析

Handler、Looper、MessageQueue分析

作者: 青叶小小 | 来源:发表于2020-12-29 12:18 被阅读0次

一、子线程创建Handler时注意事项及源码分析

new Thread(new Runnable(){ 
    @Override
    public void run(){ 
        // 在子线程中创建Handler需要先prepare,否则会crash
        // 该方法告诉Looper为当前线程创建一个新的Looper,
        // 并保存到Looper的本地线程变量中(sThreadLocal)
        Looper.prepare();
        mHandler = new Handler()
        Looper.loop(); // 死循环
    }
}).start();

分析创建Handler时的流程:

/**
 * Default constructor associates this handler with the {@link Looper} for the
 * current thread.
 * 初始化前,Looper必需已经存在,否则会报错(因为后面会提到,Handler会绑定到Looper上,接收消息)
 */
public Handler() {
    this(null, false);
}

// 默认是当前线程的Looper,除非指定Looper
public Handler(Looper looper) {
    this(looper, null, false);
}

/**
 * Use the {@link Looper} for the current thread with the specified callback interface
 * and set whether the handler should be asynchronous.
 * 
 * Handlers 默认是同步,除非在初始化时指定用异步.
 *
 * Asynchronous messages represent interrupts or events that do not require global ordering
 * with respect to synchronous messages.  Asynchronous messages are not subject to
 * the synchronization barriers introduced by {@link MessageQueue#enqueueSyncBarrier(long)}.
 *
 * @param callback The callback interface in which to handle messages, or null.
 * @param async If true, the handler calls {@link Message#setAsynchronous(boolean)} for
 * each {@link Message} that is sent to it or {@link Runnable} that is posted to it.
 */
public Handler(Callback callback, boolean async) {
    ......
    mLooper = Looper.myLooper(); // 重点这里.....不指定Looper则用当前线程的
    if (mLooper == null) {
        throw new RuntimeException(
            "Can't create handler inside thread " + Thread.currentThread()
                    + " that has not called Looper.prepare()");
    }
    mQueue = mLooper.mQueue;
    mCallback = callback;
    mAsynchronous = async;
}

myLooper在Looper类中,核心代码如下:

public final class Looper {
    /**
     * This class contains the code required to set up and manage an event loop
     * based on MessageQueue.  APIs that affect the state of the queue should be
     * defined on MessageQueue or Handler rather than on Looper itself.  For example,
     * idle handlers and sync barriers are defined on the queue whereas preparing the
     * thread, looping, and quitting are defined on the looper.
     */

    // sThreadLocal.get() will return null unless you've called prepare().
    static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
    private static Looper sMainLooper;  // guarded by Looper.class

    final MessageQueue mQueue;
    final Thread mThread;
    ......
    /** Initialize the current thread as a looper.
      * This gives you a chance to create handlers that then reference
      * this looper, before actually starting the loop. Be sure to call
      * {@link #loop()} after calling this method, and end it by calling
      * {@link #quit()}.
      */
    public static void prepare() {
        prepare(true);
    }
    private static void prepare(boolean quitAllowed) {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new Looper(quitAllowed)); // 只有主线程该变量quitAllowed才为false
    }
    ......
    
    // sThreadLocal是本地线程变量,存放的是Looper对象
    // sThreadLocal的set在prepare中
    public static @Nullable Looper myLooper() {
        return sThreadLocal.get();
    }

    ......
}

二、主线程创建Handler分析

在ActivityThread.java中

public static void main(String[] args) {
    ......
    Looper.prepareMainLooper(); // 创建UI主线程的Looper
    ......
    ActivityThread thread = new ActivityThread();
    thread.attach(false, startSeq);
    
    if (sMainThreadHandler == null) {
        sMainThreadHandler = thread.getHandler();
    }
    ...
    Looper.loop();
    throw new RuntimeException("Main thread loop unexpectedly exited");
}

Looper.java中

public static void prepareMainLooper() {
    prepare(false); // 创建Looper,且MessageQueue不允许自动退出(即销毁)
    synchronized (Looper.class) { // 同步等待Looper创建好
        if (sMainLooper != null) {
            throw new IllegalStateException("The main Looper has already been prepared.");
        }
        sMainLooper = myLooper(); // 保存到sMainLooper中
    }
}

三、Handler收/发消息

3.1、发送 & 接收消息方式

// 创建数据
Bundle bundle = new Bundle();
bundle.putChar("key", 'v'); 
bundle.putString("key","value");

// 最好用 obtainMessage,因为可以反复使用
// 因为Message.java中有个 pool(池)的概念
Message message = mHandler.obtainMessage();
message.what = 0;
// 也可以是 message.obj = xxx;
message.setData(bundle);
mHandler.sendMessage(message);

// 后面通过Handler中的 handleMessage 重载可以收到消息
mHandler = new Handler() {
    @Override
    public void handleMessage(Message msg) {
        super.handleMessage(msg);
    }
};

3.2、分析sendMessage

/**
 * Pushes a message onto the end of the message queue after all pending messages
 * before the current time. It will be received in {@link #handleMessage},
 * in the thread attached to this handler.
 *  
 * @return Returns true if the message was successfully placed in to the 
 *         message queue.  Returns false on failure, usually because the
 *         looper processing the message queue is exiting.
 */
public final boolean sendMessage(Message msg){
    return sendMessageDelayed(msg, 0);
}

// 可以指定消息多久后被发送至handler中处理
public final boolean sendMessageDelayed(Message msg, long delayMillis){
    if (delayMillis < 0) {
        delayMillis = 0;
    }
    return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}

// mQueue是在prepare时,初始化Looper中创建的消息队列
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
    MessageQueue queue = mQueue;
    if (queue == null) {
        RuntimeException e = new RuntimeException(
                this + " sendMessageAtTime() called with no mQueue");
        Log.w("Looper", e.getMessage(), e);
        return false;
    }
    return enqueueMessage(queue, msg, uptimeMillis);
}

// 默认将消息发送给当前handler即自己,这也是为啥重载handleMessage能收到消息
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
    msg.target = this;
    if (mAsynchronous) {
        msg.setAsynchronous(true);
    }
    return queue.enqueueMessage(msg, uptimeMillis);
}

MessageQueue.enqueueMessage

public final class MessageQueue {
    ......
    
    boolean enqueueMessage(Message msg, long when) {
        if (msg.target == null) {
            throw new IllegalArgumentException("Message must have a target.");
        }
        if (msg.isInUse()) {
            throw new IllegalStateException(msg + " This message is already in use.");
        }

        synchronized (this) {
            ......

            msg.markInUse();
            msg.when = when;
            Message p = mMessages;
            boolean needWake;

            // 非延时消息,则该消息会插入到消息队列头部
            if (p == null || when == 0 || when < p.when) {
                // New head, wake up the event queue if blocked.
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;
            } else {
                // 延时消息则通过判断每个消息的 when,来插入到合适的地方
                // 所以,整个消息队列中的消息是按照 when 来排序的
                // 相同 when 的消息,新消息会在这些消息的开头添加
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }

            // We can assume mPtr != 0 because mQuitting is false.
            if (needWake) {
                // 这里涉及到 Linux 的管道,后续会分析!!!
                nativeWake(mPtr);
            }
        }
        return true;
    }
}

nativeWake唤醒Looper(loop方法中的nativePollOnce阻塞了

void NativeMessageQueue::wake() {
    mLooper->wake();
}
void Looper::wake() {
    uint64_t inc = 1;
    // 向管道 mWakeEventFd 写入字符1 , 写入失败仍然不断执行
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
}

3.3、接收消息

Looper从MQ中获取消息,并发送到指定的地方去!取消息是由Looper负责,具体的地方在 loop方法中

public static void loop() {
    final Looper me = myLooper();
    if (me == null) {
        throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
    }
    final MessageQueue queue = me.mQueue;
    ......
    for (;;) {
         // 没有消息意味着MQ退出了.....所以,这里是死循环
        // next方法会阻塞这里(消息时间未到,或者MQ为空,就会通知native休眠,直到有消息需要处理)
        Message msg = queue.next();
        if (msg == null) {
            return;
        }

        // 这块代码是计算消息是否被延迟发送出去了!
        // 当消息很多时,或者系统负载过高,looper来不及处理,
        // 那么,应该在when时发送的消息就会来不及发送
        ......
        try {
            // msg.target就是handler自己
            msg.target.dispatchMessage(msg);
            dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
        } finally {
            ......
        }
        ......
        // 消息回收
        msg.recycleUnchecked();
    }
}

详看nativePollOnce那里的注释!
前面提到了nativeWake,当队列为空 / 队列中的消息.when还未到时,这时插入delay = 0的消息就需要立即唤醒处理

public final class MessageQueue {
    Message next() {
        // Return here if the message loop has already quit and been disposed.
        final long ptr = mPtr;
        if (ptr == 0) {
            return null;
        }

        int pendingIdleHandlerCount = -1; // -1 only during first iteration
        int nextPollTimeoutMillis = 0;
        for (;;) {
            // 下一个 msg.when 还未超时,刷新一次
            if (nextPollTimeoutMillis != 0) {
                Binder.flushPendingCommands();
            }

            // 这里会阻塞:
            // 1. nextPollTimeoutMillis为-1即MQ为空时
            // 2. nextPollTimeoutMillis不为0时
            nativePollOnce(ptr, nextPollTimeoutMillis);

            synchronized (this) {
                // Try to retrieve the next message.  Return if found.
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;
                if (msg != null && msg.target == null) {
                    // Stalled by a barrier.  Find the next asynchronous message in the queue.
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                if (msg != null) {
                    if (now < msg.when) {
                        // Next message is not ready.  Set a timeout to wake up when it is ready.
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        // Got a message.
                        mBlocked = false;
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                        msg.markInUse();
                        return msg;
                    }
                } else {
                    // No more messages.
                    nextPollTimeoutMillis = -1;
                }
                // Process the quit message now that all pending messages have been handled.
                if (mQuitting) {
                    dispose();
                    return null;
                }                
                ......
            }
            // 此处省略 idleHandler的代码
            ......
            nextPollTimeoutMillis = 0;
        }
    }
}

nativePollOnce的native代码:

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,jlong ptr, jint timeoutMillis) {
    //ptr消息队列地址
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    //调用Looper的pollOnce
    mLooper->(timeoutMillis);
}

【 /system/core/libutils/Looper.cpp 】

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        ...
        if (result != 0) {
            ...
            return result;
        }
        // 再处理内部轮询
        result = pollInner(timeoutMillis); 
    }
}
int Looper::pollInner(int timeoutMillis) {
    ...
    int result = POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;
    mPolling = true; //即将处于idle状态
    // fd最大个数为16
    struct epoll_event eventItems[EPOLL_MAX_EVENTS]; 
    // 等待事件发生或者超时,在 nativeWake() 方法,向管道写端写入字符,则该方法会返回;
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    ...
    return result;
}

Handler中的dispatchMessage

public void dispatchMessage(Message msg) {
    if (msg.callback != null) {
        // 消息如果有设置callback则优先给它
        handleCallback(msg);
    } else {
        // 如果在初始化Handler时有指定callback则给指定的函数
        // mCallback可以处理,但如何没有返回为 true,消息还会继续
        if (mCallback != null) {
            if (mCallback.handleMessage(msg)) {
                return;
            }
        }
        // 最终给Handler自身(如果没有重载,则该消息就会丢掉)
        handleMessage(msg);
    }
}

至此,一个完整的消息发送、接收流程就分析完了!

四、补充

4.1、其它发送消息方式

  • View.post / .postDelay 也可以发送消息(View类中有创建Handler,绑定到主线程)
  • Handler.post / .postDelay
  • Activity.runOnUiThread

以上本质都是通过Handler来发送消息!

4.2、MessageQueue的数据结构

先进先出(FIFO):相同when的消息直接插入到其开头

4.3、Handler引起内存泄露的原因

Handler允许发送延时消息,因此,当Activity退出后,就会出现内存泄露:

  • Message会持有Handler(msg.target = this,即 handler);
  • Java特性(内部类持有外部类),因此,Handler会持有Activity;

解决:Handler定义为静态内部类且内部持有Activity的弱引用,并及时移除所有消息

public class MainActivity extends AppCompatActivity {
    private SafeHandler safeHandler = new SafeHandler(this);
    public void handleMessage(Message msg) {
        // TODO something
    }

    private static class SafeHandler extends Handler {
        private WeakReference<MainActivity> ref;

        public SafeHandler(MainActivity activity) {
            this.ref = new WeakReference<>(activity);
        }

        @Override
        public void handleMessage(Message msg) {
            MainActivity activity = ref.get();
            if (activity != null) {
                activity.handleMessage(msg);
            }
        }
    }

    @Override
    protected void onDestroy() {
        // 移除后续所有未处理的消息
        safeHandler.removeCallbacksAndMessages(null);
        super.onDestroy();
    }
}

注:单纯在 onDestroy 中移除消息并不保险,因为该方法不一定执行!

4.4、dispatchMessage的Callback

{
    if (mCallback != null) {
        if (mCallback.handleMessage(msg)) { // false还会走到 handleMessage
            return;
        }
    }
    handleMessage(msg);
}

这个机制就有意思了,比如拦截 ActivityThread.mH 这个 Handler 做一些事(几乎所有的插件化框架都使用了该方法)

4.5、创建Message实例的最佳方式

  • 通过 Message 的静态方法:Message.obtain
  • 通过 Handler 的公有方法:handler.obtainMessage (实际还是上一种方式)

相关文章

网友评论

    本文标题:Handler、Looper、MessageQueue分析

    本文链接:https://www.haomeiwen.com/subject/ffyfoktx.html