zoukankan      html  css  js  c++  java
  • Boost lockfree deque 生产者与消费者多对多线程应用

      boost库中有一个boost::lockfree::queue类型的 队列,对于一般的需要队列的程序,其效率都算不错的了,下面使用一个用例来说明。

      程序是一个典型的生产者与消费者的关系,都可以使用多线程,其效率要比使用上层的互斥锁要快很多,因为它直接使用底层的原子操作来进行同步数据的。

      freedeque.h

      1 #pragma once#ifndef INCLUDED_UTILS_LFRINGQUEUE  
      2 #define INCLUDED_UTILS_LFRINGQUEUE  
      3 
      4 #define _ENABLE_ATOMIC_ALIGNMENT_FIX  
      5 #define ATOMIC_FLAG_INIT 0  
      6 
      7 
      8 #pragma once  
      9 
     10 
     11 #include <vector>  
     12 #include <mutex>  
     13 #include <thread>  
     14 #include <atomic>  
     15 #include <chrono>  
     16 #include <cstring>  
     17 #include <iostream>  
     18 
     19 // Lock free ring queue   
     20 
     21 template < typename _TyData, long _uiCount = 100000 >
     22 class lfringqueue
     23 {
     24 public:
     25     lfringqueue(long uiCount = _uiCount) : m_lTailIterator(0), m_lHeadIterator(0), m_uiCount(uiCount)
     26     {
     27         m_queue = new _TyData*[m_uiCount];
     28         memset(m_queue, 0, sizeof(_TyData*) * m_uiCount);
     29     }
     30 
     31     ~lfringqueue()
     32     {
     33         if (m_queue)
     34             delete[] m_queue;
     35     }
     36 
     37     bool enqueue(_TyData *pdata, unsigned int uiRetries = 1000)
     38     {
     39         if (NULL == pdata)
     40         {
     41             // Null enqueues are not allowed  
     42             return false;
     43         }
     44 
     45         unsigned int uiCurrRetries = 0;
     46         while (uiCurrRetries < uiRetries)
     47         {
     48             // Release fence in order to prevent memory reordering   
     49             // of any read or write with following write  
     50             std::atomic_thread_fence(std::memory_order_release);
     51 
     52             long lHeadIterator = m_lHeadIterator;
     53 
     54             if (NULL == m_queue[lHeadIterator])
     55             {
     56                 long lHeadIteratorOrig = lHeadIterator;
     57 
     58                 ++lHeadIterator;
     59                 if (lHeadIterator >= m_uiCount)
     60                     lHeadIterator = 0;
     61 
     62                 // Don't worry if this CAS fails.  It only means some thread else has  
     63                 // already inserted an item and set it.  
     64                 if (std::atomic_compare_exchange_strong(&m_lHeadIterator, &lHeadIteratorOrig, lHeadIterator))
     65                 {
     66                     // void* are always atomic (you wont set a partial pointer).  
     67                     m_queue[lHeadIteratorOrig] = pdata;
     68 
     69                     if (m_lEventSet.test_and_set())
     70                     {
     71                         m_bHasItem.test_and_set();
     72                     }
     73                     return true;
     74                 }
     75             }
     76             else
     77             {
     78                 // The queue is full.  Spin a few times to check to see if an item is popped off.  
     79                 ++uiCurrRetries;
     80             }
     81         }
     82         return false;
     83     }
     84 
     85     bool dequeue(_TyData **ppdata)
     86     {
     87         if (!ppdata)
     88         {
     89             // Null dequeues are not allowed!  
     90             return false;
     91         }
     92 
     93         bool bDone = false;
     94         bool bCheckQueue = true;
     95 
     96         while (!bDone)
     97         {
     98             // Acquire fence in order to prevent memory reordering   
     99             // of any read or write with following read  
    100             std::atomic_thread_fence(std::memory_order_acquire);
    101             //MemoryBarrier();  
    102             long lTailIterator = m_lTailIterator;
    103             _TyData *pdata = m_queue[lTailIterator];
    104             //volatile _TyData *pdata = m_queue[lTailIterator];              
    105             if (NULL != pdata)
    106             {
    107                 bCheckQueue = true;
    108                 long lTailIteratorOrig = lTailIterator;
    109 
    110                 ++lTailIterator;
    111                 if (lTailIterator >= m_uiCount)
    112                     lTailIterator = 0;
    113 
    114                 //if ( lTailIteratorOrig == atomic_cas( (volatile long*)&m_lTailIterator, lTailIterator, lTailIteratorOrig ))  
    115                 if (std::atomic_compare_exchange_strong(&m_lTailIterator, &lTailIteratorOrig, lTailIterator))
    116                 {
    117                     // Sets of sizeof(void*) are always atomic (you wont set a partial pointer).  
    118                     m_queue[lTailIteratorOrig] = NULL;
    119 
    120                     // Gets of sizeof(void*) are always atomic (you wont get a partial pointer).  
    121                     *ppdata = (_TyData*)pdata;
    122 
    123                     return true;
    124                 }
    125             }
    126             else
    127             {
    128                 bDone = true;
    129                 m_lEventSet.clear();
    130             }
    131         }
    132         *ppdata = NULL;
    133         return false;
    134     }
    135 
    136 
    137     long countguess() const
    138     {
    139         long lCount = trycount();
    140 
    141         if (0 != lCount)
    142             return lCount;
    143 
    144         // If the queue is full then the item right before the tail item will be valid.  If it  
    145         // is empty then the item should be set to NULL.  
    146         long lLastInsert = m_lTailIterator - 1;
    147         if (lLastInsert < 0)
    148             lLastInsert = m_uiCount - 1;
    149 
    150         _TyData *pdata = m_queue[lLastInsert];
    151         if (pdata != NULL)
    152             return m_uiCount;
    153 
    154         return 0;
    155     }
    156 
    157     long getmaxsize() const
    158     {
    159         return m_uiCount;
    160     }
    161 
    162     bool HasItem()
    163     {
    164         return m_bHasItem.test_and_set();
    165     }
    166 
    167     void SetItemFlagBack()
    168     {
    169         m_bHasItem.clear();
    170     }
    171 
    172 private:
    173     long trycount() const
    174     {
    175         long lHeadIterator = m_lHeadIterator;
    176         long lTailIterator = m_lTailIterator;
    177 
    178         if (lTailIterator > lHeadIterator)
    179             return m_uiCount - lTailIterator + lHeadIterator;
    180 
    181         // This has a bug where it returns 0 if the queue is full.  
    182         return lHeadIterator - lTailIterator;
    183     }
    184 
    185 private:
    186     std::atomic<long> m_lHeadIterator;  // enqueue index  
    187     std::atomic<long> m_lTailIterator;  // dequeue index  
    188     _TyData **m_queue;                  // array of pointers to the data  
    189     long m_uiCount;                     // size of the array  
    190     std::atomic_flag m_lEventSet = ATOMIC_FLAG_INIT;       // a flag to use whether we should change the item flag  
    191     std::atomic_flag m_bHasItem = ATOMIC_FLAG_INIT;        // a flag to indicate whether there is an item enqueued  
    192 };
    193 
    194 #endif //INCLUDED_UTILS_LFRINGQUEUE  

      

    /*
    * File:   main.cpp
    * Author: Peng
    *
    * Created on February 22, 2014, 9:55 PM
    */
    #include <iostream> 
    #include <string>  
    #include "freedeque.h" 
    #include <sstream>  
    #include <boost/thread/thread.hpp>  
    #include <boost/lockfree/queue.hpp>    
    #include <boost/atomic.hpp>  
    #include<boost/thread/lock_guard.hpp>
    #include<boost/thread/mutex.hpp>
    #include<boost/date_time/posix_time/posix_time.hpp>
    
    const int NUM_ENQUEUE_THREAD = 5;
    const int NUM_DEQUEUE_THREAD = 10;
    const long NUM_ITEM = 50000;
    const long NUM_DATA = NUM_ENQUEUE_THREAD * NUM_ITEM;
    
    class Data {
    public:
    	Data(int i = 0) : m_iData(i)
    	{
    		std::stringstream ss;
    		ss << i;
    		m_szDataString = ss.str();    
    	}
    
    	bool operator< (const Data & aData) const
    	{
    		if (m_iData < aData.m_iData)
    			return true;
    		else
    			return false;
    	}
    
    	int& GetData()
    	{
    		return m_iData;
    	}
    private:
    	int m_iData;
    	std::string m_szDataString;
    };
    
    Data* g_arrData = new Data[NUM_DATA];
    boost::mutex mtx;
    
    constexpr long size = 0.5 * NUM_DATA;
    lfringqueue < Data, 10000> LockFreeQueue;
    boost::lockfree::queue<Data*> BoostQueue(10000);
    
    bool GenerateRandomNumber_FindPointerToTheNumber_EnQueue(int n)
    {
    	for (long i = 0; i < NUM_ITEM; i++)
    	{
    		int x = i + NUM_ITEM * n;
    		Data* pData = g_arrData + x;
    		LockFreeQueue.enqueue(pData);	
    	}
    	return true;
    }
    
    
    
    void print(Data* pData) {
    	if (!pData)
    		return;
    
    	boost::lock_guard<boost::mutex> lock(mtx);
    
    	std::cout << pData->GetData() << std::endl;
    	
    }
    
    bool Dequeue()
    {
    	Data *pData = NULL;
    	
    	while (true)
    	{
    		if (LockFreeQueue.dequeue(&pData) && pData)
    		{
    			print(pData);
    		}
    		else {
    			boost::thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(5));
    		}
    	}
    
    	return true;
    }
    
    int main(int argc, char** argv)
    {
    	for (int i = 0; i < NUM_DATA; ++i)
    	{
    		Data data(i);
    		//DataArray[i] = data;
    		*(g_arrData + i) = data;
    	}
    
    	std::thread PublishThread[NUM_ENQUEUE_THREAD];
    	std::thread ConsumerThread[NUM_DEQUEUE_THREAD];
    	std::chrono::duration<double> elapsed_seconds;
    
    	for (int i = 0; i < NUM_ENQUEUE_THREAD; i++)
    	{
    		PublishThread[i] = std::thread(GenerateRandomNumber_FindPointerToTheNumber_EnQueue, i);
    	}
    
    	for (int i = 0; i < NUM_DEQUEUE_THREAD; i++)
    	{
    		ConsumerThread[i] = std::thread{ Dequeue };
    	}
    
    	for (int i = 0; i < NUM_DEQUEUE_THREAD; i++)
    	{
    		ConsumerThread[i].join();
    	}
    
    	for (int i = 0; i < NUM_ENQUEUE_THREAD; i++)
    	{
    		PublishThread[i].join();
    	}
    
    	delete[] g_arrData;
    	return 0;
    }
    

      说明:模板文件是原作者写的,为了验证其正确性,后面的测试程序我改写了一下,最后测试程序是无法退出来的,这里只是测试,没有进一步完善了。

      在测试中发现deque应该是大小限制的,再增大data的数据程序会阻塞在某个地方没有进一步再查找原因了,以后有时候再做修改,对于一般的工程都够用了。

  • 相关阅读:
    链家网各城市二手房价格
    mitmproxy 配置
    Python操作APP -- Appium-Python-Client
    Appium连接模拟器
    adb server version (xx) doesn't match this client (xx); killing...
    Appnium 环境搭建
    KeyError: 'xxx does not support field: _id'
    Python执行JS -- PyExecJS库
    Python -- CSV文件读写
    Git的基本使用 -- 分支管理
  • 原文地址:https://www.cnblogs.com/hbright/p/9508032.html
Copyright © 2011-2022 走看看