zoukankan      html  css  js  c++  java
  • 基于boost 线程并行技术实现消息队列方式[记录]

    #include <queue>
    #include <boost/asio.hpp>
    #include <boost/bind.hpp>
    #include <boost/thread.hpp>
    #include <boost/thread/tss.hpp>
    #include <boost/thread/mutex.hpp>
    #include <boost/thread/condition.hpp>
    #include <boost/date_time/posix_time/posix_time.hpp>


    #include <iostream>
    using namespace std;
    using namespace boost;


    boost::recursive_mutex io_mutex;
    boost::condition_variable_any cond;
    std::queue<int>  iq;

    class printer
    {
    public:
        printer(boost::asio::io_service& io,int n)
            : strand_(io),
            timer1_(io, boost::posix_time::seconds(1)),
            timer2_(io, boost::posix_time::seconds(1)),
            count_(n)
        {
            timer2_.async_wait(boost::bind(&printer::enqueue, this));
            timer1_.async_wait(boost::bind(&printer::dequeue, this));
           
        }

        ~printer()
        {
             boost::recursive_mutex::scoped_lock  lock(io_mutex);
            std::cout << "Final count is " << count_ << "\n";
        }

        void dequeue()
        {
            boost::recursive_mutex::scoped_lock  lock(io_mutex);
            while(iq.empty())
            {
              cond.wait(lock);       
           
            }
            int pop =0;
            if (!iq.empty())
            {           
                pop = iq.front();
                iq.pop();
                cout<<"------------pop "<<pop<<endl;
            }
            cond.notify_all();
          
            timer1_.expires_at(timer1_.expires_at() + boost::posix_time::seconds(1));
            timer1_.async_wait(boost::bind(&printer::dequeue, this));       
        }

        void enqueue()
        {
            boost::recursive_mutex::scoped_lock  lock(io_mutex);        

            while(iq.size() == 1000)
            {
              cond.wait(lock);           
            }

            iq.push(count_);;
            cond.notify_all();
            cout<<"------------push "<<count_<<endl;
            timer1_.expires_at(timer1_.expires_at() + boost::posix_time::seconds(1));
            timer1_.async_wait(boost::bind(&printer::enqueue, this));          
        }

    private:
        boost::asio::strand strand_;
        boost::asio::deadline_timer timer1_;
        boost::asio::deadline_timer timer2_;
        int count_;
    };


    int main()
    {
        thread_group threads;
        boost::asio::io_service io;
        

        printer * p[1000];
        int num = 1000;
        for(int i = 0; i < num; i++)
        {
            p[i] = new printer(io,i);

            threads.create_thread(boost::bind(&boost::asio::io_service::run, &io));
        }

       
        io.run();   
        threads.join_all();
       

        std::system("pause");
        return 0;
    }
  • 相关阅读:
    Netty事件监听和处理(上)
    Netty事件监听和处理(上)
    Netty事件监听和处理(上)
    nginx 反向代理转发导致css,js,图片失效
    nginx 反向代理转发导致css,js,图片失效
    nginx 反向代理转发导致css,js,图片失效
    【leetcode】507.Perfect Number
    【leetcode】507.Perfect Number
    【leetcode】507.Perfect Number
    Javascript中的类实现
  • 原文地址:https://www.cnblogs.com/toosuo/p/2459764.html
Copyright © 2011-2022 走看看