Task 和 Thread_pool
class Task 任务类
class Thread_pool 线程池
线程池:一定数量的线程集合。 用于执行task(任务,可以简单理解为函数)。执行过程中,task 被插入任务队列task_queue,线程池根据插入顺序依次执行。task之间可能有依赖关系,如task_b依赖于task_a。在,task依赖没执行完时,task不能执行,因此task有新建、调配、依赖执行完成、执行、执行完成等多个状态。包括:
NEW:新建任务,还未schedule到线程池。
DISPATCHED: 任务已经schedule 到线程池。
DEPENDENCIES_COMPLETED: 任务依赖已经执行完成。
RUNNING: 任务执行中。
COMPLETED: 任务完成。
对任一个任务的状态转换顺序为:
NEW->DISPATCHED->DEPENDENCIES_COMPLETED->RUNNING->COMPLETED
Task 和 Thread_pool 关系
-
新的Task 通过Thread_pool -> Schedule 部署到 线程池Thread_pool 的 tasks_not_ready_队列中。 当该Task没有依赖,直接插入task_queue,准备执行,否则,等待DEPENDENCIES_COMPLETED
-
Thread_pool 通过固定数量的thread 与 task_queue(待执行的task队列)执行函数绑定。Thread_pool 按照队列首尾顺序不断执行Task。
-
在执行Task过程中,tasks_not_ready_中的Task状态不断变化,一旦变为DEPENDENCIES_COMPLETED就插入到task_queue中。最终所有Task都会插入task_queue中,得到执行。
-
整个Task状态变化过程 NEW->DISPATCHED->DEPENDENCIES_COMPLETED->RUNNING->COMPLETED
当变为DEPENDENCIES_COMPLETED,即从tasks_not_ready_转移到task_queue,准备执行。
class Task {
public:
friend class ThreadPoolInterface;
State GetState() LOCKS_EXCLUDED(mutex_); //返回本Task当前状态
void SetWorkItem(const WorkItem& work_item); //设置Task 执行的任务 (函数)
// 给当前任务添加 依赖任务,如当前任务为b,添加依赖任务为a(a——>b: b.AddDependency(a))
// 同时并把当前任务b,加入到依赖任务a的dependent_tasks_列表中,以便执行a后,对应更改b的状态)。
void AddDependency(std::weak_ptr<Task> dependency) LOCKS_EXCLUDED(mutex_);
private:
// AddDependency功能具体实现函数
// 添加依赖本Task的Task,如b依赖a,则a-->b, a.AddDependentTask(b), 根据a的状态,改变b的状态
// 如果a完成,则b的依赖-1;(并把当前任务b,加入到依赖任务a的dependent_tasks_列表中,以便执行a后,对应更改b的状态)。
void AddDependentTask(Task* dependent_task);
// 执行当前任务,比如当前任务为a,并依此更新依赖a的任务dependent_tasks_中所有任务状态,如依赖a的b。
void Execute() LOCKS_EXCLUDED(mutex_);
// 当前任务进入线程待执行队列
void SetThreadPool(ThreadPoolInterface* thread_pool) LOCKS_EXCLUDED(mutex_);
// 当前任务的依赖任务完成时候,当前任务状态随之改变
void OnDependenyCompleted();
WorkItem work_item_ ;// 任务具体执行过程
ThreadPoolInterface* thread_pool_to_notify_ = nullptr;// 执行当前任务的线程池
State state_ GUARDED_BY(mutex_) = NEW; // 初始化状态为 NEW
unsigned int uncompleted_dependencies_ GUARDED_BY(mutex_) = 0; //当前任务依赖的任务的数量
std::set<Task*> dependent_tasks_ GUARDED_BY(mutex_);// 依赖当前任务的任务列表
absl::Mutex mutex_;
};
class ThreadPool : public ThreadPoolInterface {
public:
explicit ThreadPool(int num_threads);//初始化一个线程数量固定的线程池。
// When the returned weak pointer is expired, 'task' has certainly completed,
// so dependants no longer need to add it as a dependency.
std::weak_ptr<Task> Schedule(std::unique_ptr<Task> task) //添加想要ThreadPool执行的task,
// 插入tasks_not_ready_,如果任务满足执行要求,直接插入task_queue_准备执行
LOCKS_EXCLUDED(mutex_) override;
private:
void DoWork();//每个线程初始化时,执行DoWork()函数. 与线程绑定
void NotifyDependenciesCompleted(Task* task) LOCKS_EXCLUDED(mutex_) override;
bool running_ GUARDED_BY(mutex_) = true;//running_只是一个监视哨,只有线程池在running_状态时,才能往work_queue_加入函数.
std::vector<std::thread> pool_ GUARDED_BY(mutex_);
std::deque<std::shared_ptr<Task>> task_queue_ GUARDED_BY(mutex_); // 准备执行的task
absl::flat_hash_map<Task*, std::shared_ptr<Task>> tasks_not_ready_ //未准备好的 task,task可能有依赖还未完成
GUARDED_BY(mutex_);
};