zoukankan      html  css  js  c++  java
  • MIT 2012分布式课程基础源码解析-事件管理封装

    这部分的内容主要包括Epoll/select的封装,在封装好相应函数后,再使用一个类来管理相应事件,实现的文件为pollmgr.{h, cc}。

    事件函数封装

    可看到pollmgr.h文件下定一个了一个虚基类aio_mgr

    1 class aio_mgr {
    2     public:
    3         virtual void watch_fd(int fd, poll_flag flag) = 0;
    4         virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
    5         virtual bool is_watched(int fd, poll_flag flag) = 0;
    6         virtual void wait_ready(std::vector<int> *readable, std::vector<int> *writable) = 0;
    7         virtual ~aio_mgr() {}
    8 };
    View Code

    这便是具体事件类实现的基类,可看到文件末尾处的继承关系

     1 class SelectAIO : public aio_mgr {
     2     public :
     3 
     4         SelectAIO();
     5         ~SelectAIO();
     6         void watch_fd(int fd, poll_flag flag);
     7         bool unwatch_fd(int fd, poll_flag flag);
     8         bool is_watched(int fd, poll_flag flag);
     9         void wait_ready(std::vector<int> *readable, std::vector<int> *writable);
    10 
    11     private:
    12 
    13         fd_set rfds_;
    14         fd_set wfds_;
    15         int highfds_;
    16         int pipefd_[2];
    17 
    18         pthread_mutex_t m_;
    19 
    20 };
    21 
    22 #ifdef __linux__ 
    23 class EPollAIO : public aio_mgr {
    24     public:
    25         EPollAIO();
    26         ~EPollAIO();
    27         void watch_fd(int fd, poll_flag flag);
    28         bool unwatch_fd(int fd, poll_flag flag);
    29         bool is_watched(int fd, poll_flag flag);
    30         void wait_ready(std::vector<int> *readable, std::vector<int> *writable);
    31 
    32     private:
    33         int pollfd_;
    34         struct epoll_event ready_[MAX_POLL_FDS];
    35         int fdstatus_[MAX_POLL_FDS];
    36 
    37 };
    38 #endif /* __linux */
    View Code

    相应是使用select和epoll分别实现的事件管理类,其中最主要的方法是wait_ready,这个方法实现了具体的事件查询,其余几个函数用于管理套接字,如增加套接字,删除套接字以及判断套接字是否还存活着。这里我们主要看下epoll实现部分,select实现部分类似。epoll的详解可看这里

      1 EPollAIO::EPollAIO()
      2 {
      3     pollfd_ = epoll_create(MAX_POLL_FDS);
      4     VERIFY(pollfd_ >= 0);
      5     bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS);
      6 }
      7 
      8 EPollAIO::~EPollAIO()
      9 {
     10     close(pollfd_);
     11 }
     12 
     13 //状态转换
     14 static inline
     15 int poll_flag_to_event(poll_flag flag)
     16 {
     17     int f;
     18     if (flag == CB_RDONLY) {
     19         f = EPOLLIN;
     20     }else if (flag == CB_WRONLY) {
     21         f = EPOLLOUT;
     22     }else { //flag == CB_RDWR
     23         f = EPOLLIN | EPOLLOUT;
     24     }
     25     return f;
     26 }
     27 /*
     28  *   这个函数就相当于:准备下一个监听事件的类型
     29  */
     30 void
     31 EPollAIO::watch_fd(int fd, poll_flag flag)
     32 {
     33     VERIFY(fd < MAX_POLL_FDS);
     34 
     35     struct epoll_event ev;
     36     int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
     37     fdstatus_[fd] |= (int)flag;
     38 
     39     //边缘触发模式
     40     ev.events = EPOLLET;
     41     ev.data.fd = fd;
     42     //注册读事件
     43     if (fdstatus_[fd] & CB_RDONLY) {
     44         ev.events |= EPOLLIN;
     45     }//注册写事件
     46     if (fdstatus_[fd] & CB_WRONLY) {
     47         ev.events |= EPOLLOUT;
     48     }
     49 
     50     if (flag == CB_RDWR) {
     51         VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
     52     }
     53     //更改
     54     VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
     55 }
     56 
     57 bool 
     58 EPollAIO::unwatch_fd(int fd, poll_flag flag)
     59 {
     60     VERIFY(fd < MAX_POLL_FDS);
     61     fdstatus_[fd] &= ~(int)flag;
     62 
     63     struct epoll_event ev;
     64     int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
     65 
     66     ev.events = EPOLLET;
     67     ev.data.fd = fd;
     68 
     69     if (fdstatus_[fd] & CB_RDONLY) {
     70         ev.events |= EPOLLIN;
     71     }
     72     if (fdstatus_[fd] & CB_WRONLY) {
     73         ev.events |= EPOLLOUT;
     74     }
     75 
     76     if (flag == CB_RDWR) {
     77         VERIFY(op == EPOLL_CTL_DEL);
     78     }
     79     VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
     80     return (op == EPOLL_CTL_DEL);
     81 }
     82 
     83 bool
     84 EPollAIO::is_watched(int fd, poll_flag flag)
     85 {
     86     VERIFY(fd < MAX_POLL_FDS);
     87     return ((fdstatus_[fd] & CB_MASK) == flag);
     88 }
     89 /**
     90  *  事件循环,查看有哪些事件已经准备好,准备好的事件则插入相应列表中
     91  */
     92 void
     93 EPollAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
     94 {
     95     //得到已准备好的事件数目
     96     int nfds = epoll_wait(pollfd_, ready_,    MAX_POLL_FDS, -1);
     97     //遍历套接字数组,将可读/可写套接字添加到readable/writable数组中,便于后面处理
     98     for (int i = 0; i < nfds; i++) {
     99         if (ready_[i].events & EPOLLIN) {
    100             readable->push_back(ready_[i].data.fd);
    101         }
    102         if (ready_[i].events & EPOLLOUT) {
    103             writable->push_back(ready_[i].data.fd);
    104         }
    105     }
    106 }
    View Code

    事件管理

    在pollmgr.h中还有个重要的类

    class aio_callback {
        public:
            virtual void read_cb(int fd) = 0;
            virtual void write_cb(int fd) = 0;
            virtual ~aio_callback() {}
    };
    View Code

    这是一个回调虚基类,里面两个函数可从函数名猜到功能,即从对应的套接字读取/写入数据。该基类在后面底层通信中扮演着重要的角色。

    然后我们再看后面的PollMgr类,这便是事件管理类,同时它还使用了单例模式。

     1 class PollMgr {
     2     public:
     3         PollMgr();
     4         ~PollMgr();
     5 
     6         static PollMgr *Instance();
     7         static PollMgr *CreateInst();
     8         //在对应的套接字上添加事件
     9         void add_callback(int fd, poll_flag flag, aio_callback *ch);
    10         //删除套接字上的所有事件
    11         void del_callback(int fd, poll_flag flag);
    12         bool has_callback(int fd, poll_flag flag, aio_callback *ch);
    13         //阻塞删除套接字,为何阻塞呢?因为删除时,其它线程正在使用该套接字
    14         void block_remove_fd(int fd);
    15         //主要事件循环方法
    16         void wait_loop();
    17 
    18         static PollMgr *instance;
    19         static int useful;
    20         static int useless;
    21 
    22     private:
    23         pthread_mutex_t m_;
    24         pthread_cond_t changedone_c_;
    25         pthread_t th_;
    26 
    27         aio_callback *callbacks_[MAX_POLL_FDS]; //事件数组,即数组下标为相应的套接字
    28         aio_mgr *aio_;   //具体的事件函数类,可实现为epoll/select
    29         bool pending_change_;
    30 };
    View Code

    其中最主要的函数是wait_loop

    接下来我们看具体实现。

     1 PollMgr *PollMgr::instance = NULL;
     2 static pthread_once_t pollmgr_is_initialized = PTHREAD_ONCE_INIT;
     3 
     4 void
     5 PollMgrInit()
     6 {
     7     PollMgr::instance = new PollMgr();
     8 }
     9 
    10 PollMgr *
    11 PollMgr::Instance()
    12 {
    13     //保证PollMgrInit在本线程内只初始化一次
    14     pthread_once(&pollmgr_is_initialized, PollMgrInit);
    15     return instance;
    16 }
    View Code

    这里实现单例,pthread_once保证了线程中只初始化一次PollMgrInit()函数,所以在具体使用时,只需调用PollMgr::Instance()即可获得该管理类,再在其上处理各种各种事件。这里有个小疑问是:instance变量不应该是私有变量吗?

    接下来我们看构造析构函数:

    PollMgr::PollMgr() : pending_change_(false)
    {
        bzero(callbacks_, MAX_POLL_FDS*sizeof(void *));
        //aio_ = new SelectAIO();
        aio_ = new EPollAIO();
        VERIFY(pthread_mutex_init(&m_, NULL) == 0);
        VERIFY(pthread_cond_init(&changedone_c_, NULL) == 0);
        //this表示本类,wait_loop是本类中的一个方法,false表示不分离(detach)
        VERIFY((th_ = method_thread(this, false, &PollMgr::wait_loop)) != 0);
    }
    
    PollMgr::~PollMgr()
    {
        //never kill me!!!
        VERIFY(0);
    }
    View Code

    构造函数中初始化了事件类,使用了EpollAIO类,初始化了互斥量和条件变量,然后创建了一个线程调用wait_loop。有意思的是析构函数(never kill me)

     接下来是几个管理函数,管理套接字和回调的函数 

     1 void
     2 PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
     3 {
     4     VERIFY(fd < MAX_POLL_FDS);
     5 
     6     ScopedLock ml(&m_);
     7     aio_->watch_fd(fd, flag);
     8 
     9     VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
    10     callbacks_[fd] = ch;
    11 }
    12 
    13 //remove all callbacks related to fd
    14 //the return guarantees that callbacks related to fd
    15 //will never be called again
    16 void
    17 PollMgr::block_remove_fd(int fd)
    18 {
    19     ScopedLock ml(&m_);
    20     aio_->unwatch_fd(fd, CB_RDWR);
    21     pending_change_ = true;
    22     VERIFY(pthread_cond_wait(&changedone_c_, &m_)==0);
    23     callbacks_[fd] = NULL;
    24 }
    25 
    26 //删除相应的回调函数
    27 void
    28 PollMgr::del_callback(int fd, poll_flag flag)
    29 {
    30     ScopedLock ml(&m_);
    31     if (aio_->unwatch_fd(fd, flag)) {
    32         callbacks_[fd] = NULL;
    33     }
    34 }
    35 
    36 //
    37 bool
    38 PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
    39 {
    40     ScopedLock ml(&m_);
    41     if (!callbacks_[fd] || callbacks_[fd]!=c)
    42         return false;
    43 
    44     return aio_->is_watched(fd, flag);
    45 }
    View Code

    下面便是循环的主方法,该方法一直循环获取相应的事件,但此方法有个问题是,当某个回调读取需要长时间阻塞时,

    会耽误后续事件的读取或写入。

    //循环的主方法
    void
    PollMgr::wait_loop()
    {
    
        std::vector<int> readable;  //可读套接字的vector
        std::vector<int> writable;  //可写套接字的vector
        //
        while (1) {
            {
                ScopedLock ml(&m_);
                if (pending_change_) {
                    pending_change_ = false;
                    VERIFY(pthread_cond_broadcast(&changedone_c_)==0);
                }
            }
            //首先清空两个vector
            readable.clear();
            writable.clear();
            //这里便监听了事件,读或写事件,有时间发生便将事件的fd插入相应的vector
            aio_->wait_ready(&readable,&writable);
            //如果这次没有可读和可写事件,则继续下一次循环
            if (!readable.size() && !writable.size()) {
                continue;
            } 
            //no locking of m_
            //because no add_callback() and del_callback should 
            //modify callbacks_[fd] while the fd is not dead
            for (unsigned int i = 0; i < readable.size(); i++) {
                int fd = readable[i];
                if (callbacks_[fd]) //相应的回调函数读取套接字上的数据
                    callbacks_[fd]->read_cb(fd);
            }
    
            for (unsigned int i = 0; i < writable.size(); i++) {
                int fd = writable[i];
                if (callbacks_[fd])
                    callbacks_[fd]->write_cb(fd);
            }
        }
    }
    View Code

    具体使用时,只需获得单例类即可,然后再添加相应的套接字及回调函数,添加都是线程安全的,因为在相应的实现上都会阻塞在内部互斥变量m_上

  • 相关阅读:
    Java日志体系(1) —— 那些年那些事,那些日志的历史
    直播工作原理
    【PAT乙级 】1003. 我要通过!
    [牛客网刷题]被3整除
    [牛客网刷题]牛牛找工作
    Mybatis的简单分析
    数位DP
    正则表达式
    能量球
    从此,我们相伴,不离不弃
  • 原文地址:https://www.cnblogs.com/fwensen/p/5778134.html
Copyright © 2011-2022 走看看