zoukankan      html  css  js  c++  java
  • Muduo网络库源代码分析(四)EventLoopThread和EventLoopThreadPool的封装

    muduo的并发模型为one loop per thread+ threadpool。为了方便使用,muduo封装了EventLoop和Thread为EventLoopThread,为了方便使用线程池,又把EventLoopThread封装为EventLoopThreadPool。

    所以这篇博文并没有涉及到新奇的技术。可是也有一些封装和逻辑方面的注意点须要我们去分析和理解。

    EventLoopThread

    不论什么一个线程,仅仅要创建并执行了EventLoop,就是一个IO线程。 EventLoopThread类就是一个封装好了的IO线程。

     
    EventLoopThread的工作流程为: 
    1、在主线程创建EventLoopThread对象。

     
    2、主线程调用EventLoopThread.start(),启动EventLoopThread中的线程(称为IO线程),而且主线程要等待IO线程创建完毕EventLoop对象。 
    3、IO线程调用threadFunc创建EventLoop对象。通知主线程已经创建完毕。 
    4、主线程返回创建的EventLoop对象。


    EventLoopThread.h

    class EventLoopThread : boost::noncopyable
    {
     public:
      typedef boost::function<void(EventLoop*)> ThreadInitCallback;
    
      EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback());
      ~EventLoopThread();
      EventLoop* startLoop();	// 启动线程,该线程就成为了IO线程
    
     private:
      void threadFunc();		// 线程函数
    
      EventLoop* loop_;			// loop_指针指向一个EventLoop对象
      bool exiting_;
      Thread thread_;
      MutexLock mutex_;
      Condition cond_;
      ThreadInitCallback callback_;		// 回调函数在EventLoop::loop事件循环之前被调用
    };
    


    EventLoopThread.cc

    EventLoopThread::EventLoopThread(const ThreadInitCallback& cb)
      : loop_(NULL),
        exiting_(false),
        thread_(boost::bind(&EventLoopThread::threadFunc, this)),
        mutex_(),
        cond_(mutex_),
        callback_(cb)
    {
    }
    
    EventLoopThread::~EventLoopThread()
    {
      exiting_ = true;
      loop_->quit();		// 退出IO线程,让IO线程的loop循环退出。从而退出了IO线程
      thread_.join(); //等待线程退出
    }
    
    EventLoop* EventLoopThread::startLoop()
    {
      assert(!thread_.started());
      thread_.start();//线程启动,调用threadFunc()
    
      {
        MutexLockGuard lock(mutex_);
        while (loop_ == NULL)
        {
          cond_.wait();//须要等待EventLoop对象的创建
        }
      }
    
      return loop_;
    }
    
    void EventLoopThread::threadFunc()
    {
      EventLoop loop;
    
      if (callback_)
      {
        callback_(&loop);
      }
    
      {
        MutexLockGuard lock(mutex_);
        // loop_指针指向了一个栈上的对象,threadFunc函数退出之后。这个指针就失效了
        // threadFunc函数退出,就意味着线程退出了,EventLoopThread对象也就没有存在的价值了。
        // 因而不会有什么大的问题
        loop_ = &loop;
        cond_.notify(); //创建好,发送通知
      }
    
      loop.loop();// 会在这里循环,直到EventLoopThread析构。此后不再使用loop_訪问EventLoop了
      //assert(exiting_);
    }
    
    測试程序:

    #include <muduo/net/EventLoop.h>
    #include <muduo/net/EventLoopThread.h>
    
    #include <stdio.h>
    
    using namespace muduo;
    using namespace muduo::net;
    
    void runInThread()
    {
      printf("runInThread(): pid = %d, tid = %d
    ",
             getpid(), CurrentThread::tid());
    }
    
    int main()
    {
      printf("main(): pid = %d, tid = %d
    ",
             getpid(), CurrentThread::tid());
    
      EventLoopThread loopThread;
      EventLoop* loop = loopThread.startLoop();
      // 异步调用runInThread,即将runInThread加入到loop对象所在IO线程,让该IO线程运行
      loop->runInLoop(runInThread);
      sleep(1);
      // runAfter内部也调用了runInLoop。所以这里也是异步调用
      loop->runAfter(2, runInThread);
      sleep(3);
      loop->quit();
    
      printf("exit main().
    ");
    }

    对调用过程进行分析:(查看日志)

    主线程调用 loop->runInLoop(runInThread); 因为主线程(不是IO线程)调用runInLoop。 故调用queueInLoop() runInThead 加入到队列,然后wakeup() IO线程。IO线程在doPendingFunctors() 中取loop->runAfter() 要唤醒一下,此时仅仅是运行runAfter() 加入了一个2s的定时器, 2s超时。timerfd_ 可读,先handleRead()一下然后运行回调函数runInThread()

    那为什么exit main() 之后wakeupFd_ 还会有可读事件呢?那是由于EventLoopThead 栈上对象析构,在析构函数内 loop_ ->quit(), 因为不是在IO线程调用quit(),故也须要唤醒一下。IO线程才干从poll 返回,这样再次循环推断 while (!quit_) 就能退出IO线程。

    EventLoopThreadPool

    muduo的线程模型:




    muduo的思想时eventLoop+thread pool。为了更方便使用,将EventLoopThread做了封装。main reactor能够创建sub reactor,并发一些任务分发到sub reactor中去。EventLoopThreadPool的思想比較简单,用一个main reactor创建EventLoopThreadPool。在EventLoopThreadPool中将EventLoop和Thread绑定,能够返回EventLoop对象来使用EventLoopThreadPool中的Thread。

    EventLoopThreadPool.h

    class EventLoopThreadPool : boost::noncopyable
    {
     public:
      typedef boost::function<void(EventLoop*)> ThreadInitCallback;
    
      EventLoopThreadPool(EventLoop* baseLoop);
      ~EventLoopThreadPool();
      void setThreadNum(int numThreads) { numThreads_ = numThreads; }
      void start(const ThreadInitCallback& cb = ThreadInitCallback());
      EventLoop* getNextLoop();
    
     private:
    
      EventLoop* baseLoop_;	// 与Acceptor所属EventLoop同样
      bool started_;
      int numThreads_;		// 线程数
      int next_;			// 新连接到来。所选择的EventLoop对象下标
      boost::ptr_vector<EventLoopThread> threads_;		// IO线程列表
      std::vector<EventLoop*> loops_;					// EventLoop列表
    };
    


    EventLoopThreadPool.cc

    EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop)
      : baseLoop_(baseLoop),
        started_(false),
        numThreads_(0),
        next_(0)
    {
    }
    
    EventLoopThreadPool::~EventLoopThreadPool()
    {
      // Don't delete loop, it's stack variable
    }
    
    void EventLoopThreadPool::start(const ThreadInitCallback& cb)
    {
      assert(!started_);
      baseLoop_->assertInLoopThread();
    
      started_ = true;
    
      for (int i = 0; i < numThreads_; ++i)
      {
        EventLoopThread* t = new EventLoopThread(cb);
        threads_.push_back(t);
        loops_.push_back(t->startLoop());	// 启动EventLoopThread线程。在进入事件循环之前。会调用cb
      }
      if (numThreads_ == 0 && cb)
      {
        // 仅仅有一个EventLoop。在这个EventLoop进入事件循环之前,调用cb
        cb(baseLoop_);
      }
    }
    
    EventLoop* EventLoopThreadPool::getNextLoop()
    {
      baseLoop_->assertInLoopThread();
      EventLoop* loop = baseLoop_;
    
      // 假设loops_为空,则loop指向baseLoop_
      // 假设不为空,依照round-robin(RR。轮叫)的调度方式选择一个EventLoop
      if (!loops_.empty())
      {
        // round-robin
        loop = loops_[next_];
        ++next_;
        if (implicit_cast<size_t>(next_) >= loops_.size())
        {
          next_ = 0;
        }
      }
      return loop;
    }
    

     

    mainReactor关注监听事件,已连接套接字事件轮询给线程池中的subReactors 处理,一个新的连接相应一个subReactor

    我们採用round-robinRR,轮叫)的调度方式选择一个EventLoop,也就是getNextLoop函数。极端情况下,线程池中个数为0时,那么新的连接交给mainReactor。这样就退化成单线程的模式。





  • 相关阅读:
    Beetle在TCP通讯中使用协议分析器和自定义协议对象
    Beetle在TCP通讯中使用AMF3协议和Flash通讯
    发布一个TCP 吞吐性能测试小工具
    替书仙澄清一些东西,并且对无知的人谈谈网络追踪
    2006年4月1日测彩研究
    构建工具研究:该死的Maven的j2me构建
    2006年4月2日测彩研究
    Eclipse的插件代码折叠
    JAVA这堆IDE........无言
    假如人生不曾相遇(转)
  • 原文地址:https://www.cnblogs.com/blfbuaa/p/7263398.html
Copyright © 2011-2022 走看看