废话不多说,请见注释
1 #ifndef THREAD_POOL_H 2 #define THREAD_POOL_H 3 4 #include <vector> 5 #include <queue> 6 #include <memory> 7 #include <thread> 8 #include <mutex> 9 #include <condition_variable> 10 #include <future> 11 #include <functional> 12 #include <stdexcept> 13 14 class ThreadPool { 15 public: 16 explicit ThreadPool(size_t); 17 template<class F, class... Args>//可变参数模版 18 //值得注意的是这里F&&表示universal reference而不是右值引用 19 //如果存在推断类型如template或auto那么&&即表示universal reference,具体是左值引用还是右值引用由初始化决定 20 auto enqueue(F&& f, Args&&... args)//f是函数名,args是参数 21 ->std::future<decltype(f(args...))>;//尾置返回类型,返回 函数f返回值类型的future 22 ~ThreadPool(); 23 private: 24 // need to keep track of threads so we can join them 25 std::vector< std::thread > workers; 26 // the task queue 27 std::queue< std::function<void()> > tasks;//std::function通用的函数封装,要求一个返回值类型为void的无参函数 28 29 // synchronization 30 std::mutex queue_mutex;//锁,负责保护任务队列和stop 31 std::condition_variable condition;//条件变量 32 bool stop; 33 }; 34 35 // the constructor just launches some amount of workers 36 inline ThreadPool::ThreadPool(size_t threads)//构造时设定线程数量 37 : stop(false) 38 { 39 for(size_t i = 0;i<threads;++i) 40 workers.emplace_back(//push_back的优化版本 41 [this]//lambda表达式捕获this指针 42 { 43 for(;;)//比while(1)更优 44 { 45 std::function<void()> task; 46 {//{}内相当于新的作用域 47 std::unique_lock<std::mutex> lock(this->queue_mutex); 48 //在等待任务队列中出现任务的过程中解锁queue_mutex 49 //由notify_one或notify_all唤醒 50 //线程池初始化后将有threads个线程在此处等待,每个线程执行完分配到的任务将执行循环,再取任务执行或等待任务加入队列 51 /* 我们需要知道这么做的目的是,std::thread本身仅能绑定一个函数,而我们需要仅用threads个线程去帮我们执行m个任务, 52 * 而不是每执行一个任务创建一个线程,如果这样我们将创建m个线程,而创建线程是需要开销的,这引起了不必要的浪费,线程池就是为此而生的 53 * 通过这种方式,每个std::thread仍然是只绑定了一个函数,但是这一个函数会执行我们想要的多个任务 54 */ 55 this->condition.wait(lock, 56 [this]{ return this->stop || !this->tasks.empty(); }); 57 if(this->stop && this->tasks.empty())//stop=true,仍需执行任务队列中剩余任务 58 return; 59 task = std::move(this->tasks.front());//std::move避免拷贝 60 this->tasks.pop(); 61 } 62 63 task();//执行任务 64 } 65 } 66 ); 67 } 68 // add new work item to the pool 69 template<class F, class... Args> 70 auto ThreadPool::enqueue(F&& f, Args&&... args) 71 -> std::future<decltype(f(args...))> 72 { 73 using return_type = decltype(f(args...)); 74 75 //基本类型是std::shared_ptr,指向类型是std::packaged_task,类型是返回值类型为return_type的无参函数 76 auto task = std::make_shared< std::packaged_task<return_type()> >( 77 /* 现在该说说为什么std::packaged_task的类型是一个返回值为return_type的无参数函数了 78 * 返回值是return_type这没有问题,至于参数消失的原因是因为:std::bind 79 * 在这里它创建了一个无参数(参数均被指定)版本的函数f 80 */ 81 //std::forward配合universal reference使用,完美转发,实际效果是如果是右值引用那么还是右值引用,如果是左值引用那么还是左值引用 82 std::bind(std::forward<F>(f), std::forward<Args>(args)...) 83 ); 84 85 std::future<return_type> res = task->get_future();//任务函数实际执行后的返回值 86 { 87 std::unique_lock<std::mutex> lock(queue_mutex); 88 89 // don't allow enqueueing after stopping the pool 90 if(stop) 91 throw std::runtime_error("enqueue on stopped ThreadPool"); 92 93 tasks.emplace([task](){ (*task)(); });//往tasks队列压入一个无参无返回值的函数,函数体内调用task(不要忘记task是shared_ptr类型) 94 } 95 //任务压入队列,唤醒等待的线程 96 condition.notify_one(); 97 return res; 98 } 99 100 // the destructor joins all threads 101 inline ThreadPool::~ThreadPool() 102 { 103 { 104 std::unique_lock<std::mutex> lock(queue_mutex); 105 stop = true; 106 } 107 condition.notify_all();//唤醒所有等待的进程 108 for(std::thread &worker: workers) 109 worker.join();//等待所有线程结束 110 } 111 112 #endif