zoukankan      html  css  js  c++  java
  • muduo的事件处理(Reactor模型关键结构)

    muduo的Reactor模式主要有3个类实现-Channel、Poller、EventLoop、定时器

    1. 事件分发类 Channel (最终干活的)

    Channel是selectable IO channel,自始至终只负责一个 fd 的(注册与响应) IO 事件,但是不拥有该 fd ,所以也就在析构的时候不关闭它.
    Channel自始至终都属于一个EventLoop(一个EventLoop对应多个Channel,处理多个IO),自始至终只负责一个文件描述符的IO事件

      EventLoop* loop_;
      const int fd_;
      int events_;
      int revents_;
      int index_;
    
      ReadEventCallback readCallback_;
      EventCallback writeCallback_;
      EventCallback errorCallback_;
      EventCallback closeCallback_;
    
    

    几个callback函数都是c++新标准里面的function对象(muduo里面是Boost::function),它们会在handleEvent这个成员函数中根据不同的事件被调用。index_是poller类中pollfds_数组的下标。events_和revents_明显对应了struct pollfd结构中的成员。需要注意的是,Channel并不拥有该fd,它不会在析构函数中去关闭这个fd(fd是由Socket类的析构函数中关闭,即RAII的方法),Channel的生命周期由其owner负责。

    如何工作?
    在Channel类中保存这IO事件的类型以及对应的回调函数,当IO事件发生时,最终会调用到Channel类中的回调函数

    具体流程如下:
    首先给定Channel所属的 loop,及其要处理的 fd;接着注册 fd 上需要监听的事件,如果是常用的读写事件的话,可以直接调用接口函数enableReading或enableWriting来注册对应fd上的事件,disable* 是销毁指定的事件;然后通过 setCallback 来设置事件发生时的回调即可
    注册事件时函数调用关系,如下:Channel::update()->EventLoop::updateChannel(Channel)->Poller::updateChannel(Channel),最终向 poll 系统调用的监听事件表注册或修改事件。

    2. IO multiplexing 类 Poller

    Poller是个基类,具体可以是EPollPoller(默认) 或者PollPoller,对应 epoll 和 poll.需要去实现(唯一使用面向对象的一个类)

      typedef std::vector<struct pollfd> PollFdList;
      typedef std::map<int, Channel*> ChannelMap;  // fd to Channel
      PollFdList pollfds_;
      ChannelMap channels_;
    
    

    ChannelMap是fd到Channel类的映射,PollFdList保存了每一个fd所关心的事件,用作参数传递到poll函数中,Channel类里面的index_即是这里的下标。Poller类有下面四个函数

    Timestamp poll(int timeoutMs, ChannelList* activeChannels);
    void updateChannel(Channel* channel);
    void removeChannel(Channel* channel);
    private:
    void fillActiveChannels(int numEvents, ChannelList* activeChannels) const;
    
    

    updateChannel和removeChannel都是对上面两个数据结构的操作,poll函数是对::poll的封装。私有的fillActiveChannels函数负责把返回的活动时间添加到activeChannels(vector<Channel*>)这个结构中,返回给用户。Poller的职责也很简单,负责IO multiplexing,一个EventLoop有一个Poller,Poller的生命周期和EventLoop一样长。

    是eventloop的成员,它的职责仅仅是IO复用,事件分发交给 Channel 完成,生命期和 EventLoop 一样长。
    poll函数调用 epoll_wait/poll 来监听注册了的文件描述符,然后通过fillActiveChannels函数将返回的就绪事件装入 activeChannels 数组

    3. EventLoop 类

    EventLoop类是核心,大多数类都会包含一个EventLoop*的成员,因为所有的事件都会在EventLoop::loop()中通过Channel分发。先来看一下这个loop循环:

    while (!quit_)
    {
        activeChannels_.clear();
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
        for (ChannelList::iterator it = activeChannels_.begin();
            it != activeChannels_.end(); ++it)
        {
          (*it)->handleEvent(pollReturnTime_);
        }
        doPendingFunctors();
    }
    
    

    handleEvent是Channel类的成员函数,它会根据事件的类型去调用不同的Callback。循环末尾还有一个doPendingFunctors(),这个函数的作用在后面多线程的部分去说明。
    EventLoop类是Reactor模式的核心,一个线程一个事件循环,即one loop per thread,EventLoop 对象的生命周期通常与其所属的线程一样长。EventLoop对象构造的时候,会检查当前线程是否已经创建了其他EventLoop对象,如果已创建,终止程序(LOG_FATAL),EventLoop类的构造函数会记录本对象所属线程(threadld_),创建了EventLoop对象的线程称为IO线程.其主要功能是运行事件循环,等待事件发生,然后调用回调处理发生的事件。EventLoop::loop() -> Poller::poll()填充就绪事件集合 activeChannels,然后遍历该容器,执行每个 channel的 Channel::handleEvent() 完成对应就绪事件回调。

    由上面三个类已经可以构成Reactor的核心,整个流程如下:
    用户通过Channel向Poller类注册fd和关心的事件
    EventLoop从poll中返回活跃的fd和对应的Channel
    通过Channel去回调相应的时间。

    4. TimerQueue

    typedef std::shared_ptr<Timer> TimerPtr;
    typedef std::pair<Timestamp, TimerPtr> Entry;
    typedef std::set<Entry> TimerList;
    Channel timerfdChannel_;
    const int timerfd_;
    TimerList timers_;
    

    采用std::pair<Timestamp, TimerPtr> 加上set的 的形式是为了处理两个Timer同事到期的情况,即使到期时间相同,它们的地址也不同。timerfdChannel_是用来管理timerfd_create函数创建的fd。Timer类里面包含了一个回调函数和一个到期时间。expiration_就是上面Entry中的Timestamp。

    const TimerCallback callback_;
    Timestamp expiration_;
    

    用一个set来保存所有的事件和时间
    根据set集合里面最早的时间来更新timerfd_的到期时间(用timerfd_settime函数)
    时间到期后,EventLoop的poll函数会返回,并调用timerfdChannel_里面的handleEvent回调函数。
    通过handleEvent这个回调函数,再去处理到期的所有事件。

    timerfdChannel_.setReadCallback(
        std::bind(&TimerQueue::handleRead,this));
    timerfdChannel_.enableReading();
    

    timerfdChannel_的callback函数注册了TimerQueue的handleRead函数。在handleRead中应该干什么就很明显了,自然是捞出所有到期的timer,一次去执行对应的事件

    void TimerQueue::handleRead()
    {
      loop_->assertInLoopThread();
      Timestamp now(Timestamp::now());
      readTimerfd(timerfd_, now);
      std::vector<Entry> expired = getExpired(now);
      // safe to callback outside critical section
      for (std::vector<Entry>::iterator it = expired.begin();
          it != expired.end(); ++it)
      {
        it->second->run();
      }
      reset(expired, now);
    }
    
    

    多线程的技巧

    一个线程一个EventLoop,每个线程都有自己管理的各种ChannelList和TimerQueue。有时候,我们总有一些需求,要在各个线程之间调配任务。比如添加一个定时时间到IO线程中,这样TimerQueue就有两个线程同时访问。

    1. runInLoop()和queueInLoop()

    先来看几个EventLoop里面重要的函数和成员:

    std::vector<Functor> pendingFunctors_; // @GuardedBy mutex_
    void EventLoop::runInLoop(Functor&& cb) {
      if (isInLoopThread()) {
        cb();
      } else {
        queueInLoop(std::move(cb));
      }
    }
    void EventLoop::queueInLoop(Functor&& cb) {
      {
        MutexLockGuard lock(mutex_);
        pendingFunctors_.push_back(cb);
      }
      if (!isInLoopThread() || callingPendingFunctors_) {
        wakeup();
      }
    }
    
    

    注意这里的函数参数,我用到了C++11的右值引用。
    在前面的EventLoop::loop里面,我们已经看到了doPendingFunctors()这个函数,EventLoop还有一个重要的成员pendingFunctors_,该成员是暴露给其他线程的。这样,其他线程向IO线程添加定时时间的流程就是:

    其他线程调用runInLoop(),
    如果不是当前IO线程,再调用queueInLoop()
    在queueLoop中,将时间push到pendingFunctors_中,并唤醒当前IO线程
    注意这里的唤醒条件:不是当前IO线程肯定要唤醒;此外,如果正在调用Pending functor,也要唤醒;(为什么?,因为如果正在执行PendingFunctor里面,如果也执行了queueLoop,如果不唤醒的话,新加的cb就不会立即执行了。)

    2.doPendingFunctors()

    void EventLoop::doPendingFunctors() {
      std::vector<Functor> functors;
      callingPendingFunctors_ = true;
      {
        // reduce the lenth of the critical section
        // avoid the dead lock cause the functor can call queueInloop(;)
        MutexLockGuard lock(mutex_);
        functors.swap(pendingFunctors_);
      }
      for (size_t i = 0; i < functors.size(); ++i) {
        functors[i]();
      }
      callingPendingFunctors_ = false;
    }
    

    doPendingFunctors并没有直接在临界区去执行functors,而是利用了一个栈对象,把事件swap到栈对象中,再去执行。这样做有两个好处:
    减少了临界区的长度,其它线程调用queueInLoop对pendingFunctors加锁时,就不会被阻塞
    避免了死锁,可能在functors里面也会调用queueInLoop(),从而造成死锁。
    回过头来看,muduo在处理多线程加锁访问共享数据的策略上,有一个很重要的原则:拼命减少临界区的长度
    试想一下,如果没有pendingFunctors_这个数据成员,我们要想往TimerQueue中添加timer,肯定要对TimerQueue里面的insert函数加锁,造成锁的争用,而pendingFunctors_这个成员将锁的范围减少到了一个vector的push_back操作上。此外,在doPendingFunctors中,利用一个栈对象减少临界区,也是很巧妙的一个重要技巧。

    3.wake()

    前面说到唤醒IO线程,EventLoop阻塞在poll函数上,怎么去唤醒它?以前的做法是利用pipe,向pipe中写一个字节,监视在这个pipe的读事件的poll函数就会立刻返回。在muduo中,采用了linux中eventfd调用

    static int createEventfd() {
      int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
      if (evtfd < 0) {
        LOG_SYSERR << "Failed in eventfd";
        abort();
      }
      return evtfd;
    }
    void EventLoop::wakeup() {
      uint64_t one = 1;
      ssize_t n = ::write(wakeupFd_, &one, sizeof one);
      if (n != sizeof one) {
        LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
      }
    }
    
    

    把eventfd得到的fd和前面一样,通过Channel注册到poll里面,唤醒的时候,只需要向wakeupFd中写入一个字节,就能达到唤醒的目的。eventfd、timerfd都体现了linux的设计哲学,Everyting is a fd。

    参考
    [2]基本网络编程范式 https://segmentfault.com/a/1190000005910673

  • 相关阅读:
    tcpdump抓localhost 127.0.0.1的包
    fatal: write failure on 'stdout': Bad file descriptor
    rst转html
    Call to WHvSetupPartition failed: ERROR_SUCCESS (Last=0xc000000d/87) (VERR_NEM_VM_CREATE_FAILED)
    error: 'readdir_r' is deprecated [-Werror=deprecated-declarations]
    ubuntu虚拟机设置共享后无权限
    VirtualBox怎么设置从u盘启动,虚拟机从U盘启动
    File类总结
    相对路径的写法
    systeminfo总结
  • 原文地址:https://www.cnblogs.com/demian/p/13215663.html
Copyright © 2011-2022 走看看