zoukankan      html  css  js  c++  java
  • 使用c++11写个最简跨平台线程池(转载)

    为什么需要多线程?

    最简单的多线程长啥样?

    为什么需要线程池,有什么问题?

    实现的主要原理是什么?

    带着这几个问题,我们依次展开。

    1.为什么需要多线程?

        大部分程序毕竟都不是计算密集型的,简单的说,正常情况下,以单线程的模式来写对程序员而言是最舒心的。因为所有的代码都是顺序执行,非常容易理解!函数一级一级往下调用,代码一行一行执行。但是,代码的世界里,虽然cpu还好,但是却经常需要用到io资源,或者是其他服务器的网络资源,比如像数据库,如果这个时候因此把进程卡住,不管是客户端还是客户端都对用户体验相当糟糕。当然了,计算密集型的运算就更需要多线程,防止主线程被卡住。

    2.最简单的多线程长啥样?

        举个最简单的例子,服务器采用阻塞式socket,有一个网络线程负责收发包(IO),然后有一个逻辑主线程负责相应的业务操作,主线程和网络线程之间通过最简单的消息队列进行交换,而这个消息队例明显是两个线程都要访问(轮询消息队列是否为空)到的,所以,我们需要给这个消息队列上锁(std::mutex),即可以解决问题。由于比较简单我们就不需要看这个怎么码了。这种模式虽然简单,但是在合适的岗位上,也是极好的!

    3.那为什么需要线程池呢,有什么问题?

       还以刚才的服务器举例,如果业务线程逻辑比较复杂,又或者他需要访问数据库或者是其他服务器的资源,读取文件等等呢?当然他可以采用异步的数据库接口,但是采用异步意味着业务代码被碎片化。异步是典型的讨厌他,但是又干不掉他的样子。离题了。回归。这个时候我们需要多个业务线程处理了。多个线程就意味着多一份处理能力!回到上个问题,我们的多线程采用轮询消息队列的方式来交换信息,那么这么多个线程,不断的上锁解锁,光这个成本就够了。这个时候,条件变量就上线了(std::condition_variable)就登场了

    4.实现的主要原理是什么?

        业务线程不要轮询消息队列了,而所有的业务线程处于等待状态,当有消息再来的时候,再由产生消息的人,在我们示例场景就是网络线程了,随便唤醒一个工人线程即可。看看最关键的代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
      //消费者
    void consumer()
    {
        //第一次上锁
        std::unique_lock < std::mutex > lck(mutex_);
        while (active_)
        {
            //如果是活动的,并且任务为空则一直等待
            while (active_ && task_.empty())
                cv_.wait(lck);
     
            //如果已经停止则退出
            if(!active_)
                break;
     
            T *quest = task_.front();
            task_.pop();
     
            //从任务队列取出后该解锁(任务队列锁)了
            lck.unlock();
     
            //执行任务后释放
            proc_(quest);
     
            //delete quest;   //在proc_已经释放该指针了
     
            //重新上锁
            lck.lock();
        }
    }

      

    算了,还是直接贴完整代码,看注释吧

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    #ifndef _WORKER_POOL_H_
    #define _WORKER_POOL_H_
     
    //file: worker_pool.h
     
    //#define  _CRT_SECURE_NO_WARNINGS
    // g++ -g -std=c++11 1.cc -D_GLIBCXX_USE_NANOSLEEP -lpthread */
     
    #include <vector>
    #include <queue>
    #include <thread>
    #include <mutex>
    #include <condition_variable>
    //#include <chrono>
     
    template<typename T>
    class WorkerPool
    {
    public:
        typedef WorkerPool<T> THIS_TYPE;
        typedef std::function<void(T*)> WorkerProc;
        typedef std::vector< std::thread* > ThreadVec;
     
        WorkerPool()
        {      
            active_ = false;
        }
        virtual ~WorkerPool()
        {
            for(ThreadVec::iterator it = all_thread_.begin();it != all_thread_.end();++it)
                delete *it;
            all_thread_.clear();
        }
        void Start(WorkerProc f,int worker_num=1)
        {
            active_ = true;    
            all_thread_.resize(worker_num);
            for (int i = 0; i < worker_num;i++ )
            {
                all_thread_[i] = new std::thread(std::bind(&THIS_TYPE::consumer,this));
            }
            proc_ = f;
        }
        //生产者
        void Push(T *t)
        {
            std::unique_lock < std::mutex > lck(mutex_);
            task_.push(t);
            cv_.notify_one();
        }
     
        void Stop()
        {
            //等待所有的任务执行完毕
            mutex_.lock();
            while (!task_.empty())
            {  
                mutex_.unlock();
                std::this_thread::sleep_for(std::chrono::milliseconds(1000));
                cv_.notify_one();
                mutex_.lock();
            }
            mutex_.unlock();
     
            //关闭连接后,等待线程自动退出
            active_ = false;
            cv_.notify_all();
            for(ThreadVec::iterator it = all_thread_.begin();
                it != all_thread_.end();++it)
                (*it)->join();
        }
    private:
        //消费者
        void consumer()
        {
            //第一次上锁
            std::unique_lock < std::mutex > lck(mutex_);
            while (active_)
            {
                //如果是活动的,并且任务为空则一直等待
                while (active_ && task_.empty())
                    cv_.wait(lck);
     
                //如果已经停止则退出
                if(!active_)
                    break;
     
                T *quest = task_.front();
                task_.pop();
     
                //从任务队列取出后该解锁(任务队列锁)了
                lck.unlock();
     
                //执行任务后释放
                proc_(quest);
     
                //delete quest;   //在proc_已经释放该指针了
     
                //重新上锁
                lck.lock();
            }
        }
     
        std::mutex mutex_;
        std::queue<T*> task_;
        std::condition_variable cv_;
        bool active_;
        std::vector< std::thread* > all_thread_;
        WorkerProc proc_;
    };
     
    #endif

      写一个类继承一下,并写一个工作函数和回调函数处理

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    #include "worker_pool.h"
    #include <iostream>
     
    //为了多耗点cpu,计算斐波那契数列吧
    static int fibonacci(int a)
    {
        //ASSERT(a > 0);
        if (a == 1 || a == 2)
            return 1;
        return fibonacci(a-1) + fibonacci(a-2);
    }
     
    //异步计算任务
    struct AsyncCalcQuest
    {
        AsyncCalcQuest():num(0),result(0)
        {}
        //计算需要用到的变量
        int num;
        int result;
    };
     
    //为了测试方便,引入全局变量用于标识线程池已将所有计算完成
    const int TOTAL_COUNT = 1000000;
    int now_count = 0;
     
    //继承一下线程池类,在子类处理计算完成的业务,在我们这里,只是打印一下计算结果
    class CalcWorkerPool:public WorkerPool<AsyncCalcQuest>
    {
    public:
        CalcWorkerPool(){}
     
        virtual ~CalcWorkerPool()
        {
        }
     
        //在工人线程中执行
        void DoWork(AsyncCalcQuest *quest)
        {
            //算了,不算这个了,根本算不出来
            quest->result = fibonacci(quest->num);       
            //quest->result = quest->num*0.618;
     
            //并将已完成任务返回到准备回调的列表
            std::unique_lock<std::mutex > lck(mutex_callbacks_);
            callbacks_.push_back(quest);
        }
     
        //在主线程执行
        void DoCallback()
        {
            //组回调任务上锁
            std::unique_lock<std::mutex > lck(mutex_callbacks_);
            while (!callbacks_.empty())
            {
                auto *quest = callbacks_.back();           
                {//此处为业务代码打印一下吧
                    std::cout << quest->num << " " << quest->result << std::endl;
                    now_count ++;
                }
                delete quest;       //TODO:这里如果采用内存池就更好了
                callbacks_.pop_back();
            }
        }
     
    private:
        //这里是准备给回调的任务列表
        std::vector<AsyncCalcQuest*> callbacks_;
        std::mutex mutex_callbacks_;
    };
     
    int main()
    {
        CalcWorkerPool workers;
     
        //工厂开工了 8个工人喔
        workers.Start(std::bind(&CalcWorkerPool::DoWork,&workers,std::placeholders::_1),8);
         
        //开始产生任务了
        for (int i=0; i<TOTAL_COUNT; i++)
        {
            AsyncCalcQuest *quest = new AsyncCalcQuest;
            quest->num = i%40+1;
            workers.Push(quest);
        }
     
        while (now_count != TOTAL_COUNT)
        {
            workers.DoCallback();
        }
     
        workers.Stop();
     
        return 0;
    }
  • 相关阅读:
    nextSibling VS nextElementSibling
    线程实现连续启动停,并在某一时间段内运行
    线程:安全终止与重启
    监控知识体系
    后台服务变慢解决方案
    Java泛型类型擦除以及类型擦除带来的问题
    常见的 CSRF、XSS、sql注入、DDOS流量攻击
    Spring对象类型——单例和多例
    一次线上OOM过程的排查
    深入浅出理解基于 Kafka 和 ZooKeeper 的分布式消息队列
  • 原文地址:https://www.cnblogs.com/AquaGot/p/5560093.html
Copyright © 2011-2022 走看看