concurrent message loop
本文首先介绍 fml::ConcurrentMessageLoop , 并后续分析更多 fml 中的并发基础架构
ConcurrentMessageLoop 和普通的 MessageLoop 功能类似, 且接口有所简化, 使用流程为
- 创建 ConcurrentMessageLoop
- 从并发消息队列获取 ConcurrentTaskRunner runner.
- 通过 runner PostTask, 这些任务是期望在并发消息队列上运行的, 且不能保证/指定在特定线程.
- 使用完成, ConcurrentTaskRunner 的析构函数退出线程.
基于上述最简单的需求和用法, 让我们考虑如何设计这两个类.
简单的思想就是利用 mutex 保护 queue (管理任务) 和 condition_variable(同步线程),
class ConcurrentMessageLoop : public std::enable_shared_from_this<ConcurrentMessageLoop>{
public:
static std::shared_ptr<ConcurrentMessageLoop> Create(
size_t worker_count = std::thread::hardware_concurrency());// 1
~ConcurrentMessageLoop();// 1
// Disallow copy and assignment.
void PostTask(const fml::closure& task);// 2
private:
void WorkerMain();// 2
size_t worker_count_ = 0;
bool shutdown_ = false;
std::vector<std::thread> workers_;
std::mutex tasks_mutex_;
std::condition_variable tasks_condition_;
std::queue<fml::closure> tasks_;
ConcurrentMessageLoop(size_t worker_count); // 1
}
并发消息队列需要考虑什么呢?
- 如何创建自己, 开始和销毁线程.
- 任务队列 如何维护? 工作线程如何获取这些任务并运行?
生命周期
我们的类继承std::enable_shared_from_this<T>
, 并提供Create创建, 该方式能保证在同一个对象上创建多个std::shared_ptr
时引用计数的正确性.
启动: 创建n个线程, 并让他们跑WorkerMain
销毁: 将shutdown_
设置为true, 条件变量tasks_condition_唤醒所有的等待线程.
任务队列
// worker 线程
void WorkerMain() {
while (true) {
std::unique_lock lock(tasks_mutex_);
tasks_condition_.wait(lock, [&]() {
return tasks_.size() > 0 || shutdown_;
});
// 此时已经上锁, 获取 shutdown flag 和 task.
bool shutdown_now = shutdown_;
fml::closure task;
if (tasks_.size() != 0) {
task = tasks_.front();
tasks_.pop();
}
// 运行 task 前, 提前解锁
lock.unlock();
if (task) {
task();
}
// 线程退出
if (shutdown_now) {
break;
}
}
}
// 加入task
void PostTask(const fml::closure& task) {
if (!task) {
return;
}
std::unique_lock lock(tasks_mutex_);
// 已经退出线程,解锁后在主线程直接运行
if (shutdown_) {
lock.unlock();
task();
return;
}
// 加入到任务队列.
tasks_.push(task);
// condition variable 的 mutex 必须在 notify 之前解锁, 否则无法唤醒wait的线程
lock.unlock();
// 唤醒一个等待的worker线程
tasks_condition_.notify_one();
}
flutter 中,PostTask接口由TaskRunner暴露, 但需要保持对消息队列的引用.
拓展
这就是一个基础的并发消息队列实现了, 我们可以考虑根据场景增加额外功能, 从而演示如何更新消息队列的功能.
PostTaskToAll
flutter里额外有PostTaskToAll
功能, 即向所有线程都发送一个任务, 为了实现该功能, 添加字段
std::vector<std::thread::id> worker_thread_ids_; // 线程id
std::map<std::thread::id, std::vector<fml::closure>> thread_tasks_; // 线程对应的任务队列
用于检索线程及线程特定task queue.
发送任务时为所有线程的 task queue 都加入task,并唤醒所有线程, 在WorkerMain
中, 查看当前线程是否有任务, 如果有则取出并运行.
这里存在的选择是, 你可以为thread_tasks_
添加单独的 mutex , 这样会有更高程度的并发, 或复用tasks_mutex_
, 这样代码更易控制,减少出错概率
动态增加/退出线程
考虑如下场景: 并发任务量在某时段徒增, 之后又下降到较低水平, 可以使用动态创建线程的方式来响应.
当任务队列的任务数量超过某个阈值时, 创建新的线程来消费任务. 当任务被消费完后, 退出新增的线程.(用户指定的线程数量是固定的) 添加字段和函数
std::atomic<bool> shutdown_extra_ = false; // 新增线程的退出标记
std::mutex extra_worker_mutex_; // extra_workers_ 的 mutex
std::vector<std::thread> extra_workers_; // 存储新增的线程.
void TrySpawnWorker(); // 如果还能新增线程, 则新增一个额外线程
void PureWorkers(); // 退出新增的线程
在PostTask
中, 根据任务的数量进行调用
if (current_task_count == 1) {
PureWorkers();
} else if (current_task_count >= kTaskThresholds) {
TrySpawnWorker();
}
最后ExtraWorkerMain
在原来WorkerMain
基础上再等待shutdown_extra_
即可.