zoukankan      html  css  js  c++  java
  • UDT中的epoll

    epoll 是为处理大量句柄而改进的poll,在UDT中也有支持。UDT使用了内核提供的epoll,主要是epoll_create,epoll_wait,epoll_ctl,UDT定义了CEPollDesc这个结构来管理epoll的描述符和套接字。
    struct CEPollDesc
    {
       int m_iID;                                // epoll ID
       std::set<UDTSOCKET> m_sUDTSocksOut;       // set of UDT sockets waiting for write events
       std::set<UDTSOCKET> m_sUDTSocksIn;        // set of UDT sockets waiting for read events
       std::set<UDTSOCKET> m_sUDTSocksEx;        // set of UDT sockets waiting for exceptions
     
       int m_iLocalID;                           // local system epoll ID
       std::set<SYSSOCKET> m_sLocals;            // set of local (non-UDT) descriptors
     
       std::set<UDTSOCKET> m_sUDTWrites;         // UDT sockets ready for write
       std::set<UDTSOCKET> m_sUDTReads;          // UDT sockets ready for read
       std::set<UDTSOCKET> m_sUDTExcepts;        // UDT sockets with exceptions (connection broken, etc.)
    };
    特别要提醒的是,当对端socket连接中断的时候,也是在m_sUDTReads里的
    UDT还实现了一个类来进行各项操作,实现的有
    create():创建一个epoll,调用了epoll_create
    add_usock():添加一个UDT套接字到epoll
    add_ssock():添加一个系统套接字到epoll,调用了epoll_ctl
    remove_usock():从epoll中移除一个UDT套接字
    remove_ssock():从epoll中移除一个系统套接字,调用了epoll_ctl
    wait():等待epoll事件或者超时,调用了epoll_wait
    release():关闭并释放一个epoll
     
    UDT里对epoll的调用是四段式的,比如add_usock这里,调用顺序是epoll_add_usock()->CUDT::epoll_add_usock()->s_UDTUnited.epoll_add_usock()->CEPoll::add_usock
     
    int epoll_add_usock(int eid, UDTSOCKET u, const int* events)
    {
       return CUDT::epoll_add_usock(eid, u, events);
    }
     
    int CUDT::epoll_add_usock(const int eid, const UDTSOCKET u, const int* events)
    {
       try
       {
          return s_UDTUnited.epoll_add_usock(eid, u, events);
       }
       catch (CUDTException e)
       {
          s_UDTUnited.setError(new CUDTException(e));
          return ERROR;
       }
       catch (...)
       {
          s_UDTUnited.setError(new CUDTException(-1, 0, 0));
          return ERROR;
       }
    }
     
    int CUDTUnited::epoll_add_usock(const int eid, const UDTSOCKET u, const int* events)
    {
       CUDTSocket* s = locate(u);
       int ret = -1;
       if (NULL != s)
       {
          ret = m_EPoll.add_usock(eid, u, events);
          s->m_pUDT->addEPoll(eid);
       }
       else
       {
          throw CUDTException(5, 4);
       }
     
       return ret;
    }
     
    int CEPoll::add_usock(const int eid, const UDTSOCKET& u, const int* events)
    {
       CGuard pg(m_EPollLock);
     
       map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
       if (p == m_mPolls.end())
          throw CUDTException(5, 13);
     
       if (!events || (*events & UDT_EPOLL_IN))       //UDT_EPOLL_IN 和UDT_EPOLL_OUT、UDT_EPOLL_ERROR分别是0x1, 0x4, 0x8,可以进行&运算
          p->second.m_sUDTSocksIn.insert(u);
       if (!events || (*events & UDT_EPOLL_OUT))
          p->second.m_sUDTSocksOut.insert(u);
     
       return 0;
    }
     
    UDT命名空间提供给应用程序调用接口,可成为API层,API层调用CUDT API,这一层主要做错误处理,捕捉下面两层抛出的错误保存起来交给应用程序使用,CUDT API调用CUDTUnited的实现,如果是UDT SOCKET的socket(),bind(),listen()等,到这层也就结束了,不过epoll要多个第四层,再调用CEPoll的实现。现在来看看CUDTUnited和CEPoll的实现。
    CUDTSocket* s = locate(u);
     调用CUDTUnited::locate(),根据SocketID,也就是UDT Socket handle在CUDTUnited的std::map<UDTSOCKET, CUDTSocket*> m_Sockets中找到对应的CUDTSocket结构,如果找不到,抛出错误,找到了就调用CEPoll的add_usock实现,根据传的event参数,将socket导入相应的队列。之后调用s->m_pUDT->addEPoll(eid)
    void CUDT::addEPoll(const int eid)
    {
       CGuard::enterCS(s_UDTUnited.m_EPoll.m_EPollLock);                      //这种通过类来实现加锁解锁的,我第一次见,还挺方便。
       m_sPollID.insert(eid);
       CGuard::leaveCS(s_UDTUnited.m_EPoll.m_EPollLock);
     
       if (!m_bConnected || m_bBroken || m_bClosing)
          return;
     
       if (((UDT_STREAM == m_iSockType) && (m_pRcvBuffer->getRcvDataSize() > 0)) ||
          ((UDT_DGRAM == m_iSockType) && (m_pRcvBuffer->getRcvMsgNum() > 0)))
       {
          s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, true);
       }
       if (m_iSndBufSize > m_pSndBuffer->getCurrBufSize())
       {
          s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);
       }
    }
    这里已经开始更新m_sUDTWrites,m_sUDTReads,通过update_events(),如前所述,update_events()也是CEPoll的成员函数。
     
    int CEPoll::update_events(const UDTSOCKET& uid, std::set<int>& eids, int events, bool enable)
    {
       CGuard pg(m_EPollLock);
     
       map<int, CEPollDesc>::iterator p;
     
       vector<int> lost;
       for (set<int>::iterator i = eids.begin(); i != eids.end(); ++ i)
       {
          p = m_mPolls.find(*i);
          if (p == m_mPolls.end())
          {
             lost.push_back(*i);
          }
          else
          {
             if ((events & UDT_EPOLL_IN) != 0)
                update_epoll_sets(uid, p->second.m_sUDTSocksIn, p->second.m_sUDTReads, enable);   //UDT_EPOLL_IN 和UDT_EPOLL_OUT、UDT_EPOLL_ERROR分别是0x1, 0x4, 0x8,可以进行&运算
             if ((events & UDT_EPOLL_OUT) != 0)
                update_epoll_sets(uid, p->second.m_sUDTSocksOut, p->second.m_sUDTWrites, enable);
             if ((events & UDT_EPOLL_ERR) != 0)
                update_epoll_sets(uid, p->second.m_sUDTSocksEx, p->second.m_sUDTExcepts, enable);
          }
       }
     
       for (vector<int>::iterator i = lost.begin(); i != lost.end(); ++ i)
          eids.erase(*i);
     
       return 0;
    }
     
    void update_epoll_sets(const UDTSOCKET& uid, const set<UDTSOCKET>& watch, set<UDTSOCKET>& result, bool enable)
    {
       if (enable && (watch.find(uid) != watch.end()))
       {
          result.insert(uid);
       }
       else if (!enable)
       {
          result.erase(uid);
       }
    }
     
    最后说下wait函数的实现,一样是四段式,就跳过前面三段,直接看第四段
     
    int CEPoll::wait(const int eid, set<UDTSOCKET>* readfds, set<UDTSOCKET>* writefds, int64_t msTimeOut, set<SYSSOCKET>* lrfds, set<SYSSOCKET>* lwfds)
    {
       // if all fields is NULL and waiting time is infinite, then this would be a deadlock   都空的的话,会死锁,抛出异常
       if (!readfds && !writefds && !lrfds && lwfds && (msTimeOut < 0))
          throw CUDTException(5, 3, 0);
     
       // Clear these sets in case the app forget to do it.  清空
       if (readfds) readfds->clear();
       if (writefds) writefds->clear();
       if (lrfds) lrfds->clear();
       if (lwfds) lwfds->clear();
     
       int total = 0;
     
       int64_t entertime = CTimer::getTime();
       while (true)
       {
          CGuard::enterCS(m_EPollLock);
     
          map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
          if (p == m_mPolls.end())
          {
             CGuard::leaveCS(m_EPollLock);
             throw CUDTException(5, 13);
          }
     
          if (p->second.m_sUDTSocksIn.empty() && p->second.m_sUDTSocksOut.empty() && p->second.m_sLocals.empty() && (msTimeOut < 0))
          {
             // no socket is being monitored, this may be a deadlock  都空的的话,会死锁,抛出异常
             CGuard::leaveCS(m_EPollLock);
             throw CUDTException(5, 3);
          }
     
          // Sockets with exceptions are returned to both read and write sets.          把m_sUDTReads和m_sUDTExcepts都读到readfds里
          if ((NULL != readfds) && (!p->second.m_sUDTReads.empty() || !p->second.m_sUDTExcepts.empty()))
          {
             *readfds = p->second.m_sUDTReads;
             for (set<UDTSOCKET>::const_iterator i = p->second.m_sUDTExcepts.begin(); i != p->second.m_sUDTExcepts.end(); ++ i)
                readfds->insert(*i);
             total += p->second.m_sUDTReads.size() + p->second.m_sUDTExcepts.size();
          }
          if ((NULL != writefds) && (!p->second.m_sUDTWrites.empty() || !p->second.m_sUDTExcepts.empty()))          //把m_sUDTWrites和m_sUDTExcepts都读到writefds里
          {
             *writefds = p->second.m_sUDTWrites;
             for (set<UDTSOCKET>::const_iterator i = p->second.m_sUDTExcepts.begin(); i != p->second.m_sUDTExcepts.end(); ++ i)
                writefds->insert(*i);
             total += p->second.m_sUDTWrites.size() + p->second.m_sUDTExcepts.size();
          }
     
          if (lrfds || lwfds)     //读系统套接字
          {
             #ifdef LINUX
             const int max_events = p->second.m_sLocals.size();
             epoll_event ev[max_events];
             int nfds = ::epoll_wait(p->second.m_iLocalID, ev, max_events, 0);
     
             for (int i = 0; i < nfds; ++ i)
             {
                if ((NULL != lrfds) && (ev[i].events & EPOLLIN))
               {
                   lrfds->insert(ev[i].data.fd);
                   ++ total;
                }
                if ((NULL != lwfds) && (ev[i].events & EPOLLOUT))
                {
                   lwfds->insert(ev[i].data.fd);
                   ++ total;
                }
             }
             #else
             //currently "select" is used for all non-Linux platforms.
             //faster approaches can be applied for specific systems in the future.
     
             //"select" has a limitation on the number of sockets
     
             fd_set readfds;
             fd_set writefds;
             FD_ZERO(&readfds);
             FD_ZERO(&writefds);
     
             for (set<SYSSOCKET>::const_iterator i = p->second.m_sLocals.begin(); i != p->second.m_sLocals.end(); ++ i)
             {
                if (lrfds)
                   FD_SET(*i, &readfds);
                if (lwfds)
                   FD_SET(*i, &writefds);
             }
     
             timeval tv;
             tv.tv_sec = 0;
             tv.tv_usec = 0;
             if (::select(0, &readfds, &writefds, NULL, &tv) > 0)
             {
                for (set<SYSSOCKET>::const_iterator i = p->second.m_sLocals.begin(); i != p->second.m_sLocals.end(); ++ i)
                {
                   if (lrfds && FD_ISSET(*i, &readfds))
                   {
                      lrfds->insert(*i);
                      ++ total;
                   }
                   if (lwfds && FD_ISSET(*i, &writefds))
                   {
                      lwfds->insert(*i);
                      ++ total;
                   }
                }
             }
             #endif
          }
     
          CGuard::leaveCS(m_EPollLock);
     
          if (total > 0)
             return total;
     
          if ((msTimeOut >= 0) && (int64_t(CTimer::getTime() - entertime) >= msTimeOut * 1000LL))
             throw CUDTException(6, 3, 0);
     
          CTimer::waitForEvent();
       }
     
       return 0;
    }
  • 相关阅读:
    基于javascript 上传
    mysql 分组查询
    php 遍历指定路径所有目录与文件夹
    设置Tomcat的内存
    SQL语句执行顺序
    XFire发布Webservice
    Java排序算法
    Oracle定时任务DBMS_JOB
    JAXWS发布WebService
    Mogodb基础知识和安装学习
  • 原文地址:https://www.cnblogs.com/shenlinken/p/5513821.html
Copyright © 2011-2022 走看看