main.cpp
#include <iostream> #include <vector> #include <chrono> #include <functional> #include "ThreadPool.h" #include <omp.h> class CA { int a = 1; int b = 2; public: int Max(int a, int b) { return a > b ? a : b; } #if 0 double Max(int a) { return a; } #endif int Sum(int a, int b) { return a + b; } }; int main() { ThreadPool pool(4); std::vector< std::future<int> > results; for (int i = 0; i < 16; ++i) { results.emplace_back( pool.enqueue([i] { // std::cout << "hello " << i << std::endl; // std::this_thread::sleep_for(std::chrono::seconds(1)); return i*i; }) ); } for (auto && result : results){ //std::cout << result.get() << ' '; std::cout <<"sync"<< ' '; }//同步:要等这里结束,才能往下执行, std::cout << "~~main~~" << std::endl; //成员函数不能有重名的(重载)。 CA testA; //mem_fn std::future<int> fe = pool.enqueue(std::mem_fn(&CA::Max), testA, 3, 99999); //bind using namespace std::placeholders; std::future<int> fl = pool.enqueue(std::bind(&CA::Sum,&testA,_1,_2),3,99); std::cout << fl.get() << std::endl; std::cout << fe.get() << std::endl; return 0; }
ThreadPool.h
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <sstream>
#include <stdexcept>
#include <stdio.h>
#include <chrono> // std::chrono::seconds

// Timeout in seconds (the value is 10 s, not 5 minutes).
// Workers used to linger this long on the condition variable after the pool
// had been stopped, which only delayed destruction. They now exit as soon as
// the pool is stopped and the queue is drained, so the pool itself no longer
// reads this constant; it is kept so that code referring to it still builds.
const int THREAD_WAIT_TIME_OUT = 10; // 10 s

// Fixed-size thread pool.
//
// The constructor starts the requested number of worker threads. enqueue()
// wraps a callable and its arguments into a task, pushes it on a FIFO queue
// and returns a std::future for the result. The destructor sets the stop
// flag, wakes every worker, lets them finish the tasks still queued, and
// joins them.
class ThreadPool {
public:
    // Starts the given number of worker threads.
    ThreadPool(size_t);

    // The pool owns threads and a mutex; it is neither copyable nor assignable.
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    // Queues f(args...) for execution on a worker thread.
    // Returns a future holding the call's result (or the exception it threw).
    // Throws std::runtime_error if the pool has already been stopped.
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<decltype(f(args...))>;

    // Worker loop; run by every thread started in the constructor.
    void threadFun();

    // Stops the pool and joins all workers.
    ~ThreadPool();

private:
    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;
    // the task queue
    std::queue< std::function<void()> > tasks;

    // synchronization: queue_mutex guards both `tasks` and `stop`
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
    : stop(false)
{
    workers.reserve(threads);
    for (size_t i = 0; i < threads; ++i)
        workers.emplace_back(&ThreadPool::threadFun, this);
}

// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
    -> std::future<decltype(f(args...))>
{
    using return_type = decltype(f(args...));

    // std::function requires a copyable target and packaged_task is
    // move-only, so the task is held through a shared_ptr.
    auto task = std::make_shared< std::packaged_task<return_type()> >(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );

    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);

        // don't allow enqueueing after stopping the pool
        if (stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace([task] { (*task)(); });
    }
    condition.notify_one();
    return res;
}

// Worker loop: sleep until there is a task or the pool is stopped, run tasks
// one at a time outside the lock, and return once the pool is stopped and the
// queue is empty.
//
// `inline` is required: this is a non-template function defined in a header,
// and without it every translation unit including the header would emit its
// own definition.
inline void ThreadPool::threadFun()
{
    for (;;) {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lock(queue_mutex);

            // Blocks only while the pool is running AND the queue is empty.
            condition.wait(lock, [this] { return stop || !tasks.empty(); });

            // Stopped and nothing left to run: leave immediately. Nobody
            // signals the condition variable again after the destructor's
            // notify_all(), so waiting here would only stall the join.
            if (stop && tasks.empty())
                break;

            task = std::move(tasks.front());
            tasks.pop();
        }
        task();
    }

    // Exit trace. std::thread::id cannot be passed to a printf conversion, so
    // it is rendered through its stream inserter first.
    std::ostringstream id_text;
    id_text << std::this_thread::get_id();
    printf("@@--%s--@@ ", id_text.str().c_str());
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for (std::thread &worker : workers)
        worker.join();
}

#endif
g++ main.cpp -std=c++11 -pthread
ubuntu@ubuntu-vm:~/workspace/thread pool$ ./a.out sync sync sync sync sync sync sync sync sync sync sync sync sync sync sync sync ~~main~~ 102 99999 @@--1444861696--@@ @@--1436468992--@@ @@--1453254400--@@ @@--1461647104--@@