Introduction
Thread pool: a pattern for using threads. Too many threads incur scheduling overhead, which in turn hurts cache locality and overall performance. A thread pool instead maintains a fixed set of threads that wait for a supervising manager to hand them tasks that can run concurrently. This avoids the cost of creating and destroying a thread for every short-lived task. A thread pool not only keeps the cores fully utilized but also prevents over-scheduling. The number of available threads should depend on the number of available processors, processor cores, memory, network sockets, and so on.
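As a rough illustration of that sizing advice, a pool size is often derived from std::thread::hardware_concurrency(). The helper pickPoolSize() below is hypothetical and is not part of the sample code later in this article:

#include <algorithm>
#include <thread>

// A common heuristic: size the pool to the hardware concurrency.
// hardware_concurrency() may return 0 when the value is unknown,
// so fall back to a small constant floor (2 here is an assumption).
unsigned pickPoolSize() {
    unsigned n = std::thread::hardware_concurrency();
    return std::max(n, 2u);
}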
Components of a thread pool
1. Thread pool manager
Creates a fixed number of threads, starts them, dispatches tasks, and manages the pool.
The pool in this article only needs a start method (start()), a stop method (stop()), and a task-submission method (appendTask()).
start() creates the pool's worker threads and puts each one into its work loop.
stop() ends every thread's loop and reclaims all resources.
appendTask() enqueues a task.
2. Worker threads
The threads in the pool; they wait in the pool and execute the tasks assigned to them.
This article uses a condition variable to implement the wait/notify mechanism.
3. Task interface
The interface through which tasks are submitted, so that worker threads can schedule and execute them.
4. Task queue
Holds tasks that have not yet been processed, providing a buffering mechanism.
The task queue can also take on scheduling: higher-priority tasks are placed at the front. Combining std::priority_queue with std::pair is a natural structure for such a priority queue; note that the sample code below uses a plain FIFO std::queue for simplicity, so a sketch of the priority variant is given right after this list.
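A minimal sketch of that priority variant, assuming an integer priority where a larger value means more urgent. The names PriorityTask, ByPriority, and TaskQueue are illustrative and do not appear in the sample code below:

#include <functional>
#include <queue>
#include <utility>
#include <vector>

using Task = std::function<void()>;
// Pair a priority with the task; higher int = more urgent.
using PriorityTask = std::pair<int, Task>;

// std::pair's default operator< would try to compare the std::function
// members, which have no ordering, so order by priority only.
struct ByPriority {
    bool operator()(const PriorityTask& a, const PriorityTask& b) const {
        return a.first < b.first; // max-heap: highest priority on top
    }
};

using TaskQueue =
    std::priority_queue<PriorityTask, std::vector<PriorityTask>, ByPriority>;

// Usage: q.emplace(10, []{ /* urgent work */ }); q.top().second(); q.pop();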
Implementation:
ThreadPool.hpp:
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_

#include <atomic>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class ThreadPool {
public:
    using Task = std::function<void()>;

    explicit ThreadPool(int num) : _is_running(false), _thread_num(num) {}

    ~ThreadPool() {
        if (_is_running)
            stop();
    }

    void start() {
        _is_running = true;
        // start threads
        for (int i = 0; i < _thread_num; i++)
            _threads.emplace_back(&ThreadPool::work, this);
    }

    void stop() {
        {
            // stop the pool; must notify all threads so none stays
            // blocked on the condition variable
            std::unique_lock<std::mutex> lk(_mtx);
            _is_running = false;
            _cond.notify_all();
        }
        // wait for every worker to finish
        for (std::thread& t : _threads) {
            if (t.joinable())
                t.join();
        }
    }

    void appendTask(const Task& task) {
        if (_is_running) {
            std::unique_lock<std::mutex> lk(_mtx);
            _tasks.push(task);
            _cond.notify_one(); // wake one thread to do the task
        }
    }

    // disable copy and assignment
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

private:
    void work() {
        // note: a std::thread::id cannot be passed to printf("%d"),
        // so print it through iostream instead
        std::cout << "begin work thread: " << std::this_thread::get_id() << std::endl;
        // every thread competes to pick a task off the queue
        while (_is_running) {
            Task task;
            {
                std::unique_lock<std::mutex> lk(_mtx);
                if (!_tasks.empty()) {
                    // if the queue is not empty, take a task and run it
                    // even if stop() is called in the meantime
                    task = _tasks.front();
                    _tasks.pop(); // remove the task
                } else if (_is_running && _tasks.empty()) {
                    _cond.wait(lk);
                }
            }
            if (task)
                task(); // do the task outside the lock
        }
        std::cout << "end work thread: " << std::this_thread::get_id() << std::endl;
    }

private:
    std::atomic<bool> _is_running; // pool status; atomic because workers read it without the lock
    std::mutex _mtx;
    std::condition_variable _cond;
    int _thread_num;
    std::vector<std::thread> _threads;
    std::queue<Task> _tasks;
};

#endif // !_THREAD_POOL_H_
main.cpp:
#include "stdafx.h" #include <iostream> #include <chrono> #include "ThreadPool.hpp" void fun1() { std::cout << "working in thread " << std::this_thread::get_id() << std::endl; } void fun2(int x) { std::cout << "task " << x << " working in thread " << std::this_thread::get_id() << std::endl; } int main(int argc, char* argv[]) { ThreadPool thread_pool(3); thread_pool.start(); std::this_thread::sleep_for(std::chrono::milliseconds(500)); for (int i = 0; i < 6; i++) { //thread_pool.appendTask(fun1); thread_pool.appendTask(std::bind(fun2, i)); //std::this_thread::sleep_for(std::chrono::milliseconds(500)); } thread_pool.stop(); getchar(); return 0; }
A lower-level thread pool implementation (C-style, pthreads)
#ifndef _THREADPOOL_H
#define _THREADPOOL_H

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

// Push item onto the head of a doubly linked list.
#define LL_ADD(item, list) do {        \
    item->prev = NULL;                 \
    item->next = list;                 \
    if (list != NULL) {                \
        list->prev = item;             \
    }                                  \
    list = item;                       \
} while (0)

// Unlink item from the doubly linked list.
#define LL_REMOVE(item, list) do {     \
    if (item->prev != NULL) {          \
        item->prev->next = item->next; \
    }                                  \
    if (item->next != NULL) {          \
        item->next->prev = item->prev; \
    }                                  \
    if (list == item) {                \
        list = item->next;             \
    }                                  \
} while (0)

struct NWORKER {
    pthread_t thread;
    struct NMANAGER *pool;
    int terminate;
    struct NWORKER *prev;
    struct NWORKER *next;
};

struct NJOB {
    void (*func)(struct NJOB *job); // job callback; receives the job itself
    void *user_data;
    struct NJOB *prev;
    struct NJOB *next;
};

struct NMANAGER {
    struct NWORKER *workers;  // list of worker threads
    struct NJOB *jobs;        // list of pending jobs
    pthread_cond_t jobs_cond;
    pthread_mutex_t jobs_mutex;
};

typedef struct NMANAGER nThreadPool;

static void *nThreadCallback(void *arg) {
    struct NWORKER *worker = (struct NWORKER *)arg;

    while (1) {
        pthread_mutex_lock(&worker->pool->jobs_mutex);
        // sleep until a job arrives or we are told to terminate
        while (worker->pool->jobs == NULL) {
            if (worker->terminate) break;
            pthread_cond_wait(&worker->pool->jobs_cond, &worker->pool->jobs_mutex);
        }
        if (worker->terminate) {
            pthread_mutex_unlock(&worker->pool->jobs_mutex);
            break;
        }

        struct NJOB *job = worker->pool->jobs;
        LL_REMOVE(job, worker->pool->jobs);
        pthread_mutex_unlock(&worker->pool->jobs_mutex);

        job->func(job); // the callback takes the job itself (see struct NJOB)
    }

    free(worker);
    pthread_exit(NULL);
}

int nThreadPoolCreate(nThreadPool *pool, int numWorkers) {
    if (numWorkers < 1) numWorkers = 1;
    memset(pool, 0, sizeof(nThreadPool));

    pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
    memcpy(&pool->jobs_cond, &blank_cond, sizeof(pthread_cond_t));

    pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
    memcpy(&pool->jobs_mutex, &blank_mutex, sizeof(pthread_mutex_t));

    for (int i = 0; i < numWorkers; i++) {
        struct NWORKER *worker = (struct NWORKER *)malloc(sizeof(struct NWORKER));
        if (worker == NULL) { // allocation failed
            perror("malloc");
            return -2;
        }
        memset(worker, 0, sizeof(struct NWORKER));

        worker->pool = pool;
        int ret = pthread_create(&worker->thread, NULL, nThreadCallback, worker);
        if (ret) {
            perror("pthread_create");
            free(worker);
            return -3;
        }

        LL_ADD(worker, pool->workers);
    }

    return 0;
}

int nThreadPoolDestroy(nThreadPool *pool) {
    struct NWORKER *worker = NULL;
    for (worker = pool->workers; worker != NULL; worker = worker->next) {
        worker->terminate = 1;
    }

    pthread_mutex_lock(&pool->jobs_mutex);
    pthread_cond_broadcast(&pool->jobs_cond); // wake every worker so it sees terminate
    pthread_mutex_unlock(&pool->jobs_mutex);

    return 0;
}

void nThreadPoolPush(nThreadPool *pool, struct NJOB *job) {
    pthread_mutex_lock(&pool->jobs_mutex);
    LL_ADD(job, pool->jobs);
    pthread_cond_signal(&pool->jobs_cond); // wake one worker for the new job
    pthread_mutex_unlock(&pool->jobs_mutex);
}

#endif
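The header above shows no caller, so here is one hypothetical way to drive it, assuming the header is saved as nThreadPool.h and that each NJOB and its payload are heap-allocated by the caller and freed inside the callback. The file name, print_job, and the getchar() wait are all illustrative:

#include <stdio.h>
#include <stdlib.h>
#include "nThreadPool.h" // assumption: the header above, saved under this name

// Illustrative job callback: prints the payload, then frees the job,
// since nThreadCallback hands the whole NJOB to the callback.
static void print_job(struct NJOB *job) {
    int *value = (int *)job->user_data;
    printf("job %d running\n", *value);
    free(job->user_data);
    free(job);
}

int main(void) {
    nThreadPool pool;
    if (nThreadPoolCreate(&pool, 4) != 0) return 1;

    for (int i = 0; i < 8; i++) {
        struct NJOB *job = (struct NJOB *)malloc(sizeof(struct NJOB));
        int *value = (int *)malloc(sizeof(int));
        *value = i;
        job->func = print_job;
        job->user_data = value;
        nThreadPoolPush(&pool, job);
    }

    getchar(); // crude wait so the workers can drain the queue
    nThreadPoolDestroy(&pool); // workers free themselves; a production version would also join them
    return 0;
}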