zoukankan      html  css  js  c++  java
  • UDT中的epoll

    epoll 是为处理大量句柄而改进的poll,在UDT中也有支持。UDT使用了内核提供的epoll,主要是epoll_create,epoll_wait,epoll_ctl,UDT定义了CEPollDesc这个结构来管理epoll的描述符和套接字。
    struct CEPollDesc
    {
       int m_iID;                                // epoll ID
       std::set<UDTSOCKET> m_sUDTSocksOut;       // set of UDT sockets waiting for write events
       std::set<UDTSOCKET> m_sUDTSocksIn;        // set of UDT sockets waiting for read events
       std::set<UDTSOCKET> m_sUDTSocksEx;        // set of UDT sockets waiting for exceptions
     
       int m_iLocalID;                           // local system epoll ID
       std::set<SYSSOCKET> m_sLocals;            // set of local (non-UDT) descriptors
     
       std::set<UDTSOCKET> m_sUDTWrites;         // UDT sockets ready for write
       std::set<UDTSOCKET> m_sUDTReads;          // UDT sockets ready for read
       std::set<UDTSOCKET> m_sUDTExcepts;        // UDT sockets with exceptions (connection broken, etc.)
    };
    特别要提醒的是,当对端socket连接中断的时候,也是在m_sUDTReads里的
    UDT还实现了一个类来进行各项操作,实现的有
    create():创建一个epoll,调用了epoll_create
    add_usock():添加一个UDT套接字到epoll
    add_ssock():添加一个系统套接字到epoll,调用了epoll_ctl
    remove_usock():从epoll中移除一个UDT套接字
    remove_ssock():从epoll中移除一个系统套接字,调用了epoll_ctl
    wait():等待epoll事件或者超时,调用了epoll_wait
    release():关闭并释放一个epoll
     
    UDT里对epoll的调用是四段式的,比如add_usock这里,调用顺序是epoll_add_usock()->CUDT::epoll_add_usock()->s_UDTUnited.epoll_add_usock()->CEPoll::add_usock
     
    int epoll_add_usock(int eid, UDTSOCKET u, const int* events)
    {
       return CUDT::epoll_add_usock(eid, u, events);
    }
     
    int CUDT::epoll_add_usock(const int eid, const UDTSOCKET u, const int* events)
    {
       try
       {
          return s_UDTUnited.epoll_add_usock(eid, u, events);
       }
       catch (CUDTException e)
       {
          s_UDTUnited.setError(new CUDTException(e));
          return ERROR;
       }
       catch (...)
       {
          s_UDTUnited.setError(new CUDTException(-1, 0, 0));
          return ERROR;
       }
    }
     
    int CUDTUnited::epoll_add_usock(const int eid, const UDTSOCKET u, const int* events)
    {
       CUDTSocket* s = locate(u);
       int ret = -1;
       if (NULL != s)
       {
          ret = m_EPoll.add_usock(eid, u, events);
          s->m_pUDT->addEPoll(eid);
       }
       else
       {
          throw CUDTException(5, 4);
       }
     
       return ret;
    }
     
    int CEPoll::add_usock(const int eid, const UDTSOCKET& u, const int* events)
    {
       CGuard pg(m_EPollLock);
     
       map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
       if (p == m_mPolls.end())
          throw CUDTException(5, 13);
     
       if (!events || (*events & UDT_EPOLL_IN))       //UDT_EPOLL_IN 和UDT_EPOLL_OUT、UDT_EPOLL_ERROR分别是0x1, 0x4, 0x8,可以进行&运算
          p->second.m_sUDTSocksIn.insert(u);
       if (!events || (*events & UDT_EPOLL_OUT))
          p->second.m_sUDTSocksOut.insert(u);
     
       return 0;
    }
     
    UDT命名空间提供给应用程序调用接口,可成为API层,API层调用CUDT API,这一层主要做错误处理,捕捉下面两层抛出的错误保存起来交给应用程序使用,CUDT API调用CUDTUnited的实现,如果是UDT SOCKET的socket(),bind(),listen()等,到这层也就结束了,不过epoll要多个第四层,再调用CEPoll的实现。现在来看看CUDTUnited和CEPoll的实现。
    CUDTSocket* s = locate(u);
     调用CUDTUnited::locate(),根据SocketID,也就是UDT Socket handle在CUDTUnited的std::map<UDTSOCKET, CUDTSocket*> m_Sockets中找到对应的CUDTSocket结构,如果找不到,抛出错误,找到了就调用CEPoll的add_usock实现,根据传的event参数,将socket导入相应的队列。之后调用s->m_pUDT->addEPoll(eid)
    void CUDT::addEPoll(const int eid)
    {
       CGuard::enterCS(s_UDTUnited.m_EPoll.m_EPollLock);                      //这种通过类来实现加锁解锁的,我第一次见,还挺方便。
       m_sPollID.insert(eid);
       CGuard::leaveCS(s_UDTUnited.m_EPoll.m_EPollLock);
     
       if (!m_bConnected || m_bBroken || m_bClosing)
          return;
     
       if (((UDT_STREAM == m_iSockType) && (m_pRcvBuffer->getRcvDataSize() > 0)) ||
          ((UDT_DGRAM == m_iSockType) && (m_pRcvBuffer->getRcvMsgNum() > 0)))
       {
          s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, true);
       }
       if (m_iSndBufSize > m_pSndBuffer->getCurrBufSize())
       {
          s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);
       }
    }
    这里已经开始更新m_sUDTWrites,m_sUDTReads,通过update_events(),如前所述,update_events()也是CEPoll的成员函数。
     
    int CEPoll::update_events(const UDTSOCKET& uid, std::set<int>& eids, int events, bool enable)
    {
       CGuard pg(m_EPollLock);
     
       map<int, CEPollDesc>::iterator p;
     
       vector<int> lost;
       for (set<int>::iterator i = eids.begin(); i != eids.end(); ++ i)
       {
          p = m_mPolls.find(*i);
          if (p == m_mPolls.end())
          {
             lost.push_back(*i);
          }
          else
          {
             if ((events & UDT_EPOLL_IN) != 0)
                update_epoll_sets(uid, p->second.m_sUDTSocksIn, p->second.m_sUDTReads, enable);   //UDT_EPOLL_IN 和UDT_EPOLL_OUT、UDT_EPOLL_ERROR分别是0x1, 0x4, 0x8,可以进行&运算
             if ((events & UDT_EPOLL_OUT) != 0)
                update_epoll_sets(uid, p->second.m_sUDTSocksOut, p->second.m_sUDTWrites, enable);
             if ((events & UDT_EPOLL_ERR) != 0)
                update_epoll_sets(uid, p->second.m_sUDTSocksEx, p->second.m_sUDTExcepts, enable);
          }
       }
     
       for (vector<int>::iterator i = lost.begin(); i != lost.end(); ++ i)
          eids.erase(*i);
     
       return 0;
    }
     
    void update_epoll_sets(const UDTSOCKET& uid, const set<UDTSOCKET>& watch, set<UDTSOCKET>& result, bool enable)
    {
       if (enable && (watch.find(uid) != watch.end()))
       {
          result.insert(uid);
       }
       else if (!enable)
       {
          result.erase(uid);
       }
    }
     
    最后说下wait函数的实现,一样是四段式,就跳过前面三段,直接看第四段
     
    int CEPoll::wait(const int eid, set<UDTSOCKET>* readfds, set<UDTSOCKET>* writefds, int64_t msTimeOut, set<SYSSOCKET>* lrfds, set<SYSSOCKET>* lwfds)
    {
       // if all fields is NULL and waiting time is infinite, then this would be a deadlock   都空的的话,会死锁,抛出异常
       if (!readfds && !writefds && !lrfds && lwfds && (msTimeOut < 0))
          throw CUDTException(5, 3, 0);
     
       // Clear these sets in case the app forget to do it.  清空
       if (readfds) readfds->clear();
       if (writefds) writefds->clear();
       if (lrfds) lrfds->clear();
       if (lwfds) lwfds->clear();
     
       int total = 0;
     
       int64_t entertime = CTimer::getTime();
       while (true)
       {
          CGuard::enterCS(m_EPollLock);
     
          map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
          if (p == m_mPolls.end())
          {
             CGuard::leaveCS(m_EPollLock);
             throw CUDTException(5, 13);
          }
     
          if (p->second.m_sUDTSocksIn.empty() && p->second.m_sUDTSocksOut.empty() && p->second.m_sLocals.empty() && (msTimeOut < 0))
          {
             // no socket is being monitored, this may be a deadlock  都空的的话,会死锁,抛出异常
             CGuard::leaveCS(m_EPollLock);
             throw CUDTException(5, 3);
          }
     
          // Sockets with exceptions are returned to both read and write sets.          把m_sUDTReads和m_sUDTExcepts都读到readfds里
          if ((NULL != readfds) && (!p->second.m_sUDTReads.empty() || !p->second.m_sUDTExcepts.empty()))
          {
             *readfds = p->second.m_sUDTReads;
             for (set<UDTSOCKET>::const_iterator i = p->second.m_sUDTExcepts.begin(); i != p->second.m_sUDTExcepts.end(); ++ i)
                readfds->insert(*i);
             total += p->second.m_sUDTReads.size() + p->second.m_sUDTExcepts.size();
          }
          if ((NULL != writefds) && (!p->second.m_sUDTWrites.empty() || !p->second.m_sUDTExcepts.empty()))          //把m_sUDTWrites和m_sUDTExcepts都读到writefds里
          {
             *writefds = p->second.m_sUDTWrites;
             for (set<UDTSOCKET>::const_iterator i = p->second.m_sUDTExcepts.begin(); i != p->second.m_sUDTExcepts.end(); ++ i)
                writefds->insert(*i);
             total += p->second.m_sUDTWrites.size() + p->second.m_sUDTExcepts.size();
          }
     
          if (lrfds || lwfds)     //读系统套接字
          {
             #ifdef LINUX
             const int max_events = p->second.m_sLocals.size();
             epoll_event ev[max_events];
             int nfds = ::epoll_wait(p->second.m_iLocalID, ev, max_events, 0);
     
             for (int i = 0; i < nfds; ++ i)
             {
                if ((NULL != lrfds) && (ev[i].events & EPOLLIN))
               {
                   lrfds->insert(ev[i].data.fd);
                   ++ total;
                }
                if ((NULL != lwfds) && (ev[i].events & EPOLLOUT))
                {
                   lwfds->insert(ev[i].data.fd);
                   ++ total;
                }
             }
             #else
             //currently "select" is used for all non-Linux platforms.
             //faster approaches can be applied for specific systems in the future.
     
             //"select" has a limitation on the number of sockets
     
             fd_set readfds;
             fd_set writefds;
             FD_ZERO(&readfds);
             FD_ZERO(&writefds);
     
             for (set<SYSSOCKET>::const_iterator i = p->second.m_sLocals.begin(); i != p->second.m_sLocals.end(); ++ i)
             {
                if (lrfds)
                   FD_SET(*i, &readfds);
                if (lwfds)
                   FD_SET(*i, &writefds);
             }
     
             timeval tv;
             tv.tv_sec = 0;
             tv.tv_usec = 0;
             if (::select(0, &readfds, &writefds, NULL, &tv) > 0)
             {
                for (set<SYSSOCKET>::const_iterator i = p->second.m_sLocals.begin(); i != p->second.m_sLocals.end(); ++ i)
                {
                   if (lrfds && FD_ISSET(*i, &readfds))
                   {
                      lrfds->insert(*i);
                      ++ total;
                   }
                   if (lwfds && FD_ISSET(*i, &writefds))
                   {
                      lwfds->insert(*i);
                      ++ total;
                   }
                }
             }
             #endif
          }
     
          CGuard::leaveCS(m_EPollLock);
     
          if (total > 0)
             return total;
     
          if ((msTimeOut >= 0) && (int64_t(CTimer::getTime() - entertime) >= msTimeOut * 1000LL))
             throw CUDTException(6, 3, 0);
     
          CTimer::waitForEvent();
       }
     
       return 0;
    }
  • 相关阅读:
    poj 3321 Apple Tree
    hdu 1520 Anniversary party
    Light OJ 1089 Points in Segments (II)
    Timus 1018 Binary Apple Tree
    zoj 3299 Fall the Brick
    HFUT 1287 法默尔的农场
    Codeforces 159C String Manipulation 1.0
    GraphQL + React Apollo + React Hook 大型项目实战(32 个视频)
    使用 TypeScript & mocha & chai 写测试代码实战(17 个视频)
    GraphQL + React Apollo + React Hook + Express + Mongodb 大型前后端分离项目实战之后端(19 个视频)
  • 原文地址:https://www.cnblogs.com/shenlinken/p/5513821.html
Copyright © 2011-2022 走看看