zoukankan      html  css  js  c++  java
  • C++笔记--thread pool【转】

    版权声明:转载著名出处 https://blog.csdn.net/gcola007/article/details/78750220

    背景

    刚粗略看完一遍c++ primer第五版,一直在找一些c++小项目练手,实验楼里面有很多项目,但是会员太贵了,学生党就只能google+github自行搜索完成项目了。注:本文纯提供自己的理解,代码完全照抄,有想法的欢迎评论留言一起讨论。

    本文参考:

    涉及到的c++11的特性:

    • std::vector
    • std::thread
    • std::mutex
    • std::future
    • std::condition_variable

    线程池原理介绍

    线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。

    线程池的组成部分:

    • 线程池管理器(ThreadPoolManager):用于创建并管理线程池
    • 工作线程(WorkThread): 线程池中线程
    • 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。
    • 任务队列:用于存放没有处理的任务。提供一种缓冲机制。

    代码

    #ifndef ThreadPool_h
    #define ThreadPool_h
    
    #include <vector>
    #include <queue>
    #include <thread>
    #include <mutex>
    #include <condition_variable>
    #include <future>
    #include <functional>
    
    class ThreadPool {
    public:
        ThreadPool(size_t);    //构造函数,size_t n 表示连接数
    
        template<class F, class... Args>
        auto enqueue(F&& f, Args&&... args)   //任务管道函数
        -> std::future<typename std::result_of<F(Args...)>::type>;  //利用尾置限定符  std future用来获取异步任务的结果
    
        ~ThreadPool();
    private:
        // need to keep track of threads so we can join them
        std::vector< std::thread > workers;   //追踪线程
        // the task queue
        std::queue< std::function<void()> > tasks;    //任务队列,用于存放没有处理的任务。提供缓冲机制
    
        // synchronization  同步?
        std::mutex queue_mutex;   //互斥锁
        std::condition_variable condition;   //条件变量?
        bool stop;
    };
    
    // the constructor just launches some amount of workers
    inline ThreadPool::ThreadPool(size_t threads): stop(false)
    {
        for(size_t i = 0;i<threads;++i)
            workers.emplace_back(     //以下为构造一个任务,即构造一个线程
                [this]
                {
                    for(;;)
                    {
                    std::function<void()> task;   //线程中的函数对象
                        {//大括号作用:临时变量的生存期,即控制lock的时间
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock,
                            [this]{ return this->stop || !this->tasks.empty(); }); //当stop==false&&tasks.empty(),该线程被阻塞 !this->stop&&this->tasks.empty()
                        if(this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
    
                        }
    
                    task(); //调用函数,运行函数
                    }
                }
             );
    }
    
    // add new work item to the pool
    template<class F, class... Args>
    auto ThreadPool::enqueue(F&& f, Args&&... args)  //&& 引用限定符,参数的右值引用,  此处表示参数传入一个函数
    -> std::future<typename std::result_of<F(Args...)>::type>
    {
        using return_type = typename std::result_of<F(Args...)>::type;
         //packaged_task是对任务的一个抽象,我们可以给其传递一个函数来完成其构造。之后将任务投递给任何线程去完成,通过
    //packaged_task.get_future()方法获取的future来获取任务完成后的产出值
        auto task = std::make_shared<std::packaged_task<return_type()> >(  //指向F函数的智能指针
                std::bind(std::forward<F>(f), std::forward<Args>(args)...)  //传递函数进行构造
            );
        //future为期望,get_future获取任务完成后的产出值
        std::future<return_type> res = task->get_future();   //获取future对象,如果task的状态不为ready,会阻塞当前调用者
        {
            std::unique_lock<std::mutex> lock(queue_mutex);  //保持互斥性,避免多个线程同时运行一个任务
    
            // don't allow enqueueing after stopping the pool
            if(stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");
    
            tasks.emplace([task](){ (*task)(); });  
    //将task投递给线程去完成,vector尾部压入,std::packaged_task 重载了 operator(),重载后的operator()执行function。因此可以(*task)()可以压入vector<function<void()>> } condition.notify_one(); //选择一个wait状态的线程进行唤醒,并使他获得对象上的锁来完成任务(即其他线程无法访问对象) return res; }//notify_one不能保证获得锁的线程真正需要锁,并且因此可能产生死锁 // the destructor joins all threads inline ThreadPool::~ThreadPool() { { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; } condition.notify_all(); //通知所有wait状态的线程竞争对象的控制权,唤醒所有线程执行 for(std::thread &worker: workers) worker.join(); //因为线程都开始竞争了,所以一定会执行完,join可等待线程执行完 } #endif /* ThreadPool_h */

    线程池大约100行,下面是运行代码

    #include <iostream>
    #include <vector>
    #include <chrono>
    
    #include "ThreadPool.h"
    
    int main()
    {
    
        ThreadPool pool(4);
        std::vector< std::future<int> > results;
    
        for(int i = 0; i < 8; ++i) {
            results.emplace_back(
              pool.enqueue([i] {
                std::cout << "hello " << i << std::endl;
                std::this_thread::sleep_for(std::chrono::seconds(1));
                std::cout << "world " << i << std::endl;
                return i*i;
            })
          );
        }
    
        for(auto && result: results)    //通过future.get()获取返回值
            std::cout << result.get() << ' ';
        std::cout << std::endl;
    
        return 0;
    }


    代码剖析

    通过新建一个线程池类,以类来管理资源(《c++ effective》资源管理一章有提到)。该类包含3个公有成员函数与5个私有成员:构造函数与析构函数即满足(RAII:Resource Acquisition Is Initialization)。

    • 构造函数接受一个size_t类型的数,表示连接数
    • enqueue表示线程池部分中的任务管道,是一个模板函数
    • workers是一个成员为thread的vector,用来监视线程状态
    • tasks表示线程池部分中的任务队列,提供缓冲机制
    • queue_mutex表示互斥锁
    • condition表示条件变量(互斥锁,条件变量以及stop将在后面通过例子说明)

    queue_mutex、condition与stop这三个成员让初次接触多线程的我非常的迷惑,互斥到底是什么意思?为什么需要一个bool量来控制?条件变量condition又是什么?
    不懂的可以搜索:多线程的生产者与消费者模型
    同时附上condition_variable详解

    构造函数ThreadPOOL(size_t):

    • 省略了参数
    • emplace_back相当于push_back但比push_back更为高效
    • wokers压入了一个lambda表达式(即一个匿名函数),表示一个任务(线程),使用for的无限循环,task表示函数对象,线程池中的函数接口在enqueue传入的参数之中,condition.wait(lock,bool),当bool为false的时候,线程将会被堵塞挂起,被堵塞时需要notify_one来唤醒线程才能继续执行

    任务队列函数enqueue(F&& f, Args&&… args)

    • 这类多参数模板的格式就是如此
    • -> 尾置限定符,语法就是如此,用来推断auto类型
    • typename与class的区别
    • result_of用来得到返回类型的对象,它有一个成员::type

    析构函数~ThreadPool()

    • 通过notify_all可以唤醒线程竞争任务的执行,从而使所有任务不被遗漏
  • 相关阅读:
    Single Number II
    Pascal's Triangle
    Remove Duplicates from Sorted Array
    Populating Next Right Pointers in Each Node
    Minimum Depth of Binary Tree
    Unique Paths
    Sort Colors
    Swap Nodes in Pairs
    Merge Two Sorted Lists
    Climbing Stairs
  • 原文地址:https://www.cnblogs.com/gnivor/p/10416922.html
Copyright © 2011-2022 走看看