zoukankan      html  css  js  c++  java
  • 通过c++11的condition_variable实现的有最大缓存限制的队列

    之前曾写过一个通过C++11的condition_variable实现的有最大缓存限制的队列,底层使用std::queue来实现,如果想要提升性能的话,可以考虑改用固定的长度环形数组。环形数组实现如下:

    #include <cassert>
    #include <type_traits>
    #include <stdexcept>
    
    /*
     * 文件名: circle_buffer
     * 实现说明:底层使用数组来实现循环buffer
     * (1) 当m_begIdx和m_endIdx相同时,表示数组为空,否则标识数组存在值
     * (2) 通过预先多分配一个节点的方式,来实现存储count个元素的目的
     */
    
    class empty_error : public std::logic_error {
        explicit empty_error(const std::string& what_arg)
            : logic_error(what_arg)
        {}
    
        explicit empty_error(const char* what_arg)
            : logic_error(what_arg)
        {}
    
    };
    
    class full_error : public std::logic_error {
        explicit full_error(const std::string& what_arg)
           : logic_error(what_arg)
        {}
    
        explicit full_error(const char* what_arg)
            : logic_error(what_arg)
        {}
    
    };
    
    template <typename T>
    class circle_buffer {
    public:
        using size_type = size_t;
    public:
        explicit circle_buffer(size_type count)
        : m_bufSize(count+1),
          m_buf(static_cast<T*>(std::malloc(sizeof(T)*m_bufSize))),
          m_begIdx(0),
          m_endIdx(0)
        {
            assert(count >= 1);
            if (m_buf == nullptr) {
                throw std::bad_alloc();
            }
        }
    
        ~circle_buffer() {
            clear(typename std::is_trivially_destructible<T>::type());
        }
    
        size_t size() const noexcept {
            if (m_endIdx < m_begIdx) {
                return m_endIdx + m_bufSize - m_begIdx;
            } 
            return m_endIdx - m_begIdx;
        }
    
        bool empty() const noexcept {
            return m_begIdx == m_endIdx;
        }
    
        bool full() const noexcept {
            return ((m_endIdx+1) == m_begIdx) || 
                (m_begIdx == 0 && m_endIdx == getMaxIdx());
        }
    
        // buffer最后插入一个值,这里会检查是否存在空间,如果不存在,则抛出异常
        template <typename... Args>
        void pushCheck(Args&&... args) {
            if (full()) {
                throw full_error("pushCheck invoked when buffer is full");
            } 
    
            push(std::forward<Args>(args)...);
        }
        
        // buffer最后插入一个值,这里不做检查是否存在空间
        template <typename... Args>
        void push(Args&&... args) {
            new (&m_buf[m_endIdx]) T(std::forward<Args>(args)...);
            advanceIdx(m_endIdx);
        }
    
        // buffer最前面取出一个值,这里会检查是否存在元素可以取出,如果不存在,则抛出异常
        T popCheck() {
            if (empty()) {
                throw empty_error("popCheck invoked when buffer is empty");
            }
    
            return pop();
        }
    
        // buffer最前面取出一个值
        T pop() {
            auto val = std::move(m_buf[m_begIdx]);
            clearOne(typename std::is_trivially_destructible<T>::type());
            advanceIdx(m_begIdx);
            return val;
        }
    
    
    private:
        // 将指示位置的序号前进一格
        void advanceIdx(size_t& idx) noexcept {
            if (idx == getMaxIdx()) {
                idx = 0;
            } else {
                ++idx;
            }
        }
    
        // 非trivially析构函数类型
        void clear(std::false_type) {
            while (m_begIdx != m_endIdx) {
                m_buf[m_begIdx].~T();
                advanceIdx(m_begIdx);
            }
            std::free(m_buf);
        }
    
        // trivially析构函数类型
        void clear(std::true_type) {
            std::free(m_buf);
        }
    
        // 非trivially析构函数类型
        void clearOne(std::false_type) {
            m_buf[m_begIdx].~T();
        }
    
        // trivially 析构函数类型
        void clearOne(std::true_type) {
        }
    
        size_t getMaxIdx() const noexcept {
            return m_bufSize-1;
        }
    
    private:
        size_type m_bufSize;
        T* m_buf;
        size_type m_begIdx;
        size_type m_endIdx;
    };

    关于上面的环形数组,简单的单元测试代码如下,这里使用了catch2,如下代码需要放在.cpp文件中。

    #define CATCH_CONFIG_MAIN       
    // This tells Catch to provide a main() - only do this in one cpp file
    #include "catch.hpp"
    #include "circle_buffer.h"
    
    TEST_CASE("circle buffer manipulation", "[circle_buffer]") {
        circle_buffer<int> cb(2);
        
        REQUIRE( cb.size() == 0 );
        REQUIRE( cb.empty() == true);
        REQUIRE( cb.full() == false);
    
        cb.push(5);
        cb.push(6);
    
        REQUIRE( cb.size() == 2 );
        REQUIRE( cb.empty() == false );
        REQUIRE( cb.full() == true );
    
        auto dropFirst = cb.pop();
        
        REQUIRE( dropFirst == 5 );
        REQUIRE( cb.size() == 1 );
        REQUIRE( cb.empty() == false );
        REQUIRE( cb.full() == false );
    
        cb.push(9);
    
        REQUIRE( cb.size() == 2 );
        REQUIRE( cb.empty() == false );
        REQUIRE( cb.full() == true);
    
        auto dropSecond = cb.pop();
    
        REQUIRE( dropSecond == 6 );
        REQUIRE( cb.size() == 1 );
        REQUIRE( cb.empty() == false );
        REQUIRE( cb.full() == false );
    
        auto dropThird = cb.pop();
    
        REQUIRE( dropThird == 9 );
        REQUIRE( cb.size() == 0 );
        REQUIRE( cb.empty() == true );
        REQUIRE( cb.full() == false );
    }

    下面是基于环形数组实现的有最大长度限制的生产者消费者队列,注意一点,在使用下面队列时,编译选项要加上-std=c++11。

    #include <condition_variable>
    #include <chrono>
    #include "circle_buffer.h"
    
    template <typename T>
    class producer_consumer_queue {
    public:
        producer_consumer_queue(int maxSize): m_buffer(maxSize) { }
    
        // 处理数据线程
        T readQueue() {
            T data;
            // 取出数据,然后处理数据
            {
                std::unique_lock<std::mutex> lock(m_queueMtx);
                m_consumeCv.wait(lock, [this] { return !m_buffer.empty(); });
    
                data = m_buffer.pop();
            }
            m_produceCv.notify_one();
    
            return data;
        }
    
        // 生产数据线程,返回值表示是否生产成功,如果超时就不会生产成功
        template <typename Rep, typename Period, typename ...Args>
        bool writeQueue(const std::chrono::duration<Rep, Period>& wait_time, Args&& ...args) {
            // 预设一个消费者处理这个数据
            {
                std::unique_lock<std::mutex> lock(m_queueMtx);
                auto success = m_produceCv.wait_for(lock, wait_time, [this] { return !m_buffer.full(); });
                if (!success) {
                    return false;
                }
                m_buffer.push(std::forward<Args>(args)...);
            }
            m_consumeCv.notify_one();
            return true;
        }
    
    private:
        // 用来缓存数据
        circle_buffer<T> m_buffer;
        // 用来保护数据
        std::mutex m_queueMtx;
        // 用来提醒当前可以消费
        std::condition_variable m_consumeCv;
        // 用来提醒当前可以生产
        std::condition_variable m_produceCv;
    };

    以上就是这个队列的具体实现。之后,考虑写一些关于中间件的知识,可能会从grpc开始吧。

  • 相关阅读:
    到底什么时候才需要在ObjC的Block中使用weakSelf/strongSelf
    陀螺仪、加速计和磁力计
    UIImage加载图片的方式以及Images.xcassets对于加载方法的影响
    Java-Jdbc
    3.1 基本数据类型
    第三章 数据类型和变量
    2.2.4 给java应用打包
    2.2.3 运行java程序
    2.2.2 编译java源文件
    2.2.1 jdk简介
  • 原文地址:https://www.cnblogs.com/albizzia/p/10777174.html
Copyright © 2011-2022 走看看