zoukankan      html  css  js  c++  java
  • C++ 并发消息队列

    C++ 并发消息队列

      在网上找到了一份POSIX线程显示的并发消息队列示例代码:

            http://codereview.stackexchange.com/questions/41604/thread-safe-concurrent-fifo-queue-in-c

      上面的示例代码其实是有问题的,他只能对并发Push或者并发Pop进行上锁,二并不能保证同时Push和Pop是线程安全的,所以在锁队列时只能使用一个锁。同时该代码并不支持Windows,所以按照这篇文档的思路想使用标准模板库(STL)实现一份平台无关的代码,具体实现如下所示。

      1 #include <queue>
      2 #include <mutex>
      3 #include <thread>
      4 #include <chrono>
      5 #include <memory>
      6 #include <condition_variable>
      7 
      8 typedef struct task_tag
      9 {
     10     int data;
     11     task_tag( int i ) : data(i) { }
     12 } Task, *PTask;
     13 
     14 class MessageQueue
     15 {
     16 public:
     17     MessageQueue(){}
     18     ~MessageQueue()
     19     {
     20         if ( !m_queue.empty() )
     21         {
     22             PTask pRtn = m_queue.front();
     23             delete pRtn;
     24         }
     25         
     26     }
     27 
     28     void PushTask( PTask pTask )
     29     {
     30         std::unique_lock<std::mutex> lock( m_queueMutex );
     31         m_queue.push( pTask );
     32         m_cond.notify_one();
     33     }
     34 
     35     PTask PopTask()
     36     {
     37         PTask pRtn = NULL;
     38         std::unique_lock<std::mutex> lock( m_queueMutex );
     39         while ( m_queue.empty() )
     40         {
     41             m_cond.wait_for( lock, std::chrono::seconds(1) );
     42         }
     43 
     44         if ( !m_queue.empty() )
     45         {
     46             pRtn = m_queue.front();
     47             if ( pRtn->data != 0 )
     48                 m_queue.pop();
     49         }
     50 
     51         return pRtn;
     52     }
     53 
     54 private:
     55     std::mutex m_queueMutex;
     56     std::condition_variable m_cond; 
     57     std::queue<PTask> m_queue;
     58 };
     59 
     60 void thread_fun( MessageQueue *arguments )
     61 {
     62     while ( true )
     63     {
     64         PTask data = arguments->PopTask();
     65 
     66         if (data != NULL)
     67         {
     68             printf( "Thread is: %d
    ", std::this_thread::get_id() );
     69             printf("   %d
    ", data->data );
     70             if ( 0 == data->data ) //Thread end.
     71                 break;
     72             else
     73                 delete data;
     74         }
     75     }
     76 
     77     return;
     78 }
     79 
     80  int main( int argc, char *argv[] )
     81 {
     82     MessageQueue cq;
     83 
     84     #define THREAD_NUM 3
     85     std::thread threads[THREAD_NUM];
     86 
     87     for ( int i=0; i<THREAD_NUM; ++i )
     88         threads[i] = std::thread( thread_fun, &cq );
     89 
     90     int i = 100000;
     91     while( i > 0 )
     92     {
     93         Task *pTask = new Task( --i );
     94         cq.PushTask( pTask );
     95     }
     96 
     97     for ( int i=0; i<THREAD_NUM; ++i) 
     98         threads[i].join();
     99 
    100     //system( "pause" );
    101     return 0;
    102 }

      在示例代码中,我们使主线程向公共队列cq中Push任务,而其他的线程则负责取出任务并打印任务,由于std::cout并不支持并发线程安全,所以在打印任务时使用printf。主线程new出的任务,在其他线程中使用并销毁,当主线程发送data为0的任务时,则规定任务发送完毕,而其他的线程获取到data为0的任务后退出线程,data为0的任务则有消息队列负责销毁。整个消息队列使用标准模板库实现,现实跨平台。

      在最初设计std::queue<PTask>的时候,想使用std::queue<std::shared_ptr<Task>>来管理主线程new出来的任务,这样智能指针则负责处理任务的销毁工作,但是在多线程并发的时候程序莫名的崩溃,仔细调试了半天,还是没有找到问题,最终我怀疑智能指针在多线程中是不是有问题呢?所以不得不放弃最初的设计。

  • 相关阅读:
    flask基础 MUI
    flask基础 MongoDB
    falsk 基础 语音识别与语音合成()
    flask基础 websocket ()
    flask基础四 请求上下文()
    flask基础三
    学习整理
    Elasticsearch
    课程学习:Linux系统管理
    课程学习:程序设计与算法
  • 原文地址:https://www.cnblogs.com/wanghaiyang1930/p/4668528.html
Copyright © 2011-2022 走看看