zoukankan      html  css  js  c++  java
  • c++11线程池实现

    咳咳。c++11 增加了线程库,从此告别了标准库不支持并发的历史。

    然而 c++ 对于多线程的支持还是比較低级,略微高级一点的使用方法都须要自己去实现,譬如线程池、信号量等。

    线程池(thread pool)这个东西。在面试上多次被问到,一般的回答都是:“管理一个任务队列。一个线程队列,然后每次取一个任务分配给一个线程去做。循环往复。” 貌似没有问题吧。

    可是写起程序来的时候就出问题了。

    废话不多说,先上实现。然后再啰嗦。(dont talk, show me ur code !)

    1. #ifndef ILOVERS_THREAD_POOL_H
    2. #define ILOVERS_THREAD_POOL_H
    3.  
    4. #include <iostream>
    5. #include <functional>
    6. #include <thread>
    7. #include <condition_variable>
    8. #include <future>
    9. #include <atomic>
    10. #include <vector>
    11. #include <queue>
    12.  
    13. // 命名空间
    14. namespace ilovers {
    15. class TaskExecutor;
    16. }
    17.  
    18. class ilovers::TaskExecutor{
    19. using Task = std::function<void()>;
    20. private:
    21. // 线程池
    22. std::vector<std::thread> pool;
    23. // 任务队列
    24. std::queue<Task> tasks;
    25. // 同步
    26. std::mutex m_task;
    27. std::condition_variable cv_task;
    28. // 是否关闭提交
    29. std::atomic<bool> stop;
    30. public:
    31. // 构造
    32. TaskExecutor(size_t size = 4): stop {false}{
    33. size = size < 1 ? 1 : size;
    34. for(size_t i = 0; i< size; ++i){
    35. pool.emplace_back(&TaskExecutor::schedual, this); // push_back(std::thread{...})
    36. }
    37. }
    38. // 析构
    39. ~TaskExecutor(){
    40. for(std::thread& thread : pool){
    41. thread.detach(); // 让线程“自生自灭”
    42. //thread.join(); // 等待任务结束。 前提:线程一定会运行完
    43. }
    44. }
    45. // 停止任务提交
    46. void shutdown(){
    47. this->stop.store(true);
    48. }
    49. // 重新启动任务提交
    50. void restart(){
    51. this->stop.store(false);
    52. }
    53. // 提交一个任务
    54. template<class F, class... Args>
    55. auto commit(F&& f, Args&&... args) ->std::future<decltype(f(args...))> {
    56. if(stop.load()){ // stop == true ??

    57. throw std::runtime_error("task executor have closed commit.");
    58. }
    59. using ResType = decltype(f(args...)); // typename std::result_of<F(Args...)>::type, 函数 f 的返回值类型
    60. auto task = std::make_shared<std::packaged_task<ResType()>>(
    61. std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    62. ); // wtf !
    63. { // 加入任务到队列
    64. std::lock_guard<std::mutex> lock {m_task};
    65. tasks.emplace([task](){ // push(Task{...})
    66. (*task)();
    67. });
    68. }
    69. cv_task.notify_all(); // 唤醒线程运行
    70. std::future<ResType> future = task->get_future();
    71. return future;
    72. }
    73. private:
    74. // 获取一个待运行的 task
    75. Task get_one_task(){
    76. std::unique_lock<std::mutex> lock {m_task};
    77. cv_task.wait(lock, [this](){ return !tasks.empty(); }); // wait 直到有 task
    78. Task task {std::move(tasks.front())}; // 取一个 task
    79. tasks.pop();
    80. return task;
    81. }
    82. // 任务调度
    83. void schedual(){
    84. while(true){
    85. if(Task task = get_one_task()){
    86. task(); //
    87. }else{
    88. // return; // done
    89. }
    90. }
    91. }
    92. };
    93.  
    94. #endif
    95.  
    96. void f()
    97. {
    98. std::cout << "hello, f !" << std::endl;
    99. }
    100.  
    101. struct G{
    102. int operator()(){
    103. std::cout << "hello, g !" << std::endl;
    104. return 42;
    105. }
    106. };
    107.  
    108.  
    109. int main()
    110. try{
    111. ilovers::TaskExecutor executor {10};
    112. std::future<void> ff = executor.commit(f);
    113. std::future<int> fg = executor.commit(G{});
    114. std::future<std::string> fh = executor.commit([]()->std::string { std::cout << "hello, h !" << std::endl; return "hello,fh !";});
    115. executor.shutdown();
    116. ff.get();
    117. std::cout << fg.get() << " " << fh.get() << std::endl;
    118. std::this_thread::sleep_for(std::chrono::seconds(5));
    119. executor.restart(); // 重新启动任务
    120. executor.commit(f).get(); //
    121. std::cout << "end..." << std::endl;
    122. return 0;
    123. }catch(std::exception& e){
    124. std::cout << "some unhappy happened... " << e.what() << std::endl;
    125. }

    为了避嫌,先进行一下版权说明:代码是 me “写”的,可是思路来自 Internet。 特别是这个线程池实现(窝的实现。基本 copy 了这个实现,好东西值得 copy !)。

    实现原理

    接着前面的废话说。“管理一个任务队列,一个线程队列,然后每次取一个任务分配给一个线程去做,循环往复。

    ” 这个思路有神马问题?线程池一般要复用线程。所以假设是取一个 task 分配给某一个 thread,运行完之后再又一次分配,在语言层面基本都是不支持的:一般语言的 thread 都是运行一个固定的 task 函数,运行完成线程也就结束了(至少 c++ 是这样)。so 要怎样实现 task 和 thread 的分配呢?

    让每个 thread 都去运行调度函数:循环获取一个 task,然后运行之。

    idea 是不是非常赞!

    保证了 thread 函数的唯一性,并且复用线程运行 task 。

    即使理解了 idea,me 想代码还是须要详解一下的。

    1. 一个线程 pool,一个任务队列 queue 。应该没有意见;
    2. 任务队列是典型的生产者-消费者模型,本模型至少须要两个工具:一个 mutex + 一个条件变量。或是一个 mutex + 一个信号量。mutex 实际上就是锁。保证任务的加入和移除(获取)的相互排斥性,一个条件变量是保证获取 task 的同步性:一个 empty 的队列。线程应该等待(堵塞);
    3. stop 控制任务提交,是受了 Java 的影响,还有实现类不叫 ThreadPool 而是叫 TaskExecutor;
    4. atomic<bool> 本身是原子类型,从名字上就懂:它们的操作 load()/store() 是原子操作,所以不须要再加 mutex。

    c++语言细节

    即使懂原理也不代表能写出程序。上面用了众多c++11的“奇技淫巧”,以下简单描写叙述之。

    1. using Task = function<void()> 是类型别名,简化了 typedef 的使用方法。function<void()> 能够觉得是一个函数类型,接受随意原型是 void() 的函数。或是函数对象,或是匿名函数。void() 意思是不带參数,没有返回值。

      最初的实现版本号 Task 类型不是单纯的函数类型,而是一个 class,包括一个 status 字段。表明 Task 的状态:未调度、运行中、运行结束。后来由于简化,故删掉了。

    2. pool.emplace_back(&TaskExecutor::schedual, this); 和 pool.push_back(thread{&TaskExecutor::schedual, this}) 功能一样,仅仅只是前者性能会更好;
    3. thread{&TaskExecutor::schedual, this} 是构造了一个线程对象。运行函数是成员函数 TaskExecutor::schedual 。
    4. 全部对象的初始化方式均採用了 {},而不再使用之前的 () 方式,由于风格不够一致且easy出错;
    5. 匿名函数: [](int a, int b)->int { return a+b; } 不多说。[] 是捕捉器,&r 是引用域外的变量 r, =r 是拷贝域外的 r 值;
    6. delctype(expr) 用来判断 expr 的类型。和 auto 是类似的。相当于类型占位符。占领一个类型的位置;auto f(A a, B b) -> decltype(a+b) 是一种使用方法,不能写作 decltype(a+b) f(A a, B b),为啥?!

      c++ 就是这么规定的!

    7. commit 方法是不是略奇葩!

      能够带随意多的參数,第一个參数是 f。后面依次是函数 f 的參数! 可变參数模板是 c++11 的一大亮点,够亮!至于为什么是 Arg... 和 arg... 。由于规定就是这么用的!

    8. make_shared 用来构造 shared_ptr 智能指针。

      使用方法大体是 shared_ptr<int> p = make_shared<int>(4) 然后 *p == 4 。智能指针的优点就是。 自己主动 delete !

    9. bind 函数。接受函数 f 和部分參数,返回currying后的匿名函数。譬如 bind(add, 4) 能够实现类似 add4 的函数!
    10. forward() 函数,类似于 move() 函数,后者是将參数右值化,前者是... 肿么说呢?大概意思就是:不改变最初传入的类型的引用类型(左值还是左值,右值还是右值);
    11. packaged_task 就是任务函数的封装类,通过 get_future 获取 future , 然后通过 future 能够获取函数的返回值(future.get());packaged_task 本身能够像函数一样调用 () ;
    12. queue 是队列类, front() 获取头部元素, pop() 移除头部元素;back() 获取尾部元素,push() 尾部加入元素。
    13. lock_guard 是 mutex 的 stack 封装类,构造的时候 lock(),析构的时候 unlock(),是 c++ RAII 的 idea。
    14. condition_variable cv; 条件变量, 须要配合 unique_lock 使用。unique_lock 相比 lock_guard 的优点是:能够随时 unlock() 和 lock()。 cv.wait() 之前须要持有 mutex,wait 本身会 unlock() mutex,假设条件满足则会又一次持有 mutex。

    结束语

    是不是感觉有些反人类!

  • 相关阅读:
    HTTP断点续传 规格严格
    Java Shutdown 规格严格
    linux 命令源码 规格严格
    JTable调整列宽 规格严格
    linux 多CPU 规格严格
    Hello can not find git path 规格严格
    Kill 规格严格
    拜拜牛人 规格严格
    Swing 规格严格
    Debugging hangs in JVM (on AIX but methodology applicable to other platforms) 规格严格
  • 原文地址:https://www.cnblogs.com/liguangsunls/p/6744562.html
Copyright © 2011-2022 走看看