zoukankan      html  css  js  c++  java
  • (十五)线程池的实现

    1. 轮询方式

    #include <iostream>
    #include <string>
    
    #include <memory>
    #include <vector>
    #include <mutex>
    #include <queue>
    #include <thread>
    #include <atomic>
    
    // interface
    class Runnable{
    public:
        virtual void Run() = 0;
    };
    
    // thread pool
    class FixedThreadPool{
    public:
        FixedThreadPool(int size) :threadPoolSize(size){}
    
        ~FixedThreadPool(){
            if (running){
                Stop();
            }
        }
    
        void Start(){
            running = true;
            for (int i = 0; i < threadPoolSize; ++i){
                std::shared_ptr<std::thread> t = std::make_shared<std::thread>(&FixedThreadPool::Run,this);
                std::lock_guard<std::mutex> lk(txThread);
                threads.push_back(t);
            }
        }
    
        void Stop(){
            bool flag = true;
            while (flag){
                std::this_thread::sleep_for(std::chrono::seconds(1));
                std::lock_guard<std::mutex> lk(txTasks);
                flag = !tasks.empty();
            }
    
            running = false;
            for (auto thread : threads){
                thread->join();
            }
        }
    
        void Submit(std::shared_ptr<Runnable> r){
            std::lock_guard<std::mutex> lk(txTasks);
            tasks.push(r);
        }
    
    private:
        void Run(){
            while (running){
                std::shared_ptr<Runnable> task;
                {
                    std::lock_guard<std::mutex> lk(txTasks);
                    if (tasks.empty()){
                        continue;
                    }
                    task = tasks.front();
                    tasks.pop();
                }
                task->Run();
            }
        }
    
    private:
        int                                        threadPoolSize;
        std::vector<std::shared_ptr<std::thread>>  threads;
        std::mutex                                 txThread;
        std::queue<std::shared_ptr<Runnable>>      tasks;
        std::mutex                                 txTasks;
        std::atomic_bool                           running;
    };
    
    
    // concrete runnable
    class Analyzer:public Runnable{
    public:
        Analyzer() = default;
        void Run(){
            std::cout << "analyze is running, cnt is " << cnt++ << std::endl;
        }
    private:
        static std::atomic_int cnt;
    };
    std::atomic_int Analyzer::cnt;
    
    // thread pool test
    void ThreadPoolTest(){
        std::shared_ptr<FixedThreadPool> pool = std::make_shared<FixedThreadPool>(5);
        pool->Start();
    
        for (int i = 0; i < 100; ++i){
            std::shared_ptr<Runnable> r = std::make_shared<Analyzer>();
            pool->Submit(r);
        }
    
        // pool->Stop();
    }
    
    // main
    int main(){
        ThreadPoolTest();
        return 0;
    }
    

    2. 条件变量方式

    #include <iostream>
    #include <string>
    
    #include <memory>
    #include <vector>
    #include <mutex>
    #include <condition_variable>
    #include <queue>
    #include <thread>
    #include <atomic>
    
    // interface
    class Runnable{
    public:
        virtual void Run() = 0;
    };
    
    // thread pool
    class FixedThreadPool{
    public:
        FixedThreadPool(int size) :threadPoolSize(size){}
    
        ~FixedThreadPool(){
            if (running){
                Stop();
            }
        }
    
        void Start(){
            running = true;
            for (int i = 0; i < threadPoolSize; ++i){
                std::shared_ptr<std::thread> t = std::make_shared<std::thread>(&FixedThreadPool::Run,this);
                std::lock_guard<std::mutex> lk(txThread);
                threads.push_back(t);
            }
        }
    
        void Stop(){
            bool flag = true;
            while (flag){
                std::this_thread::sleep_for(std::chrono::seconds(1));
                std::lock_guard<std::mutex> lk(txTasks);
                flag = !tasks.empty();
            }
    
            running = false;
            cond.notify_all();
    
            for (auto thread : threads){
                thread->join();
            }
        }
    
        void Submit(std::shared_ptr<Runnable> r){
            std::lock_guard<std::mutex> lk(txTasks);
            tasks.push(r);
            cond.notify_one();
        }
    
    private:
        void Run(){
            while (running){
                std::shared_ptr<Runnable> task;
                /*{
                    std::lock_guard<std::mutex> lk(txTasks);
                    if (tasks.empty()){
                    continue;
                    }
                    task = tasks.front();
                    tasks.pop();
                    }*/
                {
                    std::unique_lock<std::mutex>  lk(txTasks);
                    while (tasks.empty() && running){
                        cond.wait(lk);
                    }
                    if (!running){
                        break;
                    }
                    task = tasks.front();
                    tasks.pop();
                }
    
                task->Run();
            }
        }
    
    private:
        int                                        threadPoolSize;
        std::vector<std::shared_ptr<std::thread>>  threads;
        std::mutex                                 txThread;
        std::queue<std::shared_ptr<Runnable>>      tasks;
        std::mutex                                 txTasks;
        std::condition_variable                    cond;
        std::atomic_bool                           running;
    };
    
    
    // concrete runnable
    class Analyzer:public Runnable{
    public:
        Analyzer() = default;
        void Run(){
            std::cout << "analyze is running, cnt is " << cnt++ << std::endl;
        }
    private:
        static std::atomic_int cnt;
    };
    std::atomic_int Analyzer::cnt;
    
    // thread pool test
    void ThreadPoolTest(){
        std::shared_ptr<FixedThreadPool> pool = std::make_shared<FixedThreadPool>(10);
        pool->Start();
    
        for (int i = 0; i < 100; ++i){
            std::shared_ptr<Runnable> r = std::make_shared<Analyzer>();
            pool->Submit(r);
        }
    
        // pool->Stop();
    }
    
    // main
    int main(){
        ThreadPoolTest();
        return 0;
    }
    
  • 相关阅读:
    go反射实现实体映射
    golang的time包:秒、毫秒、纳秒时间戳输出
    在 Gin 框架中使用 JWT 认证
    docker安装redis
    docker安装mysql5.7
    Python Web实战:Python+Django+MySQL实现基于Web版的增删改查
    apache2.4配置weblogic12c集群(linux环境)
    小BUG大原理:重写WebMvcConfigurationSupport后SpringBoot自动配置失效
    Spring源码解析02:Spring IOC容器之XmlBeanFactory启动流程分析和源码解析
    Git进阶:常用命令和问题案例整理
  • 原文地址:https://www.cnblogs.com/walkinginthesun/p/10081802.html
Copyright © 2011-2022 走看看