zoukankan      html  css  js  c++  java
  • Event Loop

    Reactor模式:

    Reactor包含如下角色:

      • Handle 句柄;用来标识socket连接或是打开文件;
      • Synchronous Event Demultiplexer:同步事件多路分解器:由操作系统内核实现的一个函数;用于阻塞等待发生在句柄集合上的一个或多个事件;(如select/epoll;)
      • Event Handler:事件处理接口
      • Concrete Event HandlerA:实现应用程序所提供的特定事件处理逻辑;
      • Reactor:反应器,定义一个接口,实现以下功能:
        1)供应用程序注册和删除关注的事件句柄;
        2)运行事件循环;
        3)有就绪事件到来时,分发事件到之前注册的回调函数上处理;

    业务流程及时序图:

    1. 应用启动,将关注的事件handle注册到Reactor中;
    2. 调用Reactor,进入无限事件循环,等待注册的事件到来;
    3. 事件到来,select返回,Reactor将事件分发到之前注册的回调函数中处理;

    转自:https://segmentfault.com/a/1190000002715832

    channel类:

      ///channel类的作用是把不同的IO事件分发给不同的回调函数,(一个fd上可以支持监听多个IO事件类型)
      ///每个channel对象只属于一个EventLoop
      ///每个channel对象只负责一个文件描述符fd的IO事件分发
      ///channel就像是一个容器,管理fd到callback的映射,当活动事件来临时,分发事件给不同的callback
      ///channel对象负责的fd
      const int  fd_;
      //channel关心的IO事件,有三种事件类型:kNoneEvent kReadEvent kWriteEvent,定义在channel.cc中,由enableReading disableReading enableWriting disableWriting等函数来设置
      int        events_;
      //目前活动的事件,由poller设置
      int        revents_; // it's the received event types of epoll or poll
    //从shared_ptr对象obj构造weak_ptr tie_,获得资源的观测权
    //防止shared_ptr出现循环引用
    void Channel::tie(const boost::shared_ptr<void>& obj)
    {
      tie_ = obj;
      tied_ = true;
    }
    //handleEvent是channel的核心,由EventLoop::loop()通过channel对象调用,然后该函数根据revents_的值分别调用不同的用户回调
    void Channel::handleEvent(Timestamp receiveTime)
    {
      boost::shared_ptr<void> guard;
      if (tied_)
      {
        //lock从被观测的shared_ptr获得一个可用的shared_ptr对象,从而操作资源
        guard = tie_.lock();
        if (guard)
        {
          handleEventWithGuard(receiveTime);
        }
      }
      else
      {
        handleEventWithGuard(receiveTime);
      }
    }
    //根据revents_的值(目前活动的事件)分别调用不同的用户回调,也就是说channel对象处理fd上各种类型的事件,与events_无关(?)
    void Channel::handleEventWithGuard(Timestamp receiveTime)
    {
      eventHandling_ = true;
      LOG_TRACE << reventsToString();
      if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
      {
        if (logHup_)
        {
          LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLHUP";
        }
        if (closeCallback_) closeCallback_();
      }
    
      if (revents_ & POLLNVAL)
      {
        LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLNVAL";
      }
    
      if (revents_ & (POLLERR | POLLNVAL))
      {
        if (errorCallback_) errorCallback_();
      }
      if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
      {
        if (readCallback_) readCallback_(receiveTime);
      }
      if (revents_ & POLLOUT)
      {
        if (writeCallback_) writeCallback_();
      }
      eventHandling_ = false;
    }

    poller类:

      ///fd到channel*的映射
      typedef std::map<int, Channel*> ChannelMap;
      ///每次fd被创建时,就应该加入到channels_中
      ChannelMap channels_;

    poller类调用poll获得当前有活动IO事件的fd,将它对应的channel填入activeChannels.

    EventLoop类:

    //事件循环,该函数不能跨线程调用,只能在创建该对象的线程中调用
    void EventLoop::loop()
    {
      assert(!looping_);
      assertInLoopThread();
      looping_ = true;
      quit_ = false;  // FIXME: what if someone calls quit() before loop() ?
      LOG_TRACE << "EventLoop " << this << " start looping";
    
      while (!quit_)
      {
        activeChannels_.clear();
        ///调用poll获得当前活动事件的channel列表(其实是将有活动事件的fd对应的channel填入activechannels_),然后依次调用每个channel的handleEvent函数
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
        ++iteration_;
        if (Logger::logLevel() <= Logger::TRACE)
        {
          printActiveChannels();
        }
        // TODO sort channel by priority
        eventHandling_ = true;
        for (ChannelList::iterator it = activeChannels_.begin();
            it != activeChannels_.end(); ++it)
        {
          currentActiveChannel_ = *it;
          currentActiveChannel_->handleEvent(pollReturnTime_);
        }
        currentActiveChannel_ = NULL;
        eventHandling_ = false;
    //执行等待队列中的回调函数 doPendingFunctors(); } LOG_TRACE
    << "EventLoop " << this << " stop looping"; looping_ = false; }
    //检查当前的线程是否是所在EventLoop对象所属的线程,因为有的函数是线程安全的,有的函数不是
    bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); } //保证线程安全地调用函数cb
    void EventLoop::runInLoop(const Functor& cb) { ///当前线程与cb所属IO线程相同,回调会同步进行 if (isInLoopThread()) { cb(); } else { ///cb被加入队列,IO线程会被唤醒来调用这个函数
      
    queueInLoop(cb); } } void EventLoop::queueInLoop(const Functor& cb) { { MutexLockGuard lock(mutex_); pendingFunctors_.push_back(cb); } //调用queueInLoop的线程不是IO线程或者IO线程正在调用doPendingFunctors,此时需要唤醒IO线程(指的是当前loop循环结束后,不要阻塞在poll循环处,而是立即从poll函数调用返回)
    //调用queueInLoop的线程不是当前IO线程则需要唤醒当前线程,才能及时执行doPendingFunctors
    //doPendingFunctors调用的Functor可能又添加了任务,故循环回去poll的时候需要被唤醒,进而继续执行doPendingFunctors(),否则这些新加的cb就不能被及时调用.若不唤醒,只有等监听的fd有活动事件的时候,才会继续执行
    //只有在handleEvent()中调用queueInLoop不需要唤醒,因为接下来马上就会执行doPendingFunctors()
    if (!isInLoopThread() || callingPendingFunctors_) { wakeup(); } }
    //该函数只会被当前IO线程调用,依次执行队列中的函数
    void EventLoop::doPendingFunctors()
    {
      std::vector<Functor> functors;
      callingPendingFunctors_ = true;
    
      {
      MutexLockGuard lock(mutex_);
      functors.swap(pendingFunctors_);
      }
    
      for (size_t i = 0; i < functors.size(); ++i)
      {
        functors[i]();
      }
      callingPendingFunctors_ = false;
    }

    IO线程平时阻塞在事件循环EventLoop::loop()的poll调用中,为了让IO线程能立刻执行用户回调,需要唤醒它。传统的方法是使用管道pipe,IO线程始终监视此管道的readable事件,在需要唤醒的时候,其他线程往管道里写一个字节,这样IO线程就从IO 多路复用阻塞调用中返回。linux中使用eventfd,可以更高效地唤醒,因为它不必管理缓冲区。

    eventfd 是一个比 pipe 更高效的线程间事件通知机制,一方面它比 pipe 少用一个 file descripor,节省了资源;另一方面,eventfd 的缓冲区管理也简单得多,全部“buffer” 只有定长8 bytes,不像 pipe 那样可能有不定长的真正 buffer。

    http://blog.csdn.net/solstice/article/details/6171831

    //wakeupFd_是eventfd,
      int wakeupFd_;
      //wakeupChannel_处理wakeupFd上的readable事件,将事件分发至handleRead函数
      boost::scoped_ptr<Channel> wakeupChannel_;

     http://blog.csdn.net/NK_test/article/details/51138359

    http://blog.csdn.net/yusiguyuan/article/details/40593721?utm_source=tuicool&utm_medium=referral

    总结一下就是:
    假设我们有这样的调用:loop->runInLoop(run),说明想让IO线程执行一定的计算任务,此时若是在当前的IO线程,就马上执行run();如果是其他线程调用的,那么就执行queueInLoop(run),将run异步添加到队列,当loop内处理完事件后,就执行doPendingFunctors(),也就执行到了run();最后想要结束线程的话,执行quit。

    http://dirtysalt.info/muduo.html

    EventLoopThread类:

    IO线程不一定是主线程,我们可以在任何一个线程创建并运行EventLoop,一个程序也可以有不止一个IO线程.EventLoopThread会启动自己的线程,并在其中运行EventLoop:loop(),用条件变量来等待线程的创建和运行.

    EventLoop* EventLoopThread::startLoop()
    {
      assert(!thread_.started());
      thread_.start();
    
      {
        MutexLockGuard lock(mutex_);
        while (loop_ == NULL)
        {
          cond_.wait();
        }
      }
    
      return loop_;
    }
    //线程主函数
    void EventLoopThread::threadFunc()
    {
      EventLoop loop;
    
      if (callback_)
      {
        callback_(&loop);
      }
    
      {
        MutexLockGuard lock(mutex_);
        loop_ = &loop;
        //唤醒startLoop()
        cond_.notify();
      }
    
      loop.loop();
      //assert(exiting_);
      loop_ = NULL;
    }

    条件变量:http://blog.jobbole.com/44409/

    http://blog.csdn.net/liuxuejiang158blog/article/details/16554023  这个例子要看一下

  • 相关阅读:
    iptables 详解
    Linux Crontab 定时任务命令详解
    Linux下查看历史操作记录
    Linux shell if 参数
    Linux的五个查找命令:find,locate,whereis,which,type
    linux下IPTABLES配置详解
    Linux命令之while Bash中的While循环
    日志分割脚本
    详解 Too many open files
    微软停止对WindowsNT4.0系统提供无偿的支持
  • 原文地址:https://www.cnblogs.com/ljygoodgoodstudydaydayup/p/5647173.html
Copyright © 2011-2022 走看看