zoukankan      html  css  js  c++  java
  • 基于C++11的线程池

    使用方法:

     1 #include "threadpool.h"
     2 #include <iostream>
     3 #include <windows.h>
     4 
     5 using namespace std;
     6 
     7 void fun1(int slp)
     8 {
     9     printf("  hello, fun1 !  %d
    ", this_thread::get_id());
    10     if (slp > 0) {
    11         printf(" ======= fun1 sleep %d  =========  %d
    ", slp, this_thread::get_id());
    12         this_thread::sleep_for(chrono::milliseconds(slp));
    13         //Sleep(slp );
    14     }
    15 }
    16 
    17 struct gfun {
    18     int operator()(int n) {
    19         printf("%d  hello, gfun !  %d
    ", n, this_thread::get_id());
    20         return 42;
    21     }
    22 };
    23 
    24 class A {    //函数必须是 static 的才能使用线程池
    25 public:
    26     static int Afun(int n = 0) {
    27         cout << n << "  hello, Afun !  " << this_thread::get_id() << endl;
    28         return n;
    29     }
    30 
    31     static string Bfun(int n, string str, char c) {
    32         cout << n << "  hello, Bfun !  " << str.c_str() << "  " << (int)c << "  " << this_thread::get_id() << endl;
    33         return str;
    34     }
    35 };
    36 
    37 int main()
    38 try {
    39     threadpool executor{ 50 };
    40     A a;
    41     future<void> ff = executor.commit(fun1, 0);
    42     future<int> fg = executor.commit(gfun{}, 0);
    43     future<int> gg = executor.commit(a.Afun, 9999); //IDE提示错误,但可以编译运行
    44     future<string> gh = executor.commit(A::Bfun, 9998, "mult args", 123);
    45     future<string> fh = executor.commit([]()->string { cout << "hello, fh !  " << this_thread::get_id() << endl; return "hello,fh ret !"; });
    46 
    47     cout << " =======  sleep ========= " << this_thread::get_id() << endl;
    48     this_thread::sleep_for(chrono::microseconds(900));
    49 
    50     for (int i = 0; i < 50; i++) {
    51         executor.commit(fun1, i * 100);
    52     }
    53     cout << " =======  commit all ========= " << this_thread::get_id() << " idlsize=" << executor.idlCount() << endl;
    54 
    55     cout << " =======  sleep ========= " << this_thread::get_id() << endl;
    56     this_thread::sleep_for(chrono::seconds(3));
    57 
    58     ff.get(); //调用.get()获取返回值会等待线程执行完,获取返回值
    59     cout << fg.get() << "  " << fh.get().c_str() << "  " << this_thread::get_id() << endl;
    60 
    61     cout << " =======  sleep ========= " << this_thread::get_id() << endl;
    62     this_thread::sleep_for(chrono::seconds(3));
    63 
    64     cout << " =======  fun1,55 ========= " << this_thread::get_id() << endl;
    65     executor.commit(fun1, 55).get();    //调用.get()获取返回值会等待线程执行完
    66 
    67     cout << "end... " << this_thread::get_id() << endl;
    68 
    69 
    70     threadpool pool(4);
    71     vector< future<int> > results;
    72 
    73     for (int i = 0; i < 8; ++i) {
    74         results.emplace_back(
    75             pool.commit([i] {
    76                 cout << "hello " << i << endl;
    77                 this_thread::sleep_for(chrono::seconds(1));
    78                 cout << "world " << i << endl;
    79                 return i * i;
    80                 })
    81         );
    82     }
    83     cout << " =======  commit all2 ========= " << this_thread::get_id() << endl;
    84 
    85     for (auto&& result : results)
    86         cout << result.get() << ' ';
    87     cout << endl;
    88     return 0;
    89 }
    90 catch (exception& e) {
    91     cout << "some unhappy happened...  " << this_thread::get_id() << e.what() << endl;
    92 }
      1 #pragma once
      2 #ifndef THREAD_POOL_H
      3 #define THREAD_POOL_H
      4 
      5 #include <vector>
      6 #include <queue>
      7 #include <atomic>
      8 #include <future>
      9 //#include <condition_variable>
     10 //#include <thread>
     11 //#include <functional>
     12 #include <stdexcept>
     13 
     14 namespace std
     15 {
     16     //线程池最大容量,应尽量设小一点
     17 #define  THREADPOOL_MAX_NUM 16
     18 //#define  THREADPOOL_AUTO_GROW
     19 
     20 //线程池,可以提交变参函数或拉姆达表达式的匿名函数执行,可以获取执行返回值
     21 //不直接支持类成员函数, 支持类静态成员函数或全局函数,Opteron()函数等
     22     class threadpool
     23     {
     24         using Task = function<void()>;    //定义类型
     25         vector<thread> _pool;            //线程池
     26         queue<Task> _tasks;                //任务队列
     27         mutex _lock;                    //同步
     28         condition_variable _task_cv;    //条件阻塞
     29         atomic<bool> _run{ true };        //线程池是否执行
     30         atomic<int>  _idlThrNum{ 0 };    //空闲线程数量
     31 
     32     public:
     33         inline threadpool(unsigned short size = 4) { addThread(size); }
     34         inline ~threadpool()
     35         {
     36             _run = false;
     37             _task_cv.notify_all(); // 唤醒所有线程执行
     38             for (thread& thread : _pool) {
     39                 //thread.detach(); // 让线程“自生自灭”
     40                 if (thread.joinable())
     41                     thread.join(); // 等待任务结束, 前提:线程一定会执行完
     42             }
     43         }
     44 
     45     public:
     46         // 提交一个任务
     47         // 调用.get()获取返回值会等待任务执行完,获取返回值
     48         // 有两种方法可以实现调用类成员,
     49         // 一种是使用   bind: .commit(std::bind(&Dog::sayHello, &dog));
     50         // 一种是用   mem_fn: .commit(std::mem_fn(&Dog::sayHello), this)
     51         template<class F, class... Args>
     52         auto commit(F&& f, Args&&... args) ->future<decltype(f(args...))>
     53         {
     54             if (!_run)    // stoped ??
     55                 throw runtime_error("commit on ThreadPool is stopped.");
     56 
     57             using RetType = decltype(f(args...)); // typename std::result_of<F(Args...)>::type, 函数 f 的返回值类型
     58             auto task = make_shared<packaged_task<RetType()>>(
     59                 bind(forward<F>(f), forward<Args>(args)...)
     60                 ); // 把函数入口及参数,打包(绑定)
     61             future<RetType> future = task->get_future();
     62             {    // 添加任务到队列
     63                 lock_guard<mutex> lock{ _lock };//对当前块的语句加锁  lock_guard 是 mutex 的 stack 封装类,构造的时候 lock(),析构的时候 unlock()
     64                 _tasks.emplace([task]() { // push(Task{...}) 放到队列后面
     65                     (*task)();
     66                     });
     67             }
     68 #ifdef THREADPOOL_AUTO_GROW
     69             if (_idlThrNum < 1 && _pool.size() < THREADPOOL_MAX_NUM)
     70                 addThread(1);
     71 #endif // !THREADPOOL_AUTO_GROW
     72             _task_cv.notify_one(); // 唤醒一个线程执行
     73 
     74             return future;
     75         }
     76 
     77         //空闲线程数量
     78         int idlCount() { return _idlThrNum; }
     79         //线程数量
     80         int thrCount() { return _pool.size(); }
     81 #ifndef THREADPOOL_AUTO_GROW
     82     private:
     83 #endif // !THREADPOOL_AUTO_GROW
     84         //添加指定数量的线程
     85         void addThread(unsigned short size)
     86         {
     87             for (; _pool.size() < THREADPOOL_MAX_NUM && size > 0; --size)
     88             {   //增加线程数量,但不超过 预定义数量 THREADPOOL_MAX_NUM
     89                 _pool.emplace_back([this] { //工作线程函数
     90                     while (_run)
     91                     {
     92                         Task task; // 获取一个待执行的 task
     93                         {
     94                             // unique_lock 相比 lock_guard 的好处是:可以随时 unlock() 和 lock()
     95                             unique_lock<mutex> lock{ _lock };
     96                             _task_cv.wait(lock, [this] {
     97                                 return !_run || !_tasks.empty();
     98                                 }); // wait 直到有 task
     99                             if (!_run && _tasks.empty())
    100                                 return;
    101                             task = move(_tasks.front()); // 按先进先出从队列取一个 task
    102                             _tasks.pop();
    103                         }
    104                         _idlThrNum--;
    105                         task();//执行任务
    106                         _idlThrNum++;
    107                     }
    108                     });
    109                 _idlThrNum++;
    110             }
    111         }
    112     };
    113 
    114 }
    115 
    116 #endif  
  • 相关阅读:
    Linux系统目录数和文件数限制
    用十条命令在一分钟内检查Linux服务器性能
    Linux 性能
    vmstat命令
    利用Clonezilla备份还原Linux系统 (转载别人的知识)
    性能,并发,压力--别人所写
    linux -top 命令
    Linux 随写
    接口测试
    Jmeter关联正则表达式提取器
  • 原文地址:https://www.cnblogs.com/Optimals/p/14630692.html
Copyright © 2011-2022 走看看