zoukankan      html  css  js  c++  java
  • Windows下一个并发阻塞队列(BlockingQueue)

    Windows下一个带有大小限制的并发阻塞队列,实现的比较简单。

    #ifndef BLOCKINGQUEUE_H_
    #define BLOCKINGQUEUE_H_
    
    #include <queue>
    #include <windows.h>
    using namespace std;
    
    template <typename T>
    class BoundedBlockingQueue 
    { 
    public: 
        BoundedBlockingQueue(int size) : maxSize(size) 
        {
            _lock = CreateMutex(NULL,false,NULL);
            _rsem = CreateSemaphore(NULL,0,size,NULL);
            _wsem = CreateSemaphore(NULL,size,size,NULL);
        } 
        ~BoundedBlockingQueue() 
        { 
            CloseHandle(_lock);
            CloseHandle(_rsem);
            CloseHandle(_wsem);
        } 
        void push(const T& data);
        T pop();
        bool empty()
        {
            WaitForSingleObject(_lock,INFINITE);
            bool is_empty = _array.empty();
            ReleaseMutex(_lock);
            return is_empty;
        }
    private: 
        deque<T> _array;
        int maxSize;
        HANDLE _lock;
        HANDLE _rsem, _wsem;
    };
    
    template <typename T>
    void BoundedBlockingQueue <T>::push(const T& value ) 
    { 
        WaitForSingleObject(_wsem,INFINITE);
        WaitForSingleObject(_lock,INFINITE);
        _array.push_back(value);
        ReleaseMutex(_lock);
        ReleaseSemaphore(_rsem,1,NULL);
    }
    
    template <typename T>
    T BoundedBlockingQueue<T>::pop() 
    { 
        WaitForSingleObject(_rsem,INFINITE);
        WaitForSingleObject(_lock,INFINITE);
        T _temp = _array.front();
        _array.pop_front();
        ReleaseMutex(_lock);
        ReleaseSemaphore(_wsem,1,NULL);
        return _temp;
    }
    
    #endif

    主函数调用测试:一个生产者、两个消费者使用这个队列进行测试。

    #include "BlockingQueue.h"
    #include <windows.h>
    #include <iostream>
    using namespace std;
    
    bool is_over = false;
    
    DWORD WINAPI produce(LPVOID lppara)
    {
        BoundedBlockingQueue<int> *queue = (BoundedBlockingQueue<int> *)lppara;
    
        while(1)
        {
            for(int i=1; i<=50; ++i)
            {
                queue->push(i);
                cout<<GetCurrentThreadId()<<" put a data: "<<i<<endl;
                Sleep(10); //producer is fast
            }
            is_over = true;
            break;
        }
        return NULL;
    }
    
    DWORD WINAPI consume(LPVOID lppara)
    {
        BoundedBlockingQueue<int> *queue = (BoundedBlockingQueue<int> *)lppara;
    
        while(1)
        {
            int d = queue->pop();
            cout<<GetCurrentThreadId()<<" get data: "<<d<<endl;
            //double check
            if(is_over && queue->empty())
            {
                cout<<"OVER!"<<endl;
                break;
            }
            Sleep(10); //consumer is slow
        }
        return NULL;
    }
    
    int main()
    {
        DWORD write_data;
        DWORD read_data;
        DWORD read_data1;
    
        BoundedBlockingQueue<int> queue(20);
    
        //一个生产者、两个消费者
        if(CreateThread(NULL,0,produce,&queue,0,&write_data)==NULL)
            return -1;
        if(CreateThread(NULL,0,consume,&queue,0,&read_data)==NULL)
            return -1;
        if(CreateThread(NULL,0,consume,&queue,0,&read_data1)==NULL)
            return -1;
    
        char ch;
        while(1)
        {
            ch = getchar(); //press "e" to exit
            if(ch == 'e') break;
        }
    
        printf("Program ends successfully\n");
    
        return 0;
    }
  • 相关阅读:
    [ffmpeg 扩展第三方库编译系列] 关于 mingw32 下编译libcaca
    新版本的tlplayer for android ,TigerLeapMC for windows发布了
    tlplayer 所有平台版本支持水印叠加
    更新Windows ActiveX,Ios
    Linux批量“解压”JAR文件
    Linux进阶路线
    30个实用的Linux find命令示例
    Redhat 无线(Wifi)上网命令行配置
    PLSQL developer常用技巧
    Hibernate4.3.5搭建Log4j日志环境
  • 原文地址:https://www.cnblogs.com/luxiaoxun/p/2719699.html
Copyright © 2011-2022 走看看