zoukankan      html  css  js  c++  java
  • chromium源码阅读--进程的Message Loop

    上一篇总结了chromium进程的启动,接下来就看线程的消息处理,这里的线程包含进程的主线程。

    消息处理是由base::MessageLoop中实现,消息中的任务和定时器都是异步事件的。

    主要如下几点:

    1、消息的类型分类

    2、延时处理的消息是如何实现

    一、消息分类

         chromium主要将消息类型如下定义:(chromium//src/base/message_loop/message_loop.h  112行)

    1  enum Type {
    2     TYPE_DEFAULT,
    3     TYPE_UI,
    4     TYPE_CUSTOM,
    5     TYPE_IO,
    6 #if defined(OS_ANDROID)
    7     TYPE_JAVA,
    8 #endif  // defined(OS_ANDROID)
    9   };

    1.TYPE_DEFAULT:

          处理chromium定义的Task(闭包代码块)和定时器任务

    2.TYPE_UI:

         除了TYPE_DEFAULT定义的范围,还支持原生的UI事件消息(比如用户操作的窗口消息),MessageLoopForUI类

    3.TYPE_IO:

         除了TYPE_DEFAULT定义的范围,还支持异步IO的事件消息,MessageLoopForIO类

    4.TYPE_JAVA

        是Android平台的特有的消息消息,因为Android里,有java消息和native消息分层,native消息与java消息交互,java消息与应用程序交互,可以看做java消息接管了native消息。

    5.TYPE_CUSTOM

        定制消息,比较少见使用。

    消息类型的不同也就会创建不同的MessagePump。对于UI消息,不同的平台也会有不同的实现。在chromium//src/base/message_loop/message_loop.cc 166行

     1 // static
     2 std::unique_ptr<MessagePump> MessageLoop::CreateMessagePumpForType(Type type) {
     3 // TODO(rvargas): Get rid of the OS guards.
     4 #if defined(USE_GLIB) && !defined(OS_NACL)
     5   using MessagePumpForUI = MessagePumpGlib;
     6 #elif (defined(OS_LINUX) && !defined(OS_NACL)) || defined(OS_BSD)
     7   using MessagePumpForUI = MessagePumpLibevent;
     8 #elif defined(OS_FUCHSIA)
     9   using MessagePumpForUI = MessagePumpFuchsia;
    10 #endif
    11 
    12 #if defined(OS_IOS) || defined(OS_MACOSX)
    13 #define MESSAGE_PUMP_UI std::unique_ptr<MessagePump>(MessagePumpMac::Create())
    14 #elif defined(OS_NACL) || defined(OS_AIX)
    15 // Currently NaCl and AIX don't have a UI MessageLoop.
    16 // TODO(abarth): Figure out if we need this.
    17 #define MESSAGE_PUMP_UI std::unique_ptr<MessagePump>()
    18 #else
    19 #define MESSAGE_PUMP_UI std::unique_ptr<MessagePump>(new MessagePumpForUI())
    20 #endif
    21 
    22 #if defined(OS_MACOSX)
    23   // Use an OS native runloop on Mac to support timer coalescing.
    24 #define MESSAGE_PUMP_DEFAULT 
    25   std::unique_ptr<MessagePump>(new MessagePumpCFRunLoop())
    26 #else
    27 #define MESSAGE_PUMP_DEFAULT 
    28   std::unique_ptr<MessagePump>(new MessagePumpDefault())
    29 #endif
    30 
    31   if (type == MessageLoop::TYPE_UI) {
    32     if (message_pump_for_ui_factory_)
    33       return message_pump_for_ui_factory_();
    34     return MESSAGE_PUMP_UI;
    35   }
    36   if (type == MessageLoop::TYPE_IO)
    37     return std::unique_ptr<MessagePump>(new MessagePumpForIO());
    38 
    39 #if defined(OS_ANDROID)
    40   if (type == MessageLoop::TYPE_JAVA)
    41     return std::unique_ptr<MessagePump>(new MessagePumpForUI());
    42 #endif
    43 
    44   DCHECK_EQ(MessageLoop::TYPE_DEFAULT, type);
    45   return MESSAGE_PUMP_DEFAULT;
    46 }
    View Code

    二、延时消息如何处理

    消息的处理与消息队列密不可分,internal::IncomingTaskQueue实现了一个线程安全的消息队列。 MessageLoop里定义了(chromium//src/base/message_loop/message_loop.h 392行)

    1 scoped_refptr<internal::IncomingTaskQueue> incoming_task_queue_;

    接收到的消息就缓存在这个队列里。那么我们先看看这个类的构造函数。

     1 IncomingTaskQueue::IncomingTaskQueue(MessageLoop* message_loop)
     2     : always_schedule_work_(AlwaysNotifyPump(message_loop->type())),
     3       triage_tasks_(this),
     4       delayed_tasks_(this),
     5       deferred_tasks_(this),
     6       message_loop_(message_loop) {
     7   // The constructing sequence is not necessarily the running sequence in the
     8   // case of base::Thread.
     9   DETACH_FROM_SEQUENCE(sequence_checker_);
    10 }

    构造函数里通过MessageLoop的类型来初始化bool成员 always_schedule_work_ ,来判断是否对消息进行调度, 并保存了message_loop指针。

    继续分析代码,前面看到消息队列已经初始化了,那接下来我们看看是怎么往队列里添加任务的。

    bool IncomingTaskQueue::AddToIncomingQueue(const Location& from_here,
                                               OnceClosure task,
                                               TimeDelta delay,
                                               Nestable nestable) {
      ......
    
      PendingTask pending_task(from_here, std::move(task),
                               CalculateDelayedRuntime(delay), nestable);
      ......
      return PostPendingTask(&pending_task);
    }

    使用了PendingTask对象,并计算了延迟的时间和是否是嵌套任务。那么看PostPendingTask函数:

     1 bool IncomingTaskQueue::PostPendingTask(PendingTask* pending_task) {
     5   bool accept_new_tasks;
     6   bool schedule_work = false;
     7   {
     8     AutoLock auto_lock(incoming_queue_lock_);
     9     accept_new_tasks = accept_new_tasks_;
    10     if (accept_new_tasks)
    11       schedule_work = PostPendingTaskLockRequired(pending_task);
    12   }
    13 
    14   if (!accept_new_tasks) {
    19 pending_task->task.Reset(); 20 return false; 21 } 22 29 if (schedule_work) { 30 // Ensures |message_loop_| isn't destroyed while running. 31 AutoLock auto_lock(message_loop_lock_); 32 if (message_loop_) 33 message_loop_->ScheduleWork(); 34 } 35 36 return true; 37 }

    这里已经开始给线程加锁了,那么继续看PostPendingTaskLockRequired函数:

     1 bool IncomingTaskQueue::PostPendingTaskLockRequired(PendingTask* pending_task) {
     2   incoming_queue_lock_.AssertAcquired();
     3   ......
     4   
     7   pending_task->sequence_num = next_sequence_num_++;
     8 
     9   task_annotator_.DidQueueTask("MessageLoop::PostTask", *pending_task);
    10 
    11   bool was_empty = incoming_queue_.empty();
    12   incoming_queue_.push(std::move(*pending_task));
    13 
    14   if (is_ready_for_scheduling_ &&
    15       (always_schedule_work_ || (!message_loop_scheduled_ && was_empty))) {
    21     message_loop_scheduled_ = true;
    22     return true;
    23   }
    24   return false;
    25 }

    这里看到pending_task是保存在incoming_queue_ 这里使用了std::queue容器(一个FIFO的数据结构),这个队列里面的任务还没有添加到MessageLoop中,也可以看到这里还没有明确任务的执行方式,使用的是FIFO队列。

    下面的几个成员变量,则就是在MessageLoop中使用了。

    chromium//src/base/message_loop/incoming_task_queue.h 217行

    1 // Queue for initial triaging of tasks on the |sequence_checker_| sequence.
    2   TriageQueue triage_tasks_;
    3 
    4   // Queue for delayed tasks on the |sequence_checker_| sequence.
    5   DelayedQueue delayed_tasks_;
    6 
    7   // Queue for non-nestable deferred tasks on the |sequence_checker_| sequence.
    8   DeferredQueue deferred_tasks_;

    1、TriageQueue

         这是第一个按默认的任务处理顺序(FIFO)接受所有任务的队列,这个队列的任务要马上执行或者放到下面的DelayedQueue 或者 DeferredQueue。

         triage_tasks_队列的任务是通过 IncomingTaskQueue::ReloadWorkQueue(TaskQueue* work_queue)来实现切换的,可以将 incoming_queue_ 和 triage_tasks_看成冷热备份的缓存,在triage_tasks_队列的任务执行完了,即为空时,就将待执行的incoming_queue_队列的任务与之交换。

    1 void IncomingTaskQueue::TriageQueue::ReloadFromIncomingQueueIfEmpty() {
    2   if (queue_.empty()) {
    3     // TODO(robliao): Since these high resolution tasks aren't yet in the
    4     // delayed queue, they technically shouldn't trigger high resolution timers
    5     // until they are.
    6     outer_->pending_high_res_tasks_ += outer_->ReloadWorkQueue(&queue_);
    7   }
    8 }

    2、DelayedQueue

         这个队列是存放延迟执行的任务,并且按期望时间排序的

         delayed_tasks_是一个优先级队列,按delayed_run_time排序,chromium//src/base/pending_task.h 63行

    1 // PendingTasks are sorted by their |delayed_run_time| property.
    2 using DelayedTaskQueue = std::priority_queue<base::PendingTask>;

    3、DeferredQueue

         这个队列通常是存放哪些因为MessageLoop嵌套而不能执行的任务,这些任务通常会在空闲的时候执行。

    OK,看到这里,我们回顾一下MessageLoop的执行流程:

    1 void MessageLoop::Run() {
    2   DCHECK_EQ(this, current());
    3   pump_->Run(this);
    4 }

    由MessagePump来执行,那么我们选择默认的MessagePump来看Run的流程,chromium//src/base/message_loop/message_pump_default.cc 29行:

     1 void MessagePumpDefault::Run(Delegate* delegate) {
     2   AutoReset<bool> auto_reset_keep_running(&keep_running_, true);
     3 
     4   for (;;) {
     5 #if defined(OS_MACOSX)
     6     mac::ScopedNSAutoreleasePool autorelease_pool;
     7 #endif
     8 
     9     bool did_work = delegate->DoWork();
    10     if (!keep_running_)
    11       break;
    12 
    13     did_work |= delegate->DoDelayedWork(&delayed_work_time_);
    14     if (!keep_running_)
    15       break;
    16 
    17     if (did_work)
    18       continue;
    19 
    20     did_work = delegate->DoIdleWork();
    21     if (!keep_running_)
    22       break;
    23 
    24     if (did_work)
    25       continue;
    26 
    27     ThreadRestrictions::ScopedAllowWait allow_wait;
    28     if (delayed_work_time_.is_null()) {
    29       event_.Wait();
    30     } else {
    31       event_.TimedWaitUntil(delayed_work_time_);
    32     }
    33   }
    34 }
    View Code

    其中的流程是,delegate->DoWork(), delegate->DoDelayedWork(&delayed_work_time_),delegate->DoIdleWork()。

    也可以看其他平台的MessagePump,主要的流程是一致的,都是delegate的函数,而delegate指向一个MessageLoop指针,那么又回到MessageLoop中。

    上面具体的DoWork就不详述了,接下来看看延迟任务是如何实现的:

    chromium//src/base/message_loop/message_loop.cc 473行

    TimeTicks next_run_time =
          incoming_task_queue_->delayed_tasks().Peek().delayed_run_time;
      if (next_run_time > recent_time_) {
        recent_time_ = TimeTicks::Now();  // Get a better view of Now();
        if (next_run_time > recent_time_) {
          *next_delayed_work_time = next_run_time;
          return false;
        }
      }
    
      PendingTask pending_task = incoming_task_queue_->delayed_tasks().Pop();
    
      if (incoming_task_queue_->delayed_tasks().HasTasks()) {
        *next_delayed_work_time =
            incoming_task_queue_->delayed_tasks().Peek().delayed_run_time;
      }
    
      return DeferOrRunPendingTask(std::move(pending_task));

    在下一个可执行时间到来了,就会从incoming_task_queue_->delayed_tasks().Pop() 出来一个task, 如果incoming_task_queue_->delayed_tasks()里还有延迟任务,就取里面优先级最高的任务的延迟时间作为下次判断的时间。

    到这里,消息的处理和延迟任务的执行都完成了。

    好了,回到上面的Run()函数流程,在DoIdleWork()完成之后,当前线程开始休眠,直到有新的任务来会重新唤醒。

    嗯,这篇就到这里了,下面一篇会总结IPC消息声明和IPC channel的创建。

  • 相关阅读:
    10-12
    8-10
    5.2-5.3
    四则运算 测试与封装
    第5-7章
    汉堡包
    1-5章
    实验二
    实验一
    Controller方法是如何与请求匹配上的及参数如何填充的
  • 原文地址:https://www.cnblogs.com/danxi/p/7691206.html
Copyright © 2011-2022 走看看