zoukankan      html  css  js  c++  java
  • 简单线程池原理和代码

    线程池就是,预先创建一定数量的线程,然后当需要异步任务时,只要把任务放入队列中,线程池自动在队列中取任务,每执行完一个任务就自动取下一个任务

    本文提供的是一个简单的线程池,所以并不提供线程的自动增减的功能,以比较简单的代码来理解其原理

    代码只有一个文件,算上注释才勉强200行,由于代码较长就不全部贴在这里了。

    线程池代码见Github【点击】

    由于代码使用了一些c++11的东西,所以先需要复习一下以下几个东西:(不要被吓怕,就算不会其实也能懂下面的讲解,具体语法所表达的意思我会说明)

    • std::thread
    • std::mutex
    • std::condition_variable
    • std::move
    • std::lock_guard
    • std::unique_lock
    • lambda表达式

    下面开始代码讲解:

    先从入口说起:构造函数

    template <unsigned _TCount>
    FixedThreadPool<_TCount>::FixedThreadPool()
    : m_jobsleft(0), m_isDone(false), m_isFinished(false) {
        for (int i = 0; i < _TCount; ++i) {
            m_threads[i] = std::move(std::thread([this, i]() {
                this->DoTask();
            }));
        }
    }

    在构造函数中,根据模板参数_TCount创建一定数量的线程,将所有线程存在了数组(std::array)中。

    然后你会注意到,每个线程都会运行DoTask方法,注意:DoTask是运行于子线程中的

    template <unsigned _TCount>
    void FixedThreadPool<_TCount>::DoTask() {
        // Run in subthreads.
        // Take the next job in the queue and run it. Notify the main thread that a job has completed.
        while (!m_isDone) {
            this->NextJob()();
            -- m_jobsleft;
            // Notify the main thread that a job has completed.
            m_conditionWait.notify_one();
        }
    }

    不去看那些烦人的标记变量,先从大的方面理解其原理:

    在循环中每次去一个任务(我猜是在队列里取,若队列为空则会block),取到任务后执行任务(即执行lambda表达式),jobsleft减少,然后通知给主线程“我又执行完一个任务”

    这里有两个关注点:NextJob如何取任务?m_conditionWait都有谁在阻塞?
    先看NextJob如何取任务?

    template <unsigned _TCount>
    typename FixedThreadPool<_TCount>::JobHandler FixedThreadPool<_TCount>::NextJob() {
        // Run in subthreads.
        // Get the next job; pop the first item in the queue, otherwise wait for a signal from the main thread.
        JobHandler handler;
        std::unique_lock<std::mutex> qlock(m_mutexQueue);
        
        // Wait for a job if we don't have any.
        m_conditionJob.wait(qlock, [this]()->bool {
            return m_queue.size() || m_isDone;
        });
        
        // Get job from the queue
        if (!m_isDone) {
            handler = m_queue.front();
            m_queue.pop_front();
        }
        else { // If we're bailing out, 'inject' a job into the queue to keep jobsleft accurate.
            handler = []{};
            ++m_jobsleft;
        }
        return handler;
    }

    注意:这个函数也是运行在子线程中

    希望你已经学会使用std::condition_variable了,简单来说m_conditionJob.wait就是在判断是否队列为空(先不要关心烦人的m_isDone)。

    如果队列为空则会阻塞,然后就会一直等待,等待到啥时候呢?(我猜测当有新任务时一定会有通知notify_one()),通知来了检测满足条件就继续向下执行。

    会看到从队列中取出一个任务,然后返回。

    这里有个关注点:啥时候会有m_conditionJob的notify_xxx()?

    在这里:

    template <unsigned _TCount>
    void FixedThreadPool<_TCount>::AddJob(JobHandler job) {
        // Add a new job to the pool. If there are no jobs in the queue, a thread is woken up to take the job. If all threads are busy, the job is added to the end of the queue.
        std::lock_guard<std::mutex> guard(m_mutexQueue);
        m_queue.emplace_back(job);
        ++ m_jobsleft;
        m_conditionJob.notify_one();
    }

    注意:这是主线程中由用户调用的方法

    当然还有一处在JoinAll中,不过这对理解线程池运行流程关系不大。下面讨论另一个问题时在看。

    现在你脑子中是否有线程池的运行流程了。

    主线程:【创建子线程】->【AddJob】

    子线程:【DoTask】->【NextJob】->【NextJob】...->【NextJob】

    描述:子线程在DoTask中循环通过【NextJob】取任务,当没有任务时,会block在NextJob中,一直等待到主线程的【AddJob】调用后,会wakeup一个(只会唤醒一个线程)已经阻塞的NextJob,然后NextJob返回队列中的一个任务,交给DoTask执行,DoTask执行完成后通知又执行完一个任务(可用于判断所有任务是否都执行完成)。

    到这里还比较简单一些,下面考虑退出的问题:

    退出的问题在于让所有可能被阻塞住的子线程全部唤醒,然后顺利的走向销毁。

    先看析构函数:

    template <unsigned _TCount>
    FixedThreadPool<_TCount>::~FixedThreadPool() {
        this->JoinAll();
    }

    JoinAll,听着就像thread的join嘛,看看:

    template <unsigned _TCount>
    void FixedThreadPool<_TCount>::JoinAll(bool wait) {
        if (m_isFinished) {
            return;
        }
        
        if (wait) {
            this->WaitAll();
        }
        
        // note that we're done, and wake up any thread that's
        // waiting for a new job
        m_isDone = true;
        m_conditionJob.notify_all();
        
        for(auto &x : m_threads) {
            if(x.joinable()) {
                x.join();
            }
        }
        m_isFinished = true;
    }

    注意:JoinAll会在主线程执行

    奥,m_isFinished用来保证JoinAll只执行一次的。

    wait嘛,WaitAll看名字就像等待所有任务执行完毕嘛,而且必须要阻塞住调用WaitAll的线程,否则怎么能叫Wait呢!

    下面看看m_isDone=true,然后通知所有(notify_all())的m_conditionJob.wait,那就是通知所有线程中的m_conditionJob.wait呀,先不管继续往下看。

    下面就是遍历所有的子线程,然后全部join掉,这可是会阻塞主线程的!主线程会等待所有join的子线程执行完才能回到主线程,不过若所有任务执行完了,join之后子线程不就over了嘛

        // Wait for a job if we don't have any.
        m_conditionJob.wait(qlock, [this]()->bool {
            return m_queue.size() || m_isDone;
        });

    还记得这里吧,NextJob方法,运行于子线程中。

    当JoinAll中notify_all时,这里就会被唤醒,由于m_isDone为true,不管你队列是否为空都会继续执行下去。子线程要退出,那么就不能被阻塞住,所以这里就是用来唤醒子线程,让子线程顺利退出的。

        // Get job from the queue
        if (!m_isDone) {
            handler = m_queue.front();
            m_queue.pop_front();
        }
        else { // If we're bailing out, 'inject' a job into the queue to keep jobsleft accurate.
            handler = []{};
            ++m_jobsleft;
        }

    所以就到了下面这个语句块,返回一个空的handler。 都要退出了,为了处理一致,返回空的也无可厚非。

    下面再看看WaitAll是什么鬼:

    template <unsigned _TCount>
    void FixedThreadPool<_TCount>::WaitAll() {
        // Waits until all jobs have finshed executing.
        if (m_jobsleft > 0) {
            std::unique_lock<std::mutex> lock(m_mutexWait);
            m_conditionWait.wait(lock, [this]()->bool {
                return this->m_jobsleft == 0;
            });
            lock.unlock();
        }
    }

    奥,原来如此,WaitAll果然就是阻塞住你,然后等待剩余的任务数为0时,才会被唤醒(结合DoTask中的notify_one)。

    这么看来,在JoinAll中:

    如果wait=true,那么就会等待所有任务自然的执行完成后join所有线程退出。

    如果wait=false,那么就会让所有阻塞在等待任务上的线程直接执行一个空任务,然后退出。或者让正在执行任务的线程执行完任务后退出。

     

  • 相关阅读:
    Vue-基础(四)
    Vue-基础(三)
    Vue-基础(一)
    Vue-基础(二)
    CSS-初始化模板2(common.css)
    CSS-初始化模板1(normalize.css)
    CSS预处理器-Less
    MySQL视窗函数row_number(), rank(), denser_rank()
    LeetCode第4题:寻找两个有序数组的中位数
    无重复字符的最长子串
  • 原文地址:https://www.cnblogs.com/luweimy/p/5172946.html
Copyright © 2011-2022 走看看