zoukankan      html  css  js  c++  java
  • muduo网络库代码剖析——EventLoop类

    EventLoop在网络库中的作用?

    主线程也有一个线程在跑EventLoop对象的loop()函数,在这个函数内关注服务端的socketfd用来接收新的客户端socketfd连接。

    将这个新的socketfd连接放到各个线程中并运行线程的EventLoop对象的loop()来关注已连接socketfd的可读可写事件。

    所以EventLoop在网络库中的作用基本就是来关注fd事件执行回调函数的。

    EventLoop对象的数据成员:

    typedef std::vector<Channel*> ChannelList;
    
    //线程是否有进入EventLoop的loop()函数
    bool looping_; /* atomic */
    
    //线程是否要退出EventLoop的loop()函数
    std::atomic<bool> quit_;
    
    //线程是否在执行激活状态channel通道的回调函数        
    bool eventHandling_; /* atomic */
    
    //EventLoop线程做的额外工作 
    bool callingPendingFunctors_; /* atomic */
    
    //loop函数循环的次数
    int64_t iteration_;
    
    //独一无二的线程标识,能区分不同进程间的不同线程
    const pid_t threadId_;
    
    //poll有事件到来返回的时间  
    Timestamp pollReturnTime_;
    
    //poll函数的具体调用
    std::unique_ptr<Poller> poller_;
    
    //定时器功能的实现
    std::unique_ptr<TimerQueue> timerQueue_;
    
    //用于唤醒阻塞在poll函数的线程自己
    int wakeupFd_;
    
    //这个channel对象是用wakeupFd_和唤醒后需要执行什么函数构造的
    std::unique_ptr<Channel> wakeupChannel_;
    
    //??还未知作用
    boost::any context_;
    
    // 返回所有有事件到来的channel通道
    ChannelList activeChannels_;
    
    //当前正在执行哪个channel回调函数的channel指针
    Channel* currentActiveChannel_;
    
    //互斥锁用来互斥地往pendingFunctors_添加额外任务
    mutable MutexLock mutex_;
    
    //EventLoop线程除了执行有事件到来的Channel的回调函数外,也会执行这个vector内的函数(我叫它为额外的工作)
    std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);

    EventLoop有什么功能?

    1.返回所有文件描述符fd有事件(可读可写错误)的通道channel,并执行channel通道注册的回调函数。这是EventLoop对象所在线程的主要工作。

    2.EventLoop对象所处的线程会阻塞在poll函数,利用eventfd唤醒自己来做一些额外的工作(例如:添加/删除一个新定时器任务)

    3.利用timerfd(TimerId类、Timer类、TimerQueue类)实现线程定时执行任务的功能。

    一、EventLoop的主要工作是调用激活的Channel的回调函数

    流程图:

    可以看到Channel对象只是调用了handleEvent(),最终调用的函数是更上一层类或者说是Channel对象所属类注册给channel的回调函数。

    二、EventLoop如何利用eventfd从poll函数中唤醒自己去做额外工作

    在EventLoop的构造函数内就会设置wakeupChannel_可读事件激活时要回调的函数EventLoop::handleRead(),并将监听eventfd文件描述符事件。当可读事件到来的时候会调用

    void EventLoop::handleRead()
    {
      uint64_t one = 1;
      ssize_t n = sockets::read(wakeupFd_, &one, sizeof one);
      if (n != sizeof one)
      {
        LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
      }
    }

    从eventupFd_文件描述符中取走数据,防止不断触发可读事件。

    设置到wakeupChannel_可读事件是调用了:

    void EventLoop::wakeup()
    {
      uint64_t one = 1;
      ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
      if (n != sizeof one)
      {
        LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
      }
    }

    流程图:

    Channel类对象wakeupChannel_是如何被添加进监听列表的见:

     muduo网络库代码剖析——Channel类

     

    muduo网络库代码剖析——Poller类、PollPoller类、EpollPoller类

    三、利用timerfd(TimerId类、Timer类、TimerQueue类)实现线程定时执行任务的功能。

     既然EventLoop对象的主要任务是执行有事件的Channel的回调函数,那么就可以利用timerfd实现定时调用回调函数的功能;

    muduo网络库代码剖析——TimerId类、Timer类、TimerQueue类

    EventLoop.cc文件:

     1 #include "muduo/net/EventLoop.h"
     2 
     3 #include "muduo/base/Logging.h"
     4 #include "muduo/base/Mutex.h"
     5 #include "muduo/net/Channel.h"
     6 #include "muduo/net/Poller.h"
     7 #include "muduo/net/SocketsOps.h"
     8 #include "muduo/net/TimerQueue.h"
     9 
    10 #include <algorithm>
    11 
    12 #include <signal.h>
    13 #include <sys/eventfd.h>
    14 #include <unistd.h>
    15 
    16 using namespace muduo;
    17 using namespace muduo::net;
    18 
    19 namespace
    20 {
    21 __thread EventLoop* t_loopInThisThread = 0;
    22 
    23 const int kPollTimeMs = 10000;
    24 
    25 int createEventfd()
    26 {
    27   int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    28   if (evtfd < 0)
    29   {
    30     LOG_SYSERR << "Failed in eventfd";
    31     abort();
    32   }
    33   return evtfd;
    34 }
    35 
    36 #pragma GCC diagnostic ignored "-Wold-style-cast"
    37 class IgnoreSigPipe
    38 {
    39  public:
    40   IgnoreSigPipe()
    41   {
    42     ::signal(SIGPIPE, SIG_IGN);
    43     // LOG_TRACE << "Ignore SIGPIPE";
    44   }
    45 };
    46 #pragma GCC diagnostic error "-Wold-style-cast"
    47 
    48 IgnoreSigPipe initObj;
    49 }  // namespace

    知识点1:匿名命名空间

    知识点2:static全局变量

    全局对象会在main函数之前被先调用,所以一开始IgnoreSigPipe()构造函数就被调用了。

     1 EventLoop::EventLoop()
     2   : looping_(false),
     3     quit_(false),
     4     eventHandling_(false),
     5     callingPendingFunctors_(false),
     6     iteration_(0),
     7     threadId_(CurrentThread::tid()),
     8     poller_(Poller::newDefaultPoller(this)),
     9     timerQueue_(new TimerQueue(this)),
    10     wakeupFd_(createEventfd()),
    11     wakeupChannel_(new Channel(this, wakeupFd_)),
    12     currentActiveChannel_(NULL)
    13 {
    14   LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;
    15   if (t_loopInThisThread)
    16   {
    17     LOG_FATAL << "Another EventLoop " << t_loopInThisThread
    18               << " exists in this thread " << threadId_;
    19   }
    20   else
    21   {
    22     t_loopInThisThread = this;
    23   }
    24   wakeupChannel_->setReadCallback(
    25       std::bind(&EventLoop::handleRead, this));
    26   // we are always reading the wakeupfd
    27   wakeupChannel_->enableReading();
    28 }
    29 
    30 EventLoop::~EventLoop()
    31 {
    32   LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_
    33             << " destructs in thread " << CurrentThread::tid();
    34   wakeupChannel_->disableAll();
    35   wakeupChannel_->remove();
    36   ::close(wakeupFd_);
    37   t_loopInThisThread = NULL;
    38 }

    构造函数:

    当eventfd有可读事件到来以后,EventLoop对象注册了它需要调用的函数:EventLoop::handleRead()

    第8行:

    Poller* Poller::newDefaultPoller(EventLoop* loop)
    {
      if (::getenv("MUDUO_USE_POLL"))
      {
        return new PollPoller(loop);
      }
      else
      {
        return new EPollPoller(loop);
      }
    }
    依据环境变量选择poll还是epoll系统调用

    析构函数:

    释放每个EventLoop对象持有的eventfd文件描述符。

     1 void EventLoop::loop()
     2 {
     3   assert(!looping_);
     4   assertInLoopThread();
     5   looping_ = true;
     6   quit_ = false;  // FIXME: what if someone calls quit() before loop() ?
     7   LOG_TRACE << "EventLoop " << this << " start looping";
     8 
     9   while (!quit_)
    10   {
    11     activeChannels_.clear();
    12     pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
    13     ++iteration_;
    14     if (Logger::logLevel() <= Logger::TRACE)
    15     {
    16       printActiveChannels();
    17     }
    18     // TODO sort channel by priority
    19     eventHandling_ = true;
    20     for (Channel* channel : activeChannels_)
    21     {
    22       currentActiveChannel_ = channel;
    23       currentActiveChannel_->handleEvent(pollReturnTime_);
    24     }
    25     currentActiveChannel_ = NULL;
    26     eventHandling_ = false;
    27     doPendingFunctors();
    28   }
    29 
    30   LOG_TRACE << "EventLoop " << this << " stop looping";
    31   looping_ = false;
    32 }

    每一个线程有且只有一个EventLoop对象,且在循环跑EventLoop对象的loop()函数。这个函数可谓是顶一片天的函数。

    第4行:

      void assertInLoopThread()
      {
        if (!isInLoopThread())
        {
          abortNotInLoopThread();
        }
      }
    
    void EventLoop::abortNotInLoopThread()
    {
          LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this << " was created in threadId_ = " << threadId_ << ", current thread id = " <<  CurrentThread::tid();
    }
    bool isInLoopThread() const 
    { 
        return threadId_ == CurrentThread::tid(); 
    }
    assertInLoopThread的实现

     判断当前调用函数的线程是否是该EventLoop所在的线程,此函数不能跨线程调用执行。

    第12~26行:

    (假设已经注册了监听fd的Channel通道)返回所有有事件(包括可读可写)的Channel。并调用这些Channel注册的回调函数。

     第27行:

    void EventLoop::doPendingFunctors()
    {
      std::vector<Functor> functors;
      callingPendingFunctors_ = true;
    
      {
      MutexLockGuard lock(mutex_);
      functors.swap(pendingFunctors_);
      }
    
      for (const Functor& functor : functors)
      {
        functor();
      }
      callingPendingFunctors_ = false;
    }
    EventLoop的额外工作

    EventLoop除了会执行Channel注册的回调函数外,还可能会执行其他的一些事件。这些事件都会放在vector<Functor> pendingFunctors_内;

    这些事件包括:

    1.新Timer加入到TimerQueue内。

    2.从TimerQueue删除定时器任务。

    其中为什么要在doPendingFunctors内加锁,且是用局部作用的方式加锁??

    假设有两个线程A和B:

    B线程有可能会继续在调用线程A的EventLoop::runInLoop()让往pendingFunctors_中添加任务,由于线程B往线程A的pendingFunctors加Functor, 线程A在调用pendingFunctors中的Functor,由于两个线程在操作同一个变量,那么在线程A调用pendingFunctors_的Functor时就需要加锁。

    这里的加锁还很巧妙。直接将所有的Functor交换出来,一方面是减少了临界区的长度。一方面也避免了死锁:如果Functor又调用自身的EventLoop::runInLoop(),如果把调用Functor也加锁的话,就会造成死锁。

    让线程退出,即让EventLoop对象所在的线程退出loop()函数:

     1 void EventLoop::quit()
     2 {
     3   quit_ = true;
     4   // There is a chance that loop() just executes while(!quit_) and exits,
     5   // then EventLoop destructs, then we are accessing an invalid object.
     6   // Can be fixed using mutex_ in both places.
     7   if (!isInLoopThread())
     8   {
     9     wakeup();
    10   }
    11 }
    12 
    13 void EventLoop::wakeup()
    14 {
    15   uint64_t one = 1;
    16   ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
    17   if (n != sizeof one)
    18   {
    19     LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
    20   }
    21 }

    为什么如果当前线程不是EventLoop对象所在的线程,要利用eventFd唤醒线程呢?

    因为有可能想让那个线程退出但是线程阻塞在了poll/epoll上,因此唤醒它来退出。

    那为什么当前线程是EventLoop对象所在的线程就不需要唤醒呢?

    因为如果当前线程能执行到quit()函数,所有当前EventLoop对象所在的线程并没有阻塞,等处理完一个事件一个会回到判断while(!quit_)然后正常退出的。

    四、EventLoop线程做的额外事情(除了处理fd可读可写事件外)

     1 void EventLoop::runInLoop(Functor cb)
     2 {
     3   if (isInLoopThread())
     4   {
     5     cb();
     6   }
     7   else
     8   {
     9     queueInLoop(std::move(cb));
    10   }
    11 }
    12 
    13 void EventLoop::queueInLoop(Functor cb)
    14 {
    15   {
    16   MutexLockGuard lock(mutex_);
    17   pendingFunctors_.push_back(std::move(cb));
    18   }
    19 
    20   if (!isInLoopThread() || callingPendingFunctors_)
    21   {
    22     wakeup();
    23   }
    24 }

    runInLoop()的字面意思是:在EventLoop对象所在的线程调用。

    此时又分两种情况:

    1.调用runInLoop()的线程是EventLoop对象所在的线程,那么这个线程直接调用这个事件就可以了(Functor cb)。

    2.如果调用runInLoop()的线程不是EventLoop对象所在的线程,那么加锁放入vector<Functor> pendingFunctors_内,然后唤醒EventLoop对象所在的线程来处理。

    不过有可能EventLoop对象所处的线程正在执行eventHandling_,那么会发生什么呢?在处理完eventHandling后,会doPendingFunctors,这个时候额外事件是已经添加进来了。那么唤醒poll的eventfd又会额外调用一次doPendingFunctors,会不会有问题呢??

    五、EventLoop对定时事件的实现

    TimerId EventLoop::runAt(Timestamp time, TimerCallback cb)
    {
      return timerQueue_->addTimer(std::move(cb), time, 0.0);
    }
    
    TimerId EventLoop::runAfter(double delay, TimerCallback cb)
    {
      Timestamp time(addTime(Timestamp::now(), delay));
      return runAt(time, std::move(cb));
    }
    
    TimerId EventLoop::runEvery(double interval, TimerCallback cb)
    {
      Timestamp time(addTime(Timestamp::now(), interval));
      return timerQueue_->addTimer(std::move(cb), time, interval);
    }
    
    void EventLoop::cancel(TimerId timerId)
    {
      return timerQueue_->cancel(timerId);
    }

    六、如何让EventLoop关注(监听)新的文件描述符的事件

     1 void EventLoop::updateChannel(Channel* channel)
     2 {
     3   assert(channel->ownerLoop() == this);
     4   assertInLoopThread();
     5   poller_->updateChannel(channel);
     6 }
     7 
     8 void EventLoop::removeChannel(Channel* channel)
     9 {
    10   assert(channel->ownerLoop() == this);
    11   assertInLoopThread();
    12   if (eventHandling_)
    13   {
    14     assert(currentActiveChannel_ == channel ||
    15         std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end());
    16   }
    17   poller_->removeChannel(channel);
    18 }
    19 
    20 bool EventLoop::hasChannel(Channel* channel)
    21 {
    22   assert(channel->ownerLoop() == this);
    23   assertInLoopThread();
    24   return poller_->hasChannel(channel);
    25 }

    如何实现每个线程只能由一份EventLoop对象?

    EventLoop.cc文件内的全局变量用于记录EventLoop对象指针(_thread修饰即每一个线程都有一份独立的该变量,否则该变量在线程间是共享的):

    __thread EventLoop* t_loopInThisThread = 0;

    利用一个静态成员函数用于断言判断线程是否已经有一个EventLoop对象:

    static EventLoop* getEventLoopOfCurrentThread();
      assert(EventLoop::getEventLoopOfCurrentThread() == NULL);
      EventLoop loop;
      assert(EventLoop::getEventLoopOfCurrentThread() == &loop);

    在EventLoop对象的构造函数内调用:

      if (t_loopInThisThread)
      {
        LOG_FATAL << "Another EventLoop " << t_loopInThisThread
                  << " exists in this thread " << threadId_;
      }
      else
      {
        t_loopInThisThread = this;
      }

    如果是同一个线程内创建了EventLoop对象两次,在第二次的构造函数中因为全局变量t_loopInThisThread已被赋值,故会强制退出第二次构造EventLoop对象失败。

    EventLoop的构造函数会记住本对象所属的线程id(threadId是线程唯一的标识,类型是pid_t)。

    EventLoop主要调用loop()事件循环函数,该函数不能跨线程调用。只能在创建EventLoop对象的线程中调用loop()函数。

  • 相关阅读:
    网线帘幕动画
    图片缩放/旋转/平移/设置分辨率
    贝塞尔样条
    线性梯度画刷
    画七彩五角星
    kafka安装
    在windows远程提交任务给Hadoop集群(Hadoop 2.6)
    把Spark SQL的metadata存储到mysql
    使用IDEA开发SPARK提交remote cluster执行
    Netty的Channel
  • 原文地址:https://www.cnblogs.com/jialin0x7c9/p/12246021.html
Copyright © 2011-2022 走看看