zoukankan      html  css  js  c++  java
  • [fllutter engine] 并发消息队列

    concurrent message loop

    本文首先介绍 fml::ConcurrentMessageLoop , 并后续分析更多 fml 中的并发基础架构

    ConcurrentMessageLoop 和普通的 MessageLoop 功能类似, 且接口有所简化, 使用流程为

    1. 创建 ConcurrentMessageLoop
    2. 从并发消息队列获取 ConcurrentTaskRunner runner.
    3. 通过 runner PostTask, 这些任务是期望在并发消息队列上运行的, 且不能保证/指定在特定线程.
    4. 使用完成, ConcurrentTaskRunner 的析构函数退出线程.

    基于上述最简单的需求和用法, 让我们考虑如何设计这两个类.
    简单的思想就是利用 mutex 保护 queue (管理任务) 和 condition_variable(同步线程),

    
    class ConcurrentMessageLoop : public std::enable_shared_from_this<ConcurrentMessageLoop>{
     public:
      static std::shared_ptr<ConcurrentMessageLoop> Create(
        size_t worker_count = std::thread::hardware_concurrency());// 1
    
      ~ConcurrentMessageLoop();// 1
    
      // Disallow copy and assignment.
    
      void PostTask(const fml::closure& task);// 2
     private:
      void WorkerMain();// 2
    
      size_t worker_count_ = 0;
      bool shutdown_ = false;
      std::vector<std::thread> workers_;
    
      std::mutex tasks_mutex_;
      std::condition_variable tasks_condition_;
      std::queue<fml::closure> tasks_;
    
      ConcurrentMessageLoop(size_t worker_count); // 1
    }
    

    并发消息队列需要考虑什么呢?

    1. 如何创建自己, 开始和销毁线程.
    2. 任务队列 如何维护? 工作线程如何获取这些任务并运行?

    生命周期

    我们的类继承std::enable_shared_from_this<T>, 并提供Create创建, 该方式能保证在同一个对象上创建多个std::shared_ptr时引用计数的正确性.

    启动: 创建n个线程, 并让他们跑WorkerMain

    销毁: 将shutdown_设置为true, 条件变量tasks_condition_唤醒所有的等待线程.

    任务队列

    
    // worker 线程
    void WorkerMain() {
      while (true) {
        std::unique_lock lock(tasks_mutex_);
        tasks_condition_.wait(lock, [&]() {
          return tasks_.size() > 0 || shutdown_;
        });
    
        // 此时已经上锁, 获取 shutdown flag 和 task.
        bool shutdown_now = shutdown_;
        fml::closure task;
    
        if (tasks_.size() != 0) {
          task = tasks_.front();
          tasks_.pop();
        }
    
        // 运行 task 前, 提前解锁
        lock.unlock();
    
        if (task) {
          task();
        }
        // 线程退出
        if (shutdown_now) {
          break;
        }
      }
    }
    
    // 加入task
    void PostTask(const fml::closure& task) {
      if (!task) {
        return;
      }
    
      std::unique_lock lock(tasks_mutex_);
    
      // 已经退出线程,解锁后在主线程直接运行
      if (shutdown_) {
        lock.unlock();
        task();
        return;
      }
      // 加入到任务队列.
      tasks_.push(task);
    
      // condition variable 的 mutex 必须在 notify 之前解锁, 否则无法唤醒wait的线程
      lock.unlock();
      // 唤醒一个等待的worker线程
      tasks_condition_.notify_one();
    }
    
    

    flutter 中,PostTask接口由TaskRunner暴露, 但需要保持对消息队列的引用.

    拓展

    这就是一个基础的并发消息队列实现了, 我们可以考虑根据场景增加额外功能, 从而演示如何更新消息队列的功能.

    PostTaskToAll

    flutter里额外有PostTaskToAll功能, 即向所有线程都发送一个任务, 为了实现该功能, 添加字段

    std::vector<std::thread::id> worker_thread_ids_;                    // 线程id
    std::map<std::thread::id, std::vector<fml::closure>> thread_tasks_; // 线程对应的任务队列
    

    用于检索线程及线程特定task queue.

    发送任务时为所有线程的 task queue 都加入task,并唤醒所有线程, 在WorkerMain中, 查看当前线程是否有任务, 如果有则取出并运行.

    这里存在的选择是, 你可以为thread_tasks_添加单独的 mutex , 这样会有更高程度的并发, 或复用tasks_mutex_, 这样代码更易控制,减少出错概率

    动态增加/退出线程

    考虑如下场景: 并发任务量在某时段徒增, 之后又下降到较低水平, 可以使用动态创建线程的方式来响应.

    当任务队列的任务数量超过某个阈值时, 创建新的线程来消费任务. 当任务被消费完后, 退出新增的线程.(用户指定的线程数量是固定的) 添加字段和函数

    std::atomic<bool> shutdown_extra_ = false;  // 新增线程的退出标记
    std::mutex extra_worker_mutex_;             // extra_workers_ 的 mutex
    std::vector<std::thread> extra_workers_;    // 存储新增的线程.
    
    void TrySpawnWorker();                      // 如果还能新增线程, 则新增一个额外线程
    void PureWorkers();                         // 退出新增的线程
    

    PostTask中, 根据任务的数量进行调用

    if (current_task_count == 1) {
      PureWorkers();
    } else if (current_task_count >= kTaskThresholds) {
      TrySpawnWorker();
    }
    

    最后ExtraWorkerMain在原来WorkerMain基础上再等待shutdown_extra_即可.

  • 相关阅读:
    MySQL笔记(6)---锁
    MySQL笔记(5)---索引与算法
    MySQL笔记(4)---表
    MySQL笔记(3)---文件
    MySQL笔记(2)---InnoDB存储引擎
    MySQL笔记(1)---MySQL体系结构和存储引擎
    生成器,迭代器,装饰器
    文件操作、def函数、模块导入、json
    数据类型、字符串操作
    基本数据类型,条件判断
  • 原文地址:https://www.cnblogs.com/xxrlz/p/15451375.html
Copyright © 2011-2022 走看看