zoukankan      html  css  js  c++  java
  • mempool

    我刚才在想两个线程同时访问一个函数有没有问题
    看到这个我就放心了
    其实你可以这样想,函数本身只是代码,代码是只读的,无论多少个线程同时调都无所谓(因为只读嘛)。但是函数里面总要用到数据,
    如果数据属于线程(比如函数参数、局部变量,存在栈上,每个线程都有自己的栈),那么同时调还是没关系,因为用的本线程的数据;但是如果用了一些全局数据,比如全局变量,同时操作一个数据结构(如对一个链表有什么操作),那就不行了,这时候锁就出来了。
    
    ///////////////////////////////////////////////////////////////
    //
    // I_DataLayer.h                   
    // Description:    
    //    
    ///////////////////////////////////////////////////////////////
    #pragma once
    #include <plug/plug.h>               
    //------------------------------------------------------------------------------------------------------------------
    
    //#if !defined(...)
    #ifndef MPMC_MEM_POOL
    #define MPMC_MEM_POOL
    #endif
    
    #ifdef MPMC_MEM_POOL //多生产者多消费者的多线程模式(支持三个或三个以上线程)
    #include <boost/lockfree/queue.hpp>
    #else //单生产者单消费者模式(支持两个线程)
    #include <boost/lockfree/spsc_queue.hpp>
    #endif
    
    #include <boost/thread.hpp>
    #include <boost/atomic.hpp>
    
    struct mempkt
    {
        int size;
        void* data;
    };
    
    class memPool
    {
    public:
        memPool();
        ~memPool();
    
    private:
    
    #ifdef MPMC_MEM_POOL
        typedef boost::lockfree::queue<mempkt, boost::lockfree::capacity<10000> >  memqueue;
    #else
        typedef boost::lockfree::spsc_queue<mempkt, boost::lockfree::capacity<10000> >  memqueue;
    #endif
    
        void* m_mem_buffer100;
        memqueue m_mempkt100;
    
        void* m_mem_buffer1000;
        memqueue m_mempkt1000;
    
        boost::atomic_int m_buffer100_num;
        boost::atomic_int m_buffer1000_num;  
    public:
        inline void* popPkt(int len)
        {
            if(len < 100)
            {
                void* data = popPkt2(m_mempkt100, len);
                --m_buffer100_num;
                return data;
            }
            if(len < 1000)
            {
                void* data = popPkt2(m_mempkt1000, len);
                --m_buffer1000_num;
                return data;
            }
            return new char[len];//不再范围内直接自己分配
        }
        void* popPkt2(memqueue& que, int len)
        {
            while(1)
            {
                if(que.empty())//说明队列中都被取完了,等待别人归还
                {
                    boost::this_thread::interruptible_wait(1);
                    continue;
                }
                mempkt pkt;
                if(!que.pop(pkt))
                    continue;
                return pkt.data;
            }
            return nullptr;
        }
        inline void pushPkt(void* data, int len)
        {
            if(len < 100)
            {
                mempkt pkt = {len, data};
                if(m_mempkt100.push(pkt))
                {
                    ++m_buffer100_num;
                    return;
                }
                if(m_buffer100_num > 9999)//说明队列已经满了,可以等别人来取
                    boost::this_thread::interruptible_wait(1);
            }
            else if(len < 1000)
            {
                mempkt pkt = {len, data};
                if(m_mempkt1000.push(pkt))
                {
                    ++m_buffer1000_num;
                    return;
                }
                if(m_buffer1000_num > 9999)
                    boost::this_thread::interruptible_wait(1);
            }
            delete[] data;
        }
    };
    
    inline memPool::memPool()
    {
        m_buffer100_num = 0;
        m_buffer1000_num = 0;  
    
        m_mem_buffer100 = new char[1024 * 1024 * 2];
    
        for (int i = 0; i < 10000; i++)//将分配的连续内存分成10000份,每份是100字节
        {
            mempkt pkt;
            pkt.data = (char*)m_mem_buffer100 + (i * 100);//根据指针的移动
            pkt.size = 100;
            m_mempkt100.push(pkt);
        }
    
        m_mem_buffer1000 = new char[1024 * 1024 * 15];
    
        for (int i = 0; i < 10000; i++)//将分配的连续内存分成10000份,每份是1000字节
        {
            mempkt pkt;
            pkt.data = (char*)m_mem_buffer1000 + (i * 1000);
            pkt.size = 1000;
            m_mempkt1000.push(pkt);
        }
    }
    
    inline memPool::~memPool()
    {
    }
    
    
    #include "mempool.h"
    #include <boost/lockfree/spsc_queue.hpp>
    #include <thread>
    using namespace std;
    
    /*
    基本思想是分配两块内存,然后将分别将其分割到一个无锁队列中
    然后一个线程从里面pop,一个从里面push
    注意这里面的线程安全,里面用了原子的int,和无锁,不然会造成不安全
    */
    struct pakcet
    {
        void* data;
        int len;
    };
    
    boost::lockfree::spsc_queue<pakcet, boost::lockfree::capacity<22000> > allpakcet;//所有的包存储在allpakcet,单生产者,单消费者模式
    int num = 0;
    memPool *p = new memPool;
    
    void getpakcet()
    {
        while(1)
        {
            if(allpakcet.empty())
            {
                Sleep(1);
                continue;
            }
            pakcet pa;
            allpakcet.pop(pa);
            int num = *(int*)pa.data;
            p->pushPkt(pa.data, pa.len);//同时访问代码段,然后每个线程有调用堆栈,所以多个线程
            std::cout << num << std::endl;
        }
    }
    int main()
    {
        //线程专门负责取数据
        std::thread th(getpakcet);
        //memPool p;
        //主线程
        while(1)
        {
            pakcet pa;
            pa.data = p->popPkt(sizeof(num));
            *(int*)pa.data = num++;
            pa.len = 4;
            while(1)
            {
                if(allpakcet.push(pa))
                    break;
                Sleep(1);
            }
        }
        getchar();
        return 0;
    }
  • 相关阅读:
    英语apyrite红碧玺apyrite单词
    英语SouthRedAgate南红玛瑙
    英语kutnahorite金田黄kutnahorite单词
    英语chalchite蓝绿松石chalchite单词
    单词demantoite翠榴石demantoite英语
    英语fieldyellowstone田黄石fieldyellowstone单词
    Http通讯Util
    redis分布式锁工具类
    永不重复的id生成器
    二维码生成工具类
  • 原文地址:https://www.cnblogs.com/zzyoucan/p/3974810.html
Copyright © 2011-2022 走看看