zoukankan      html  css  js  c++  java
  • 半同步半异步线程池框架代码实现

    SyncTaskQueue.h
    #pragma once
    #include <list>
    #include <mutex>
    #include <condition_variable>
    #include <iostream>
    
    template <typename TASK>
    class SyncTaskQueue//队列内部实现加锁,保证操作同步
    { //这个队列是被线程池使用,因此具体实例在线程池中定义
    public:
        SyncTaskQueue(int max_size) :max_size_(max_size) {
    
    
        }
        ~SyncTaskQueue() {//何时析构   如果有个操作在一个线程中阻塞, 对象无法析构
            Stop();
            std::cout << "SyncTaskQueue destruction" << std::endl;
        }
        void  Stop() {//退出循环
            stop_ = true;
            not_empty_cond_.notify_one();
            not_full_cond_.notify_one();
            std::cout << "SyncTaskQueue Stop" << std::endl;
        }
    
        bool IsFull() {
            std::lock_guard<std::mutex>  locker;
            return list_task_.size() == max_size_;
    
        }
    
        bool IsEmpty() {
            std::lock_guard<std::mutex> locker;
            return list_task_.size() == 0;
        }
    
        void Push(TASK &&data) {
            std::unique_lock<std::mutex> locker(mutex_);
            while (Full() && !stop_) {//避免多次获取互斥锁
                std::cout << "task queue is full, wait" << std::endl;//阻塞
                //满的时候等待, 阻塞等待消费
                not_full_cond_.wait_for(locker, std::chrono::milliseconds(500));
            }
            if (stop_) {
                return;
            }
            if (!Full()) {
                list_task_.push_back(std::forward<TASK>(data));//为什么需要std::forward,保证右值,移动拷贝?
                not_empty_cond_.notify_one();//not empty cond signal   
            }
    
        }
    
        void Pop(TASK& data){//没有用返回值的形式
            std::unique_lock<std::mutex> locker(mutex_);
            while (Empty() && !stop_) {
                std::cout << "task queue is empty, wait" << std::endl;//阻塞
                not_empty_cond_.wait_for(locker, std::chrono::milliseconds(500));
            }
            if (!Empty()) {
                data = list_task_.front();
                list_task_.pop_front();//list  pop操作分为2步
                not_full_cond_.notify_one();//not full cond signal        
            }
        }
    
    private:
        bool Full() {
            return list_task_.size() == max_size_;
        }
        bool Empty() {
            return list_task_.size() == 0;
        }
        std::mutex   mutex_;
        int max_size_;
        std::atomic<bool>   stop_ = false;
        std::condition_variable   not_full_cond_;//没有满的时候激发
        std::condition_variable   not_empty_cond_;
        std::list<TASK>  list_task_;
    };
    
    ThreadPool.h
    #pragma once
    #include <list>
    #include <thread>
    #include <functional>
    #include <memory>
    #include <atomic>
    #include "SyncTaskQueue.h"
    #include <mutex>
    #include <condition_variable>
    #include <iostream>
    
    class ThreadPool {
    public:
        using Task = std::function<void()>;//使用别名
        ThreadPool();
        ~ThreadPool();
    
        void Stop();
        void AddTask(Task &&task);
    private:
        void Start(int num_thread);
        void RunThread();
        void StopThread();
    
    private:
        //多个线程对象容器,方便管理
        std::list<std::shared_ptr<std::thread>> thread_group_;
        int thread_num_;//线程数
        SyncTaskQueue<Task>  queue_;//任务队列
        std::atomic<bool> stop_ = false;//需要包含头文件atomic
        std::once_flag  flag_;
    };
    
    ThreadPool.cpp
    #include "ThreadPool.h"
    
    
    ThreadPool::ThreadPool():queue_(10)
    {//构造函数  Start私有化  保证也只能执行一次
        thread_num_ = std::thread::hardware_concurrency();
        Start(thread_num_);
    }
    ThreadPool::~ThreadPool() 
    {
       Stop();
    }
    
    void ThreadPool::Stop() {//保证stop 只有一次
        std::call_once(flag_, [this] {StopThread(); });
    }
    
    void ThreadPool::AddTask(Task &&task) {
        queue_.Push(std::forward<Task>(task));
    }
    
    void ThreadPool::Start(int num_thread) {
        thread_num_ = num_thread;
        std::cout << "thread pool start" << std::endl;
        for (int i = 0; i < thread_num_; i++) {
            thread_group_.push_back(std::make_shared<std::thread>(&ThreadPool::RunThread, this));//创建线程的过程中将线程函数传进去        
            std::cout << "thread " << thread_group_.back()->get_id() << " create " << std::endl;
        }
    #if 0
        for (auto thread : thread_group_) {
            thread->get_id();
        }
    #endif
        
    }
    
    //所有的子线程都会从任务队列里面去取任务执行
    void ThreadPool::RunThread() {//线程从任务队列中取任务 
        //多个线程里面只有队列任务共享的,stop_数据是共享的
        while (!stop_) {//如果没有停止,则一直在while循环
            Task  task_object;
            queue_.Pop(task_object);//如果没有数据,会自动阻塞
            if (stop_) {//如果时停止,则直接return
                return;
            }
            task_object();//取出任务执行,本线程不结束,继续从队列里面取任务执行
            std::cout << "thread id " << std::this_thread::get_id() << " exec one task" << std::endl;
    
        }
    }
    
    void ThreadPool::StopThread() {
        stop_ = true;
        queue_.Stop();//任务队列可能阻塞,需要先停止
        //等待线程池中的所有线程执行结束
        for (auto it = thread_group_.begin(); it != thread_group_.end(); it++) {
            (*it)->join();
        }
        thread_group_.clear();//线程对象列表清除
        std::cout << "thread pool stop" << std::endl;
    }
    
    void test_ThreadPool()
    {
    
        ThreadPool  thread_pool;
    
        std::thread  thread1([&thread_pool] {
            auto id = std::this_thread::get_id();
            std::cout << "thread id " << id << " add task " << std::endl;
            thread_pool.AddTask([id]() {        
                std::cout << "thread id " << id << " exec task " <<std::endl;
            });
        });
           
        std::thread  thread2([&thread_pool] {
            auto id = std::this_thread::get_id();
            std::cout << "thread id " << id << " add task " << std::endl;
            thread_pool.AddTask([id]() {
                std::cout << "thread id " << id << " exec task " << std::endl;
            });
        });
    
        std::thread  thread3([&thread_pool] {
            auto id = std::this_thread::get_id();
            std::cout << "thread id " << id << "add task" << std::endl;
            thread_pool.AddTask([id]() {
                std::cout << "thread id " << id << "exec task" << std::endl;
            });
        });
    
        //过完2s再结束
        std::this_thread::sleep_for(std::chrono::seconds(2));
        thread_pool.Stop();//线程池结束
        thread1.join();
        thread2.join();
        thread3.join();//同步线程函数执行完
    }
    

      

  • 相关阅读:
    linux运维、架构之路-K8s中部署Jenkins集群高可用
    linux运维、架构之路-K8s数据管理
    linux运维、架构之路-K8s通过Service访问Pod
    linux运维、架构之路-K8s应用
    linux运维、架构之路-K8s健康检查Health Check
    linux运维、架构之路-K8s滚动更新及回滚
    linux运维、架构之路-Kubernetes基础(一)
    Python前端HTML
    linux运维、架构之路-Kubernetes离线集群部署-无坑
    linux运维、架构之路-MongoDB单机部署
  • 原文地址:https://www.cnblogs.com/welen/p/14551276.html
Copyright © 2011-2022 走看看