zoukankan      html  css  js  c++  java
  • 基于C++11的100行实现简单线程池

    基于C++11的100行实现简单线程池

     

    1 线程池原理

    线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。

    线程池组成部分:

    线程池管理器:用于创建并管理线程池

    工作线程: 线程池中实际执行的线程

    任务接口: 尽管线程池大多数情况下是用来支持网络服务器,但是我们将线程执行的任务抽象出来,形成任务接口,从而是的线程池与具体的任务无关。

    任务队列:线程池的概念具体到实现则可能是队列,链表之类的数据结构,其中保存执行线程。

    2 并发下线程池的最佳数量

    如果是CPU密集型应用,则线程池大小设置为N+1;(对于计算密集型的任务,在拥有N个处理器的系统上,当线程池的大小为N+1时,通常能实现最优的效率。(即使当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时,这个额外的线程也能确保CPU的时钟周期不会被浪费。摘自《Java Concurrency In Practise》)。因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,只能增加上下文切换的次数,因此会带来额外的开销。I
    如果是IO密集型应用,则线程池大小设置为2N+1;IO密集型任务可以使用稍大的线程池,一般为2*CPU核心数。IO密集型任务CPU使用率并不高,因此可以让CPU在等待IO的时候去处理别的任务,充分利用CPU时间。

    2.1 线程池是否一定比单线程高效?

    答案是否定的,比如Redis就是单线程的,但它却非常高效,基本操作都能达到十万量级/s。从线程这个角度来看,部分原因在于:多线程带来线程上下文切换开销,单线程就没有这种开销 锁

    当然“Redis很快”更本质的原因在于: 
    Redis基本都是内存操作,这种情况下单线程可以很高效地利用CPU。而多线程适用场景一般是:存在相当比例的IO和网络操作。

    3 线程池设计的C++11的特性

    • std::vector
    • std::thread
    • std::mutex
    • std::future
    • std::condition_variable

    C++代码:

    threadpool.h

    #pragma once
    #include <iostream>
    #include <vector>
    #include <queue>
    #include <mutex>
    #include <condition_variable>
    #include <stdexcept>
    
    const int MAX_THREADS = 100; // 线程池阈值
    
    template <typename T>
    class threadpool
    {
    public:
        threadpool(int number = 1); // 线程池构造函数,传入线程数,默认为1
        ~threadpool();                // 线程池析构函数
        bool append(T* request);    // 添加任务到任务队列
    
    private:
        static void* worker(void* arg); // 工作线程数组
        void run();                        // 工作线程的执行函数
    
    private:
        std::vector<std::thread> work_threads;  // 工作线程
        std::queue<T*> task_queues;                // 任务队列
        std::mutex queue_mutex;                    // 队列互斥锁
        std::condition_variable condition;        // 条件变量
        bool stop;                                // 停止标志
    };
    
    template <typename T>
    threadpool<T>::threadpool(int number) :stop(false)
    {
        if (number <= 0 || number > MAX_THREADS) // 如果number小于等于0或者大于线程池阈值,抛出异常
            throw std::exception();
    
        for (int i = 0; i < number; i++)
        {
            std::cout << "创建第" << i << "个线程" << std::endl;
            // 添加worker函数到线程中
            work_threads.emplace_back(worker, this); // Q:this是什么作用,为什么没了this会出错?
        }
    }
    
    template <typename T>
    inline threadpool<T>::~threadpool()
    {
        {
            std::unique_lock<std::mutex> lock(queue_mutex); // 拿锁
            stop = true;                                    // 停止标志置位true
        }
        condition.notify_all();                                // 通知所有工作线程,唤醒后因为stop为true了,所有都会结束
        for (auto& ww : work_threads)
            ww.join();
    }
    
    template <typename T>
    bool threadpool<T>::append(T* request) // 添加一个新的工作任务到任务队列
    {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            task_queues.emplace(request);  // 将任务添加到任务队列
        }
        condition.notify_one();               // 发送通知,唤醒某一个工作线程执行任务
        return true;
    }
    
    template <typename T>
    void* threadpool<T>::worker(void* arg)
    {
        threadpool* pool = (threadpool*)arg;
        pool->run();
        return pool;
    }
    
    template <typename T>
    void threadpool<T>::run()
    {
        while (!stop)
        {
            std::unique_lock<std::mutex> lk(queue_mutex);                            // 拿锁(独占所有权式)
            this->condition.wait(lk, [this] {return !this->task_queues.empty(); }); // 等待条件成立
            /*
                执行条件变量等待的时候,已经拿到了锁(即lock已经拿到锁,没有阻塞)
                这里将会unlock释放锁,其他线程可以继续拿锁,但此处仍然阻塞,等待条件成立
                一旦收到其他线程notify_*唤醒,则再次lock,然后进行条件判断
                当{return !this->task_queues.empty(); }的结果为false将阻塞
                条件为true时候解除阻塞。此时lock依然为锁住状态
            */
            if (this->task_queues.empty())            // 如果任务队列为空,继续
                continue;
            else 
            {
                T* request = task_queues.front();   // 取得任务队列首任务
                task_queues.pop();                    // 从队列中移除
                if (request)
                    request->process();                // 执行任务
            }
        }
    }

     

    main.cpp

    #include <iostream>
    #include "threadpool.h"
    
    class Task
    {
    public:
        void process()
        {
            static int a = 0;
            ++a;
            std::cout << "run......" << a << std::endl;
        }
    };
    
    int main()
    {
        threadpool<Task> tp(2);
    
        while (1)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
    
            Task* t = new Task();
            tp.append(t);
            delete t;
        }
    }

     

    参考:

    https://blog.csdn.net/gcola007/article/details/78750220

    https://blog.csdn.net/qq_34417408/article/details/78895573

     

  • 相关阅读:
    迅为-IMX6Q开发板Android应用测试-AndroidStudio-录音机测试
    迅为3399开发板Linux固件编译
    迅为-i.MX6ULL开发板-Busybox移植DHCP(三)
    迅为-i.MX6ULL开发板-Busybox移植DHCP(二)
    itop4412开发板Qt串口编程-实现串口功能
    迅为iMX6D/Q/PLUS设备树 Android7.1.2 系统编译
    迅为3399开发板使用ADB命令传递文件到android设备
    Jlink如何校验Hex
    编码器芯片MLX90363的使用
    编程小技巧
  • 原文地址:https://www.cnblogs.com/zkfopen/p/11236537.html
Copyright © 2011-2022 走看看