zoukankan      html  css  js  c++  java
  • boost::lockfree::queue多线程读写实例

    最近的任务是写一个多线程的东西,就得接触多线程队列了,我反正是没学过分布式的,代码全凭感觉写出来的,不过运气好,代码能够work= =

    话不多说,直接给代码吧,一个多消费者,多生产者的模式。假设我的任务是求队列的中位数是啥,每消费10000次的时候,我要知道中位数是什么。

    至于加不加锁,这个看你了,我反正是加了,代码里面没写……我反正是把写的代码单独封装了一个函数,然后加了个锁

    欢迎交流,这个代码已经在实际任务上面上线了,希望不会有bug。

    用的是boost::lockfree::queue,官方文档:http://www.boost.org/doc/libs/1_55_0/boost/lockfree/queue.hpp

    /*
    关于锁的代码:
    
    伟大的Boost库给我们提供了 shared_mutex  类,结合 unique_lock 与 shared_lock 的使用,可以实现读写锁。
    
    
    
    通常读写锁需要完成以下功能:
    
    1.当 data 被线程A读取时,其他线程仍可以进行读取却不能写入
    
    2.当 data 被线程A写入时,其他线程既不能读取也不能写入
    
    
    
    对应于功能1,2我们可以这样来描述:
    
    1.当线程A获得共享锁时,其他线程仍可以获得共享锁但不能获得独占锁
    
    2.当线程A获得独占锁时,其他线程既不能获得共享锁也不能获得独占锁
    
    typedef boost::shared_lock<boost::shared_mutex> read_lock;  
    typedef boost::unique_lock<boost::shared_mutex> write_lock;  
      
    boost::shared_mutex read_write_mutex;  
    int32_t data = 1;  
      
    //线程A,读data  
    {  
        read_lock rlock(read_write_mutex);  
        std::cout << data << std:; endl;  
    }  
      
    //线程B,读data  
    {  
        read_lock rlock(read_write_mutex);  
        std::cout << data << std:; endl;  
    }  
      
    //线程C,写data  
    {  
        write_lock rlock(read_write_mutex);  
        data = 2;  
    }  
    */
    
    
    
    #ifndef DYNAMIC_QUEUE_H_
    #define DYNAMIC_QUEUE_H_
    
    #include "boost/lockfree/queue.hpp"
    #include "boost/thread/thread.hpp"
    #include "boost/thread/mutex.hpp"
    #include "abtest_parameters.h"
    
    namespace un {
    class DynamicController {
    
    public:
    boost::lockfree::queue<size_t,boost::lockfree::capacity<40000> > lockfree_queue;
    // boost::lockfree::queue  boost里面的无锁队列,唯一比较蛋疼的就是空间最大65536以及没法输出size,其他的就将就用吧。
    // 队列长度可以自定义,也可以不定义,会自增长的。
    
    size_t num = 0;
    
    void StartDaemonUpdater(){
      boost::function0<void> f = boost::bind(&DynamicController::UpdaterWorker, this);
      boost::thread thrd(f);
      thrd.detach();
    }
    // 启动消费者队列
    
    void Producer(size_t number){
      bool succ = lockfree_queue.bounded_push(number);
      // 如果用push的话,没空间的话,会等待消费完。
      // bounded_push的话,如果每空间会返回false,然后弃掉这个数。成功返回true
    }
    // 生产者
    
    size_t GetNumber(
      return num;
    }
    // get代码
    
    void UpdaterWorker(void){
      std::vector<size_t> V;
      while(1){//稳妥起见,这个while里面可以写个sleep以至于不需要一直在消费。
        size_t tmp_value;
        while(lockfree_queue.pop(tmp_value)){
          V.push_back(tmp_value);
          // 更新条件,10000个数
          // 用p99更新
          if(V.size()>10000){
            std::sort(V.begin(),V.end());
            num = V[size_t(V.size()*0.5)];
            V.clear();
          }
        }
      }
    }
    
    // 消费者
    
    };
    }
    #endif
  • 相关阅读:
    Android(java)学习笔记68:使用proguard混淆android代码
    SGU 194 Reactor Cooling
    关于流量有上下界的网络流问题的求解
    关于最小割的求解方法
    HDU 5311 Hidden String
    POJ 3548 Restoring the digits
    POJ 2062 HDU 1528 ZOJ 2223 Card Game Cheater
    ZOJ 1967 POJ 2570 Fiber Network
    HDU 1969 Pie
    HDU 1956 POJ 1637 Sightseeing tour
  • 原文地址:https://www.cnblogs.com/qscqesze/p/8323777.html
Copyright © 2011-2022 走看看