zoukankan      html  css  js  c++  java
  • muduo网络库源码学习————无界队列和有界队列

    muduo库里实现了两个队列模板类:无界队列为BlockingQueue.h,有界队列为BoundedBlockingQueue.h,两个测试程序实现了生产者和消费者模型。(这里以无界队列为例,有界队列和无界的差不多)代码如下:
    BlockingQueue.h

    #include <muduo/base/Condition.h>
    #include <muduo/base/Mutex.h>
    
    #include <boost/noncopyable.hpp>
    #include <deque>
    #include <assert.h>
    
    namespace muduo
    {
    
    template<typename T>//队列模板
    class BlockingQueue : boost::noncopyable
    {
     public:
      BlockingQueue(): mutex_(), notEmpty_(mutex_),queue_()
      {//构造函数对3个成员进行初始化
      }
    
      void put(const T& x)//生产产品
      {
        MutexLockGuard lock(mutex_);//先加上锁对队列进行保护,构造函数中调用lock,析构函数会自动调用unlock
        queue_.push_back(x);//产品放进队列
        //队列不为空,通知消费者可以进行消费
        notEmpty_.notify(); // TODO: move outside of lock
      }
    
      T take()//消费产品
      {
        MutexLockGuard lock(mutex_);//加锁保护队列
        // always use a while-loop, due to spurious wakeup
        while (queue_.empty())//如果队列为空,则一直等待
        {
          notEmpty_.wait();
        }
        assert(!queue_.empty());//断言队列非空
        T front(queue_.front());//取出队首元素
        queue_.pop_front();//将队首元素弹出
        return front;//返回队首元素
      }
    
      size_t size() const//队列大小
      {
        MutexLockGuard lock(mutex_);//加锁保护
        return queue_.size();//返回队列大小
      }
    
     private:
      mutable MutexLock mutex_;//互斥锁
      Condition         notEmpty_;//条件变量
      std::deque<T>     queue_;//队列使用stl中的deque
    };
    
    }
    
    #endif  // MUDUO_BASE_BLOCKINGQUEUE_H
    

    测试代码有两个:
    BlockingQueue_test.cc

    #include <muduo/base/BlockingQueue.h>
    #include <muduo/base/CountDownLatch.h>
    #include <muduo/base/Thread.h>
    
    #include <boost/bind.hpp>
    #include <boost/ptr_container/ptr_vector.hpp>
    #include <string>
    #include <stdio.h>
    
    class Test
    {
     public://numThreads初始化为5,条件变量count初始化为5,线程个数也为5
      Test(int numThreads) : latch_(numThreads), threads_(numThreads)
      {
        for (int i = 0; i < numThreads; ++i)
        {//线程名称
          char name[32];
          snprintf(name, sizeof name, "work thread %d", i);
          //创建5个线程,threadFunc为线程回调函数
          threads_.push_back(new muduo::Thread(boost::bind(&Test::threadFunc, this), muduo::string(name)));
        }
        //启动线程
        for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::start, _1));
      }
    
      void run(int times)
      {
        printf("waiting for count down latch
    ");
        latch_.wait();//等待count被减为0
        printf("all threads started
    ");
        for (int i = 0; i < times; ++i)//100次
        {
          char buf[32];
          snprintf(buf, sizeof buf, "hello %d", i);
          queue_.put(buf);//往队列中添加100个产品
          //打印信息
          printf("tid=%d, put data = %s, size = %zd
    ", muduo::CurrentThread::tid(), buf, queue_.size());
        }
      }
    
      void joinAll()
      {
        for (size_t i = 0; i < threads_.size(); ++i)
        {//往5个线程添加stop
          queue_.put("stop");
        }
        //执行join
        for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::join, _1));
      }
    
     private:
    //线程回调函数
      void threadFunc()
      {//输出线程id和名称
        printf("tid=%d, %s started
    ", muduo::CurrentThread::tid(),muduo::CurrentThread::name());
       //计数值减一
        latch_.countDown();//count减为0时将通知所有等待线程
        bool running = true;
        while (running)
        {
          std::string d(queue_.take());//消费产品
          //打印取出的值
          printf("tid=%d, get data = %s, size = %zd
    ", muduo::CurrentThread::tid(), d.c_str(), queue_.size());
        //直到产品的名称==stop,跳出循环
          running = (d != "stop");
        }
       //打印停止信息
        printf("tid=%d, %s stopped
    ",muduo::CurrentThread::tid(),muduo::CurrentThread::name());
      }
    
      muduo::BlockingQueue<std::string> queue_;//队列
      muduo::CountDownLatch latch_;//条件变量
      boost::ptr_vector<muduo::Thread> threads_;//线程数组
    };
    
    int main()
    {//打印进程,线程id
      printf("pid=%d, tid=%d
    ", ::getpid(), muduo::CurrentThread::tid());
      Test t(5);//定义test类
      t.run(100);
      t.joinAll();
    
      printf("number of created threads %d
    ", muduo::Thread::numCreated());
    }
    

    单独编译后运行结果如下:
    这里写图片描述

    BlockingQueue_bench.cc

    #include <muduo/base/BlockingQueue.h>
    #include <muduo/base/CountDownLatch.h>
    #include <muduo/base/Thread.h>
    #include <muduo/base/Timestamp.h>
    
    #include <boost/bind.hpp>
    #include <boost/ptr_container/ptr_vector.hpp>
    #include <map>
    #include <string>
    #include <stdio.h>
    
    class Bench//Bench是用来度量时间的一个类
    {
     public://count初始化为numThreads,创建numThreads个线程
      Bench(int numThreads) : latch_(numThreads),threads_(numThreads)
      {
        for (int i = 0; i < numThreads; ++i)
        {
          char name[32];
          snprintf(name, sizeof name, "work thread %d", i);
          //创建线程,设置回调
          threads_.push_back(new muduo::Thread( boost::bind(&Bench::threadFunc, this), muduo::string(name)));
        }
        //线程start
        for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::start, _1));
      }
    
      void run(int times)//生产产品
      {//10000个
        printf("waiting for count down latch
    ");
        latch_.wait();//等待count降为0
        printf("all threads started
    ");
        for (int i = 0; i < times; ++i)
        {
          muduo::Timestamp now(muduo::Timestamp::now());
          queue_.put(now);//当前时间戳进队
          usleep(1000);//1000微秒一次
        }
      }
    
      void joinAll()
      {
        for (size_t i = 0; i < threads_.size(); ++i)
        {
          queue_.put(muduo::Timestamp::invalid());//产生非法时间,即产生跳出循环的条件
        }
        for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::join, _1));
      }
    
     private:
    
      void threadFunc()//用于消费产品
      {
        printf("tid=%d, %s started
    ",muduo::CurrentThread::tid(),muduo::CurrentThread::name());
        //Map是STL[1]  的一个关联容器,它提供一对一
        //(其中第一个可以称为关键字,每个关键字只能在map中出现一次,第二个可能称为该关键字的值)的数据处理能力
        //第一个是delay值,第二个是相同delay的次数
        std::map<int, int> delays;//map容器
        latch_.countDown();
        bool running = true;
        while (running)
        {
          muduo::Timestamp t(queue_.take());//取出队列中的时间戳
          muduo::Timestamp now(muduo::Timestamp::now());//建立当前时间戳对象
          if (t.valid())//如果是一个合法的时间
          {//计算差值
            int delay = static_cast<int>(timeDifference(now, t) * 1000000);//以微秒为单位
            // printf("tid=%d, latency = %d us
    ", muduo::CurrentThread::tid(), delay);
            ++delays[delay];//??
          }
          running = t.valid();//t为非法的时间则跳出循环
        }
        printf("tid=%d, %s stopped
    ", muduo::CurrentThread::tid(),muduo::CurrentThread::name());
        //使用迭代器遍历map容器
        for (std::map<int, int>::iterator it = delays.begin(); it != delays.end(); ++it)
        {
          printf("tid = %d, delay = %d, count = %d
    ",muduo::CurrentThread::tid(),it->first, it->second);
        }
      }
    
      muduo::BlockingQueue<muduo::Timestamp> queue_;
      muduo::CountDownLatch latch_;
      boost::ptr_vector<muduo::Thread> threads_;
    };
    
    int main(int argc, char* argv[])
    {//若参数大于1则是传入的参数,否则设为1
      int threads = argc > 1 ? atoi(argv[1]) : 1;
    
      Bench t(threads);//建立Bench对象
      t.run(10000);
      t.joinAll();
    }
    

    单独编译后运行结构如下:(输出过长,时间也太长,截图时中断了程序)
    这里写图片描述

  • 相关阅读:
    写了一个具有future接口的rust测试代码
    lua:写了个基于协程的task调度库
    最近阅读
    电视投屏
    树莓派 系统备份
    Kindle支持的文件格式
    树莓派 more
    用google translate大文件
    NFC 大电池 高性价比手机
    rust debug之基于pdb
  • 原文地址:https://www.cnblogs.com/sigma0-/p/12630483.html
Copyright © 2011-2022 走看看