zoukankan      html  css  js  c++  java
  • C++11 半同步半异步线程池的实现

    #include <list>
    #include <mutex>
    #include <thread>
    #include <condition_variable>
    #include <iostream>
    #include <functional>
    #include <memory>
    #include <atomic>
    using namespace std;
    
    namespace itstation 
    {
        template<typename T>
        class SynaQueue
        {
        public:
            SynaQueue(int maxSize)
                :m_maxSize(maxSize), m_needStop(false)
            {
            }
    
            void Put(const T& x)
            {
                Add(x);
            }
            void Put(T&& x)
            {
                Add(forward<T>(x)); //完美转发,不改变参数的类型
            }
            void Take(list<T>& list)
            {
                std::unique_lock<mutex> locker(m_mutex);
                // 判断式, 当都不满足条件时,条件变量会释放mutex, 并将线程置于waiting状态, 等待其他线程调用notify_one/all 将其唤醒。
                // 当满足其中一个条件时继续执行, 将队列中的任务取出,唤醒等待添加任务的线程
                // 当处于waiting状态的线程被唤醒时,先获取mutex,检查条件是否满足,满足-继续执行,否则释放mutex继续等待
                m_notEmpty.wait(locker, [this]{return m_needStop || NotEmpty(); });
                if (m_needStop)
                    return;
                list = move(m_queue);
                m_notFull.notify_one();
            }
            void Take(T& t)
            {
                unique_lock<mutex> locker(m_mutex); //
                m_notEmpty.wait(locker, [this]{return m_needStop || NotEmpty(); });
                if (m_needStop)
                    return;
                t = m_queue.front();
                m_queue.pop_front();
                m_notFull.notify_one();
            }
            void Stop()
            {
                {
                    lock_guard<mutex> locker(m_mutex);
                    m_needStop = true;
                }
                m_notFull.notify_all(); // 将所有等待的线程全部唤醒,被唤醒的进程检查m_needStop,为真,所有的线程退出执行
                m_notEmpty.notify_all();
            }
    
        private:
            bool NotFull() const
            {
                bool full = m_queue.size() >= m_maxSize;
                if (full)
                    cout << "缓冲区满了,需要等待。。。。" << endl;
                return !full;
            }
            bool NotEmpty() const
            {
                bool empty = m_queue.empty();
                if (empty)
                    cout << "缓冲区空了,需要等待,。。。异步层线程: " << this_thread::get_id() << endl;
                return !empty;
            }
    
    
            template<typename F>
            void Add(F&& x)
            {
                unique_lock<mutex> locker(m_mutex); // 通过m_mutex获得写锁
                m_notFull.wait(locker, [this]{return m_needStop || NotFull(); }); // 没有停止且满了,就释放m_mutex并waiting;有一个为真就继续执行
                if (m_needStop)
                    return;
                m_queue.push_back(forward<F>(x));
                m_notEmpty.notify_one();
            }
    
    
        private:
            list<T> m_queue; //缓冲区
            mutex m_mutex;  // 互斥量
            condition_variable m_notEmpty; // 条件变量
            condition_variable m_notFull;
            int m_maxSize; //同步队列最大的size
            bool m_needStop; // 停止标识
        };
    
        const int MaxTaskCount = 100;
        class ThreadPool
        {
        public:
            using Task = function < void() >;
            ThreadPool(int numThread = thread::hardware_concurrency())
                :m_queue(MaxTaskCount)
            {
                Start(numThread);
            }
    
            virtual ~ThreadPool()
            {
                Stop();
            }
    
            void Stop()
            {
                call_once(m_flag, [this]{StopThreadGroup(); });
            }
    
            void AddTask(Task&& task)
            {
                m_queue.Put(forward<Task>(task));
            }
    
            void AddTask(const Task& task)
            {
                m_queue.Put(task);
            }
    
        private:
            void Start(int numThreads)
            {
                m_running = true;
                //创建线程组
                for (int i = 0; i < numThreads; i++)
                {
                    m_threadgroup.emplace_back(make_shared<thread>(&ThreadPool::RunInThread, this));
                }
            }
    
            // 每个线程都执行这个函数
            void RunInThread()
            {
                while (m_running)
                {
                    //取任务分别执行
                    list<Task> list;
                    m_queue.Take(list);
                    for (auto& task : list)
                    {
                        if (!m_running)
                            return;
    
                        task();
                    }
                }
            }
            void StopThreadGroup()
            {
                m_queue.Stop(); // 同步队列中的线程停止
                m_running = false; // 让内部线程跳出循环并推出
                for (auto thread : m_threadgroup)
                {
                    if (thread)
                        thread->join();
                }
                m_threadgroup.clear();
            }
        private:
            list<shared_ptr<thread>> m_threadgroup; // 处理任务的线程组, 链表中存储着指向线程的共享指针
            SynaQueue<Task> m_queue; //同步队列
            atomic_bool m_running; // 是否停止的标识
            once_flag m_flag;
        };
    } // namespace itstation 
    
    #include <stdio.h>
    #include <iostream>
    #include "ObjectPool.h"
    #include <list>
    using namespace std;
    using namespace itstation;
    
    
    void TestThreadPool()
    {
        ThreadPool pool(2);
        thread thd1([&pool]{
            for (int i = 0; i < 10; i++)
            {
                auto thrID = this_thread::get_id();
                pool.AddTask([thrID, i]{cout << "同步层线程1的线程ID:" << thrID << "  这是任务 " << i << endl; this_thread::sleep_for(chrono::seconds(2)); });
            }
        });
    
        thread thd2([&pool]{
            for (int i = 0; i < 10; i++)
            {
                auto thrID = this_thread::get_id();
                pool.AddTask([thrID, i]{cout << "同步层线程2的线程ID:" << thrID << "  这是任务 " << i << endl; this_thread::sleep_for(chrono::seconds(2)); });
            }
        });
    
        this_thread::sleep_for(chrono::seconds(45));
        pool.Stop();
        thd1.join();
        thd2.join();
    }
    int main()
    {
        TestThreadPool();
    
    
    
        getchar();
        return 0;
    }
  • 相关阅读:
    git和github入门指南(3.1)
    git和github入门指南(2.2)
    git和github入门指南(2.1)
    git和github入门指南(1)
    webpack入门进阶(3)
    webpack入门进阶(2)
    webpack入门进阶(1)
    vue全家桶(4.3)
    vue全家桶(4.2)
    vue全家桶(4.1)
  • 原文地址:https://www.cnblogs.com/kaishan1990/p/5273237.html
Copyright © 2011-2022 走看看