zoukankan      html  css  js  c++  java
  • Object Pool 对象池的C++11使用(转)

    很多系统对资源的访问快捷性及可预测性有严格要求,列入包括网络连接、对象实例、线程和内存。而且还要求解决方案可扩展,能应付存在大量资源的情形。

    object pool针对特定类型的对象循环利用,这些对象要么创建开销巨大,要么可创建的数量有限。而且在pool中的对象需要做到无状态。

    然后转了这位博主的代码,还在研究中

    const int MaxObjectNum = 10;
    template <typename T>
    class ObjectPool
    {
        template <typename... Args>
        using Constructor = std::function<std::shared_ptr<T>(Args...)>;
    
    public:
        ObjectPool(void)
            : m_bNeedClear(false)
        {
        }
    
        virtual ~ObjectPool(void)
        {
            m_bNeedClear = true;
        }
    
        template <typename... Args>
        void Init(size_t num, Args &&... args)
        {
            if (num <= 0 || num > MaxObjectNum)
            {
                throw std::logic_error("object num out of range.");
            }
    
            auto constructName = typeid(Constructor<Args...>).name();
    
            for (size_t i = 0; i < num; i++)
            {
                m_object_map.emplace(constructName,
                                     std::shared_ptr<T>(new T(std::forward<Args>(args)...), [constructName, this](T *t) {
                                         if (m_bNeedClear)
                                         {
                                             delete t;
                                         }
                                         else
                                         {
                                             m_object_map.emplace(constructName, std::shared_ptr<T>(t));
                                         }
                                     }));
            }
        }
    
        template <typename... Args>
        std::shared_ptr<T> Get()
        {
            string constructName = typeid(Constructor<Args...>).name();
    
            auto range = m_object_map.equal_range(constructName);
    
            for (auto it = range.first; it != range.second; ++it)
            {
                auto ptr = it->second;
                m_object_map.erase(it);
                return ptr;
            }
    
            return nullptr;
        }
    
    private:
        std::multimap<std::string, std::shared_ptr<T>> m_object_map;
        bool m_bNeedClear;
    };
    ObjectPool.cpp
    class BigObject
    {
    public:
        BigObject() {}
    
        BigObject(int a) {}
    
        BigObject(const int &a, const int &b)
        {
        }
    
        void Print(const string &str)
        {
            cout << str << endl;
        }
    };
    
    void Print(shared_ptr<BigObject> p, const string &str)
    {
        if (p != nullptr)
        {
            p->Print(str);
        }
    }
    
    int main()
    {
        ObjectPool<BigObject> pool;
        pool.Init(2);
        {
            auto p = pool.Get();
            Print(p, "p");
    
            auto p2 = pool.Get();
            Print(p2, "p2");
        }
    
        auto p = pool.Get();
        Print(p, "p");
    
        auto p2 = pool.Get();
        Print(p2, "p2");
    
        auto p3 = pool.Get();
        Print(p3, "p3");
    
        pool.Init(2, 1);
    
        auto p4 = pool.Get<int>();
    
        Print(p4, "p4");
        getchar();
        return 0;
    }
    test.cpp

     还有个半同步半异步的线程池,这个我看懂的多一点,就是用队列和信号量去实现多线程并发

    #include <list>
    #include <mutex>
    #include <thread>
    #include <condition_variable>
    #include <iostream>
    #include <functional>
    #include <memory>
    #include <atomic>
    using namespace std;
    
    namespace itstation
    {
    template <typename T>
    class SynaQueue
    {
    public:
        SynaQueue(int maxSize)
            : m_maxSize(maxSize), m_needStop(false)
        {
        }
    
        void Put(const T &x)
        {
            Add(x);
        }
        void Put(T &&x)
        {
            Add(forward<T>(x)); //完美转发,不改变参数的类型
        }
        void Take(list<T> &list)
        {
            std::unique_lock<mutex> locker(m_mutex);
            // 判断式, 当都不满足条件时,条件变量会释放mutex, 并将线程置于waiting状态, 等待其他线程调用notify_one/all 将其唤醒。
            // 当满足其中一个条件时继续执行, 将队列中的任务取出,唤醒等待添加任务的线程
            // 当处于waiting状态的线程被唤醒时,先获取mutex,检查条件是否满足,满足-继续执行,否则释放mutex继续等待
            m_notEmpty.wait(locker, [this] { return m_needStop || NotEmpty(); });
            if (m_needStop)
                return;
            list = move(m_queue);
            m_notFull.notify_one();
        }
        void Take(T &t)
        {
            unique_lock<mutex> locker(m_mutex); //
            m_notEmpty.wait(locker, [this] { return m_needStop || NotEmpty(); });
            if (m_needStop)
                return;
            t = m_queue.front();
            m_queue.pop_front();
            m_notFull.notify_one();
        }
        void Stop()
        {
            {
                lock_guard<mutex> locker(m_mutex);
                m_needStop = true;
            }
            m_notFull.notify_all(); // 将所有等待的线程全部唤醒,被唤醒的进程检查m_needStop,为真,所有的线程退出执行
            m_notEmpty.notify_all();
        }
    
    private:
        bool NotFull() const
        {
            bool full = m_queue.size() >= m_maxSize;
            if (full)
                cout << "缓冲区满了,需要等待。。。。" << endl;
            return !full;
        }
        bool NotEmpty() const
        {
            bool empty = m_queue.empty();
            if (empty)
                cout << "缓冲区空了,需要等待,。。。异步层线程: " << this_thread::get_id() << endl;
            return !empty;
        }
    
        template <typename F>
        void Add(F &&x)
        {
            unique_lock<mutex> locker(m_mutex);                                 // 通过m_mutex获得写锁
            m_notFull.wait(locker, [this] { return m_needStop || NotFull(); }); // 没有停止且满了,就释放m_mutex并waiting;有一个为真就继续执行
            if (m_needStop)
                return;
            m_queue.push_back(forward<F>(x));
            m_notEmpty.notify_one();
        }
    
    private:
        list<T> m_queue;               //缓冲区
        mutex m_mutex;                 // 互斥量
        condition_variable m_notEmpty; // 条件变量
        condition_variable m_notFull;
        int m_maxSize;   //同步队列最大的size
        bool m_needStop; // 停止标识
    };
    
    const int MaxTaskCount = 100;
    class ThreadPool
    {
    public:
        using Task = function<void()>;
        ThreadPool(int numThread = thread::hardware_concurrency())
            : m_queue(MaxTaskCount)
        {
            Start(numThread);
        }
    
        virtual ~ThreadPool()
        {
            Stop();
        }
    
        void Stop()
        {
            call_once(m_flag, [this] { StopThreadGroup(); });
        }
    
        void AddTask(Task &&task)
        {
            m_queue.Put(forward<Task>(task));
        }
    
        void AddTask(const Task &task)
        {
            m_queue.Put(task);
        }
    
    private:
        void Start(int numThreads)
        {
            m_running = true;
            //创建线程组
            for (int i = 0; i < numThreads; i++)
            {
                m_threadgroup.emplace_back(make_shared<thread>(&ThreadPool::RunInThread, this));
            }
        }
    
        // 每个线程都执行这个函数
        void RunInThread()
        {
            while (m_running)
            {
                //取任务分别执行
                list<Task> list;
                m_queue.Take(list);
                for (auto &task : list)
                {
                    if (!m_running)
                        return;
    
                    task();
                }
            }
        }
        void StopThreadGroup()
        {
            m_queue.Stop();    // 同步队列中的线程停止
            m_running = false; // 让内部线程跳出循环并推出
            for (auto thread : m_threadgroup)
            {
                if (thread)
                    thread->join();
            }
            m_threadgroup.clear();
        }
    
    private:
        list<shared_ptr<thread>> m_threadgroup; // 处理任务的线程组, 链表中存储着指向线程的共享指针
        SynaQueue<Task> m_queue;                //同步队列
        atomic_bool m_running;                  // 是否停止的标识
        once_flag m_flag;
    };
    } // namespace itstation
    ObjectPool.h
    #include <stdio.h>
    #include <iostream>
    #include "ObjectPool.h"
    #include <list>
    using namespace std;
    using namespace itstation;
    
    void TestThreadPool()
    {
        ThreadPool pool(2);
        thread thd1([&pool] {
            for (int i = 0; i < 10; i++)
            {
                auto thrID = this_thread::get_id();
                pool.AddTask([thrID, i] {cout << "同步层线程1的线程ID:" << thrID << "  这是任务 " << i << endl; this_thread::sleep_for(chrono::seconds(2)); });
            }
        });
    
        thread thd2([&pool] {
            for (int i = 0; i < 10; i++)
            {
                auto thrID = this_thread::get_id();
                pool.AddTask([thrID, i] {cout << "同步层线程2的线程ID:" << thrID << "  这是任务 " << i << endl; this_thread::sleep_for(chrono::seconds(2)); });
            }
        });
    
        this_thread::sleep_for(chrono::seconds(45));
        pool.Stop();
        thd1.join();
        thd2.join();
    }
    int main()
    {
        TestThreadPool();
    
        getchar();
        return 0;
    }
    test.cpp
  • 相关阅读:
    async简单使用
    node调用phantomjs-node爬取复杂页面
    mongodb3 ubuntu离线安装(非apt-get)及用户管理
    2040-亲和数(java)
    JavaScript闭包简单理解
    nodejs构建多房间简易聊天室
    linux下安装nodejs及npm
    EventBus轻松使用
    mysql用户创建及授权
    python中json的基本使用
  • 原文地址:https://www.cnblogs.com/BobHuang/p/11259471.html
Copyright © 2011-2022 走看看