zoukankan      html  css  js  c++  java
  • (十五)线程池的实现

    1. 轮询方式

    #include <atomic>
    #include <chrono>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <queue>
    #include <string>
    #include <thread>
    #include <vector>
    
    // Task interface: anything that can be executed by a worker thread.
    class Runnable{
    public:
        // Virtual so that destroying a concrete task through a Runnable
        // pointer/reference runs the derived destructor.
        virtual ~Runnable() = default;

        // Performs the task's work; implemented by each concrete task type.
        virtual void Run() = 0;
    };
    
    // Fixed-size thread pool whose idle workers poll the task queue.
    //
    // Typical use: construct, Start(), Submit() tasks, then Stop() (or let the
    // destructor call it). Once workers have been started, tasks that were
    // queued before Stop() was called are executed before Stop() returns.
    class FixedThreadPool{
    public:
        // size: number of worker threads launched by each call to Start().
        FixedThreadPool(int size) :threadPoolSize(size){}

        // Stop() does nothing when no workers are alive, so it is called
        // unconditionally instead of reading a possibly stale flag.
        ~FixedThreadPool(){
            Stop();
        }

        // The pool owns threads and mutexes; copying it makes no sense.
        FixedThreadPool(const FixedThreadPool&) = delete;
        FixedThreadPool& operator=(const FixedThreadPool&) = delete;

        // Launches threadPoolSize worker threads, each executing Run().
        void Start(){
            running = true;
            std::lock_guard<std::mutex> lk(txThread);
            for (int i = 0; i < threadPoolSize; ++i){
                threads.emplace_back(&FixedThreadPool::Run, this);
            }
        }

        // Asks the workers to finish and joins them. Workers keep draining the
        // queue after the flag flips, so Stop() itself does not have to poll.
        // Calling Stop() again, or without a prior Start(), is a no-op.
        void Stop(){
            running = false;

            // Take ownership of the worker handles under the lock, then join
            // outside of it; a repeated call finds an empty vector.
            std::vector<std::thread> finished;
            {
                std::lock_guard<std::mutex> lk(txThread);
                finished.swap(threads);
            }
            for (auto& worker : finished){
                if (worker.joinable()){
                    worker.join();
                }
            }
        }

        // Queues a task for execution by one of the workers.
        // A null pointer is ignored because a worker could not run it.
        void Submit(std::shared_ptr<Runnable> r){
            if (!r){
                return;
            }
            std::lock_guard<std::mutex> lk(txTasks);
            tasks.push(r);
        }

    private:
        // Pause between two polls of an empty queue, so idle workers do not
        // spin on the mutex at full speed.
        static std::chrono::milliseconds PollInterval(){
            return std::chrono::milliseconds(1);
        }

        // Worker loop: run queued tasks; exit once a stop was requested and
        // the queue was found empty afterwards.
        void Run(){
            for (;;){
                // Read the flag BEFORE looking at the queue: a task submitted
                // before Stop() is then guaranteed to be seen by the check below.
                const bool stopRequested = !running;

                std::shared_ptr<Runnable> task;
                {
                    std::lock_guard<std::mutex> lk(txTasks);
                    if (!tasks.empty()){
                        task = tasks.front();
                        tasks.pop();
                    }
                }

                if (task){
                    // Executed outside the lock so other workers can dequeue.
                    task->Run();
                    continue;
                }
                if (stopRequested){
                    return;
                }
                std::this_thread::sleep_for(PollInterval());
            }
        }

    private:
        int                                        threadPoolSize;  // workers per Start()
        std::vector<std::thread>                   threads;         // live worker handles
        std::mutex                                 txThread;        // guards `threads`
        std::queue<std::shared_ptr<Runnable>>      tasks;           // pending tasks (FIFO)
        std::mutex                                 txTasks;         // guards `tasks`
        std::atomic_bool                           running{false};  // false => workers exit when idle
    };
    
    
    // concrete runnable
    class Analyzer:public Runnable{
    public:
        Analyzer() = default;
        void Run(){
            std::cout << "analyze is running, cnt is " << cnt++ << std::endl;
        }
    private:
        static std::atomic_int cnt;
    };
    std::atomic_int Analyzer::cnt;
    
    // thread pool test
    void ThreadPoolTest(){
        std::shared_ptr<FixedThreadPool> pool = std::make_shared<FixedThreadPool>(5);
        pool->Start();
    
        for (int i = 0; i < 100; ++i){
            std::shared_ptr<Runnable> r = std::make_shared<Analyzer>();
            pool->Submit(r);
        }
    
        // pool->Stop();
    }
    
    // Program entry point: runs the polling thread pool demo, then reports
    // success to the caller.
    int main()
    {
        ThreadPoolTest();
        return 0;
    }
    

    2. 条件变量方式

    #include <iostream>
    #include <string>
    
    #include <memory>
    #include <vector>
    #include <mutex>
    #include <condition_variable>
    #include <queue>
    #include <thread>
    #include <atomic>
    
    // Task interface: anything that can be executed by a worker thread.
    class Runnable{
    public:
        // Virtual so that destroying a concrete task through a Runnable
        // pointer/reference runs the derived destructor.
        virtual ~Runnable() = default;

        // Performs the task's work; implemented by each concrete task type.
        virtual void Run() = 0;
    };
    
    // thread pool
    class FixedThreadPool{
    public:
        FixedThreadPool(int size) :threadPoolSize(size){}
    
        ~FixedThreadPool(){
            if (running){
                Stop();
            }
        }
    
        void Start(){
            running = true;
            for (int i = 0; i < threadPoolSize; ++i){
                std::shared_ptr<std::thread> t = std::make_shared<std::thread>(&FixedThreadPool::Run,this);
                std::lock_guard<std::mutex> lk(txThread);
                threads.push_back(t);
            }
        }
    
        void Stop(){
            bool flag = true;
            while (flag){
                std::this_thread::sleep_for(std::chrono::seconds(1));
                std::lock_guard<std::mutex> lk(txTasks);
                flag = !tasks.empty();
            }
    
            running = false;
            cond.notify_all();
    
            for (auto thread : threads){
                thread->join();
            }
        }
    
        void Submit(std::shared_ptr<Runnable> r){
            std::lock_guard<std::mutex> lk(txTasks);
            tasks.push(r);
            cond.notify_one();
        }
    
    private:
        void Run(){
            while (running){
                std::shared_ptr<Runnable> task;
                /*{
                    std::lock_guard<std::mutex> lk(txTasks);
                    if (tasks.empty()){
                    continue;
                    }
                    task = tasks.front();
                    tasks.pop();
                    }*/
                {
                    std::unique_lock<std::mutex>  lk(txTasks);
                    while (tasks.empty() && running){
                        cond.wait(lk);
                    }
                    if (!running){
                        break;
                    }
                    task = tasks.front();
                    tasks.pop();
                }
    
                task->Run();
            }
        }
    
    private:
        int                                        threadPoolSize;
        std::vector<std::shared_ptr<std::thread>>  threads;
        std::mutex                                 txThread;
        std::queue<std::shared_ptr<Runnable>>      tasks;
        std::mutex                                 txTasks;
        std::condition_variable                    cond;
        std::atomic_bool                           running;
    };
    
    
    // concrete runnable
    class Analyzer:public Runnable{
    public:
        Analyzer() = default;
        void Run(){
            std::cout << "analyze is running, cnt is " << cnt++ << std::endl;
        }
    private:
        static std::atomic_int cnt;
    };
    std::atomic_int Analyzer::cnt;
    
    // thread pool test
    void ThreadPoolTest(){
        std::shared_ptr<FixedThreadPool> pool = std::make_shared<FixedThreadPool>(10);
        pool->Start();
    
        for (int i = 0; i < 100; ++i){
            std::shared_ptr<Runnable> r = std::make_shared<Analyzer>();
            pool->Submit(r);
        }
    
        // pool->Stop();
    }
    
    // Program entry point: runs the condition-variable thread pool demo, then
    // reports success to the caller.
    int main()
    {
        ThreadPoolTest();
        return 0;
    }
    
  • 相关阅读:
    C#将List<T>转化为DataTable
    SqlServer常用内置函数
    C#索引器
    验证Textbox的字符长度
    WM消息对应的Message消息中的Lparam和WParam
    对窗体操作的WM消息
    DllImport使用
    C#获取当前路径的七种方法
    注册ActiveX控件
    [转]VS2010中水晶报表安装应用及实例
  • 原文地址:https://www.cnblogs.com/walkinginthesun/p/10081802.html
Copyright © 2011-2022 走看看