zoukankan      html  css  js  c++  java
  • ACE_Message_Queue和spawn实现(生产者/消费者)(V2.00)

    这里用到了线程管理,参考:http://blog.csdn.net/calmreason/article/details/36399697

    下面的两个线程共享一个消息队列:一个线程把整数放入队列,另一个线程从队列里取出消息。

    此程序在控制台不停地输出递增数字,重点在于内存不会泄漏。

    用到了多线程、ACE_Message_Queue、ACE_Message_Block、ACE_Thread_Manager::instance()->spawn等

    #include <iostream>
    using namespace std;
    #include "boost/lexical_cast.hpp"
    using namespace boost;
    #include "ace/Thread_Manager.h" 
    #include "ace/Message_Queue.h"
    
    // Producer thread entry point (defined below): receives the shared
    // ACE_Message_Queue<ACE_MT_SYNCH>* as an untyped pointer and enqueues
    // message blocks holding the decimal text of an increasing counter.
    void* create_vairous_record(void* ace_message_queue);
    
    // Consumer thread entry point (defined below): receives the same queue
    // pointer, dequeues each message block, prints it and releases it.
    void* get_vairous_record(void* ace_message_queue);
    
    int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) 
    {
    
    	ACE_Message_Queue<ACE_MT_SYNCH>* various_record_queue = new ACE_Message_Queue<ACE_MT_SYNCH>;
    
    	ACE_Thread_Manager::instance()->spawn(
    		ACE_THR_FUNC(create_vairous_record), 
    		various_record_queue, 
    		THR_NEW_LWP | THR_DETACHED);
    
    	ACE_Thread_Manager::instance()->spawn(
    		ACE_THR_FUNC(get_vairous_record), 
    		various_record_queue, 
    		THR_NEW_LWP | THR_DETACHED);
    
    	ACE_Thread_Manager::instance()->wait();
    
    	return 0;
    }
    
    void* create_vairous_record(void* ace_message_queue)
    {
    
    	ACE_Message_Queue<ACE_MT_SYNCH>* p_queue = (ACE_Message_Queue<ACE_MT_SYNCH>*)ace_message_queue;
    	int i=0;
    	while (i<10000000)
    	{
    		ACE_Message_Block* mbl = new ACE_Message_Block(10);//在这里创建消息
    		string temp = lexical_cast<string>(++i);
    		mbl->copy(temp.c_str());
    		p_queue->enqueue_tail(mbl);//消息被放到队列中(用指针引用消息实体)
    	}
    	return nullptr;
    }
    
    void* get_vairous_record(void* ace_message_queue)
    {
    
    	ACE_Message_Queue<ACE_MT_SYNCH>* p_queue = (ACE_Message_Queue<ACE_MT_SYNCH>*)ace_message_queue;
    	while (true)
    	{
    		ACE_Message_Block* mbl =nullptr;
    		p_queue->dequeue_head(mbl);//消息出队,出队的消息应该在用完之后被释放
    		if (mbl)
    		{
    			cout<<mbl->rd_ptr()<<endl;
    			mbl->release();//消息已经用完。释放消息
    		}
    	}
    	return nullptr;
    
    }

    以下的程序实现:多个线程将连续整数分批放到ACE_Message_Queue中,一个消费者线程负责从中取出,并验证数据是否完整无误

    #include <iostream>
    #include <bitset>
    #include <vector>
    #include <memory>
    using namespace std;
    
    #include "ace/Thread_Manager.h" 
    #include "ace/Message_Queue.h"
    #include "ace/Message_Block.h"
    #include "ace/Task.h"
    #include "ace/OS.h"
    
    // Shared settings for the multi-producer demo.
    namespace global
    {
        // Integer type carried as the payload of each message.
        using number_type = int;

        // The producers together emit the integers 0 .. total_number-1.
        const int total_number = 1000000;

        // Number of producer tasks; also the stride each producer steps by.
        int task_number = 2;
    }
    
    /**
     * Producer task. Each instance is given a shared message queue and a
     * starting offset; its service thread enqueues the integers
     * offset, offset + task_number, ... below total_number.
     */
    class Generator_Number : public ACE_Task<ACE_MT_SYNCH>
    {
    public:
        /// @param msgq queue the task enqueues into
        /// @param i    starting offset of this producer
        Generator_Number(ACE_Message_Queue<ACE_MT_SYNCH>* msgq, const int i);
        ~Generator_Number();

        /// Starts the service thread; @p args is not used.
        virtual int open(void* args = 0);

    protected:
        // Copy operations are declared here but no definition appears in this
        // file, so instances are not meant to be copied.
        Generator_Number(const Generator_Number&);
        Generator_Number& operator=(const Generator_Number&);

    private:
        /// Thread body executed after open().
        int svc();

        /// Starting offset of this producer.
        int mod_i_;
    };
    
    // Stores the starting offset, installs the shared queue as this task's
    // message queue and logs the construction.
    Generator_Number::Generator_Number(ACE_Message_Queue<ACE_MT_SYNCH>* msgq, const int i)
        : mod_i_(i)
    {
        this->msg_queue(msgq);
        std::cout << "Generator_Number(const int " << mod_i_ << ")" << std::endl;
    }
    
    // Activates the task with the THR_NEW_LWP | THR_DETACHED flags and returns
    // whatever activate() reports. The args parameter is ignored.
    int Generator_Number::open(void *args )
    {
        const long thread_flags = THR_NEW_LWP | THR_DETACHED;
        return this->activate(thread_flags);
    }
    
    int Generator_Number::svc(void)
    {
        std::cout<<"Generator_Number("<<this->mod_i_<<")::svc()"<<std::endl;
        for (size_t i = this->mod_i_ ; i<global::total_number;i+=global::task_number)
        {
            ACE_Message_Block * blk = new ACE_Message_Block(20);
            blk->copy(reinterpret_cast<const char*>(&i),sizeof(global::number_type));
            this->msg_queue()->enqueue_tail(blk);
        }
        return 0;
    }
    
    // Logs the destruction together with this producer's starting offset.
    Generator_Number::~Generator_Number()
    {
        std::cout << "~Generator_Number(" << mod_i_ << ")" << std::endl;
    }
    
    void* out_put_queue(void* all_numbers_queue1)
    {
        ACE_Message_Queue<ACE_MT_SYNCH>* all_numbers_queue = (ACE_Message_Queue<ACE_MT_SYNCH>*)all_numbers_queue1;
        bitset<global::total_number> all_number_bitset;
        size_t count_got_message=0;
        while(true)
        {
            if(!all_numbers_queue->is_empty())
            {
                ACE_Message_Block* blk = 0;
                all_numbers_queue->dequeue_head(blk);
                all_number_bitset.set(*reinterpret_cast<global::number_type*>(blk->rd_ptr()));
                blk->release();
                if(++count_got_message == global::total_number)
                {
                    break;
                }
            }
            else
            {
                std::cout<<"now sleep 1"<<std::endl;
                ACE_Time_Value t(0,3000);
                ACE_OS::sleep(t);
            }
        }
        global::number_type check =0;
        bool wright_flag = true;
        for (size_t j=0; j!= global::total_number;++j)
        {
            if (0 == all_number_bitset[j])
            {
                wright_flag = false;
                break;
            }
        }
        std::cout<<std::endl;
        std::cout<<"check result:"<<wright_flag<<std::endl;
        return 0;
    }
    #include "boost/timer.hpp"
    using namespace boost;
    
    int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) 
    {
        cout<<"total_number:"<<global::total_number<<endl;
        timer t;
        ACE_Message_Queue<ACE_MT_SYNCH>* all_numbers_queue = new ACE_Message_Queue<ACE_MT_SYNCH>;
    
        vector<shared_ptr<Generator_Number>> gener_array;
    
        for (int i=0;i<global::task_number;++i)
        {
            gener_array.push_back(shared_ptr<Generator_Number>(new Generator_Number(all_numbers_queue,i)));
        }
        for (vector<shared_ptr<Generator_Number>>::const_iterator citer = gener_array.cbegin();
            citer!=gener_array.cend();
            ++citer)
        {
            (*citer)->open();
        }
    
        ACE_Thread_Manager::instance()->spawn(
            ACE_THR_FUNC(out_put_queue),   
            all_numbers_queue,   
            THR_NEW_LWP | THR_DETACHED);
    
        ACE_Thread_Manager::instance()->wait();
        cout<<t.elapsed()<<"s"<<endl;
        return 0;
    }
    
    
    
    输出如下:

    total_number:1000000
    Generator_Number(const int 0)
    Generator_Number(const int 1)
    Generator_Number(0)::svc()
    Generator_Number(1now sleep 1
    )::svc()
    now sleep 1
    now sleep 1
    now sleep 1
    now sleep 1
    now sleep 1
    now sleep 1
    now sleep 1
    now sleep 1
    now sleep 1


    check result:1
    0.944s
    ~Generator_Number(0)
    ~Generator_Number(1)
    请按任意键继续. . .

    ACE_Message_Queue

    高水位低水位

    http://blog.163.com/ecy_fu/blog/static/4445126200964115620862/

    注意事项

    http://blog.chinaunix.net/uid-20453737-id-37118.html



  • 相关阅读:
    Apply Custom Filter on Lookup Field in PowerApps using Script
    Lookup and Search Views in Model-Driven Apps
    Add an embedded canvas app on a model-driven form
    Set up powerapps to use SharePoint Online
    SharePoint as document management storage for Dynamics CRM
    Lookup Field in collect
    Filter Search Lookup in PowerApps
    产品面试-谈谈你最喜欢的APP--知乎
    什么是需求,怎么做需求分析?怎么管理需求?产品经理必知必会
    ukey登录方案
  • 原文地址:https://www.cnblogs.com/yxwkf/p/5418932.html
Copyright © 2011-2022 走看看