zoukankan      html  css  js  c++  java
  • UDT源码剖析(四)之Socket函数

    UDTSOCKET socket(int af, int type, int protocol)

    UDT SOCKET的创建顺序:UDTSOCKET UDT::socket(int af,int type,int protocol) -> UDTSOCKET CUDT::socket(int af,int tyoe,int proctol) -> UDTSOCKET CUDTUnited::newSocket(int af, int type)
    • Order 0:UDTSOCKET CUDTUnited::newSocket(int af, int type):创建UDT SOCKET
    UDTSOCKET CUDTUnited::newSocket(int af, int type)
    {
       if ((type != SOCK_STREAM) && (type != SOCK_DGRAM))    //如果参数不正确,直接返回
          throw CUDTException(5, 3, 0);
    
       CUDTSocket* ns = NULL;
    
       try
       {
          ns = new CUDTSocket;    //new一个CUDTSocket
          ns->m_pUDT = new CUDT;    //紧接着new一个CUDT
          if (AF_INET == af)    //根据IPV4 OR IPV6 ,更新本地地址,并将端口预设为0
          {
             ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in);
             ((sockaddr_in*)(ns->m_pSelfAddr))->sin_port = 0;
          }
          else
          {
             ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in6);
             ((sockaddr_in6*)(ns->m_pSelfAddr))->sin6_port = 0;
          }
       }
       catch (...)
       {
          delete ns;
          throw CUDTException(3, 2, 0);
       }
    
       CGuard::enterCS(m_IDLock);
       ns->m_SocketID = -- m_SocketID;    //在初始化的时候会随机一个UDT SOCKET,之后新创建的UDT SOCKET在此基础上累加或者累减就可以了
       CGuard::leaveCS(m_IDLock);
    
       ns->m_Status = INIT;    //调整UDTSocket的状态为INIT
       ns->m_ListenSocket = 0;    //初始化Listen Socket ID为0
       ns->m_pUDT->m_SocketID = ns->m_SocketID;    //将刚刚获得ID注册到CUDT中
       ns->m_pUDT->m_iSockType = (SOCK_STREAM == type) ? UDT_STREAM : UDT_DGRAM;    //确定CUDT的类型
       ns->m_pUDT->m_iIPversion = ns->m_iIPversion = af;
       ns->m_pUDT->m_pCache = m_pCache;    //CUDT与CUDTUnited共用一个CCache
    
       // protect the m_Sockets structure.
       CGuard::enterCS(m_ControlLock);
       try
       {
          m_Sockets[ns->m_SocketID] = ns;    //在全局的map中保存CUDTSocket*
       }
       catch (...)
       {
          //failure and rollback
          CGuard::leaveCS(m_ControlLock);
          delete ns;
          ns = NULL;
       }
       CGuard::leaveCS(m_ControlLock);
    
       if (NULL == ns)
          throw CUDTException(3, 2, 0);
    
       return ns->m_SocketID;
    }
    

    int bind(UDTSOCKET u, const struct sockaddr* name, int namelen)

    bind的关联顺序:int UDT::bind(UDTSOCKET u, const struct sockaddr* name, int namelen) -> int CUDT::bind(UDTSOCKET u, const struct sockaddr* name, int namelen) -> int CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, int namelen) -> void CUDT::open() -> void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)
    • Order 0:int CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, int namelen)
    int CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, int namelen):将UDT SOCKET与某一个地址相关联
    {
       CUDTSocket* s = locate(u);    //获取这个UDT SOCKET的CUDTSocket*
       if (NULL == s)
          throw CUDTException(5, 4, 0);
    
       CGuard cg(s->m_ControlLock);
    
       // cannot bind a socket more than once
       if (INIT != s->m_Status)
          throw CUDTException(5, 0, 0);
    
       // check the size of SOCKADDR structure
       if (AF_INET == s->m_iIPversion)
       {
          if (namelen != sizeof(sockaddr_in))
             throw CUDTException(5, 3, 0);
       }
       else
       {
          if (namelen != sizeof(sockaddr_in6))
             throw CUDTException(5, 3, 0);
       }
    
       s->m_pUDT->open();    //调用CUDT*中的open()并修改CUDT实例的状态
       updateMux(s, name);
       s->m_Status = OPENED;    //调整CUDTSocket的状态为Opened
    
       // copy address information of local node
       s->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr);
    
       return 0;
    }
    
    • Order 1:void CUDT::open():继续填充CUDT中的选项
    void CUDT::open()
    {
       CGuard cg(m_ConnectionLock);
        
       //初始化有效载荷的大小 
       m_iPktSize = m_iMSS - 28;    //packet size = MSS - 28
       m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;    //payload size = MSS - 28 - 16
    
       m_iEXPCount = 1;    //异常计数器设置为1
       m_iBandwidth = 1;    //估计带宽为1 packet / S
       m_iDeliveryRate = 16;    //对方的接收速率为16 packet / S
       m_iAckSeqNo = 0;    //上一次收到的ACK为0
       m_ullLastAckTime = 0;        //上一次接收ACK的事件为0
    
       m_StartTime = CTimer::getTime();    //初始时间为当前时间
       m_llSentTotal = m_llRecvTotal = m_iSndLossTotal = m_iRcvLossTotal = m_iRetransTotal = m_iSentACKTotal = m_iRecvACKTotal = m_iSentNAKTotal = m_iRecvNAKTotal = 0;    //统计信息初始化
       m_LastSampleTime = CTimer::getTime();    //最后的性能采样时间为现在
       m_llTraceSent = m_llTraceRecv = m_iTraceSndLoss = m_iTraceRcvLoss = m_iTraceRetrans = m_iSentACK = m_iRecvACK = m_iSentNAK = m_iRecvNAK = 0;    //统计信息为0
       m_llSndDuration = m_llSndDurationTotal = 0;    
    
       if (NULL == m_pSNode)    //初始化发送链表,如果不存在,就new一个
          m_pSNode = new CSNode;
       m_pSNode->m_pUDT = this;    //确定Send List Node中的CUDT*的指向
       m_pSNode->m_llTimeStamp = 1;    //将堆的比较值初始化为1
       m_pSNode->m_iHeapLoc = -1;    //目前不存在于堆中
    
       if (NULL == m_pRNode)    //初始化接收链表,如果不存在就new一个
          m_pRNode = new CRNode;
       m_pRNode->m_pUDT = this;    //确定Recv List Node中的CUDT*的指向
       m_pRNode->m_llTimeStamp = 1;    //将初始化位置信息为1
       m_pRNode->m_pPrev = m_pRNode->m_pNext = NULL;    //暂时不存在于堆中
       m_pRNode->m_bOnList = false;
    
       m_iRTT = 10 * m_iSYNInterval;    //RTT为 10 * 10000个CPU时钟周期
       m_iRTTVar = m_iRTT >> 1;    //RTT方差为RTT >> 1
       m_ullCPUFrequency = CTimer::getCPUFrequency();    //获得CPU的时钟周期
    
       // set up the timers
       m_ullSYNInt = m_iSYNInterval * m_ullCPUFrequency;    //初始化SYN发送的时间间隔为10000个CPU时钟周期
    
       //设置NAK超时下限与超时下限阀值为100ms,这300000个时钟周期就是100ms??
       m_ullMinNakInt = 300000 * m_ullCPUFrequency;
       m_ullMinExpInt = 300000 * m_ullCPUFrequency;
    
        //ACK与NAK的发送间隔与SYN的发送间隔相同
       m_ullACKInt = m_ullSYNInt;    
       m_ullNAKInt = m_ullMinNakInt;
    
       uint64_t currtime;    //获得当前的时间,gettimeofday()
       CTimer::rdtsc(currtime);
       m_ullLastRspTime = currtime;    //上一次的请求连接时间为现在
       m_ullNextACKTime = currtime + m_ullSYNInt;    //确定下一次ACK与SYN的发送时间
       m_ullNextNAKTime = currtime + m_ullNAKInt;
    
       m_iPktCount = 0;    //收到的Packet计数为0
       m_iLightACKCount = 1;    //收到的ACK计数为1
    
       m_ullTargetTime = 0;    //下一个数据包的预计发送时间为0
       m_ullTimeDiff = 0;    //两个数据包发送间隔为0,之后要根据RTT估算
    
       // Now UDT is opened.
       m_bOpened = true;    
    }
    
    • Order 1:void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)
    void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)
    {
       CGuard cg(m_ControlLock);
    
       if ((s->m_pUDT->m_bReuseAddr) && (NULL != addr))
       {
           //如果还没有关联到某一个CMultiplexer上,先获得想要关联的port 
          int port = (AF_INET == s->m_pUDT->m_iIPversion) ? ntohs(((sockaddr_in*)addr)->sin_port) : ntohs(((sockaddr_in6*)addr)->sin6_port);
    
          // 根据获得port在CMultiplexer中进行寻找
          for (map<int, CMultiplexer>::iterator i = m_mMultiplexer.begin(); i != m_mMultiplexer.end(); ++ i)
          {
             if ((i->second.m_iIPversion == s->m_pUDT->m_iIPversion) && (i->second.m_iMSS == s->m_pUDT->m_iMSS) && i->second.m_bReusable)
             {
                //获得相应的CMultiplexer
                if (i->second.m_iPort == port)
                {
                   // reuse the existing multiplexer
                   ++ i->second.m_iRefCount;    //首先叠加引用计数
                   s->m_pUDT->m_pSndQueue = i->second.m_pSndQueue;    //享用Send Queue
                   s->m_pUDT->m_pRcvQueue = i->second.m_pRcvQueue;    //享用Recv Queue
                   s->m_iMuxID = i->second.m_iID;    //顺便填充ID,为下一次查找提供便利
                   return;
                }
             }
          }
       }
    
       // 如果没有找到,就意味着需要创建一个新的CMultiplexer,并将新创建的CMultiplexer与CUDT相关联
       CMultiplexer m;
       m.m_iMSS = s->m_pUDT->m_iMSS;
       m.m_iIPversion = s->m_pUDT->m_iIPversion;
       m.m_iRefCount = 1;
       m.m_bReusable = s->m_pUDT->m_bReuseAddr;
       m.m_iID = s->m_SocketID;
    
       m.m_pChannel = new CChannel(s->m_pUDT->m_iIPversion);    //由CMultiplexer管理CChannel
       m.m_pChannel->setSndBufSize(s->m_pUDT->m_iUDPSndBufSize);
       m.m_pChannel->setRcvBufSize(s->m_pUDT->m_iUDPRcvBufSize);
    
       try
       {
          if (NULL != udpsock)
             m.m_pChannel->open(*udpsock);    //打开这个UDP SOCKET,如果提供Port,就与相关的Port关联,如果不提供,就随机选择Port
          else
             m.m_pChannel->open(addr);
       }
       catch (CUDTException& e)
       {
          m.m_pChannel->close();
          delete m.m_pChannel;
          throw e;
       }
    
       //全部都是初始化,将CUDT与CMultiplexer相关联,并将新创建的CMultiplexer添加到其中
       sockaddr* sa = (AF_INET == s->m_pUDT->m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;
       m.m_pChannel->getSockAddr(sa);
       m.m_iPort = (AF_INET == s->m_pUDT->m_iIPversion) ? ntohs(((sockaddr_in*)sa)->sin_port) : ntohs(((sockaddr_in6*)sa)->sin6_port);
       if (AF_INET == s->m_pUDT->m_iIPversion) delete (sockaddr_in*)sa; else delete (sockaddr_in6*)sa;
    
       m.m_pTimer = new CTimer;
    
       //Send Queue与Recv Queue共享Timer
       m.m_pSndQueue = new CSndQueue;
       m.m_pSndQueue->init(m.m_pChannel, m.m_pTimer);
       m.m_pRcvQueue = new CRcvQueue;
       m.m_pRcvQueue->init(32, s->m_pUDT->m_iPayloadSize, m.m_iIPversion, 1024, m.m_pChannel, m.m_pTimer);
    
       m_mMultiplexer[m.m_iID] = m;
    
       s->m_pUDT->m_pSndQueue = m.m_pSndQueue;
       s->m_pUDT->m_pRcvQueue = m.m_pRcvQueue;
       s->m_iMuxID = m.m_iID;
    }
    

    int bind2(UDTSOCKET u, UDPSOCKET udpsock)

    bind2的关联顺序:int UDT::bind2(UDTSOCKET u, UDPSOCKET udpsock) -> int CUDT::bind(UDTSOCKET u, UDPSOCKET udpsock) -> int CUDTUnited::bind(UDTSOCKET u, UDPSOCKET udpsock) -> void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)

    注意:bind的情况基本一样,唯一的不同时UDP端口的选择,如果没有提供UDP端口,会进行随机的选择。

    • Order 0:int CUDTUnited::bind(UDTSOCKET u, UDPSOCKET udpsock):将UDT Socket与UDP Socket关联起来
    int CUDTUnited::bind(UDTSOCKET u, UDPSOCKET udpsock)
    {
       CUDTSocket* s = locate(u);    //获取CUDTSocket实例
       if (NULL == s)
          throw CUDTException(5, 4, 0);
    
       CGuard cg(s->m_ControlLock);
    
       // cannot bind a socket more than once
       if (INIT != s->m_Status)
          throw CUDTException(5, 0, 0);
    
       sockaddr_in name4;
       sockaddr_in6 name6;
       sockaddr* name;
       socklen_t namelen;
    
       if (AF_INET == s->m_iIPversion)
       {
          namelen = sizeof(sockaddr_in);
          name = (sockaddr*)&name4;
       }
       else
       {
          namelen = sizeof(sockaddr_in6);
          name = (sockaddr*)&name6;
       }
    
       if (-1 == ::getsockname(udpsock, name, &namelen))
          throw CUDTException(5, 3);
    
       s->m_pUDT->open();
       updateMux(s, name, &udpsock);
       s->m_Status = OPENED;
    
       // copy address information of local node
       s->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr);
    
       return 0;
    }
    
    • 没有提供UDP参数:void CChannel::open(const sockaddr* addr)
    void CChannel::open(const sockaddr* addr)
    {
       // 创建UDP Socket
       m_iSocket = ::socket(m_iIPversion, SOCK_DGRAM, 0);
    
       #ifdef WINDOWS
          if (INVALID_SOCKET == m_iSocket)
       #else
          if (m_iSocket < 0)
       #endif
          throw CUDTException(1, 0, NET_ERROR);
    
       //如果有提供地址,将提供的地址与创建的UDT关联。如果没有提供地址,调用函数从本地获取地址,然后与创建的Socket关联 
       if (NULL != addr)
       {
          socklen_t namelen = m_iSockAddrSize;
    
          //将addr与UDP Socket关联  
          if (0 != ::bind(m_iSocket, addr, namelen))
             throw CUDTException(1, 3, NET_ERROR);
       }
       else
       {
          //sendto or WSASendTo will also automatically bind the socket
          addrinfo hints;
          addrinfo* res;
    
          memset(&hints, 0, sizeof(struct addrinfo));
    
          hints.ai_flags = AI_PASSIVE;
          hints.ai_family = m_iIPversion;
          hints.ai_socktype = SOCK_DGRAM;
    
          if (0 != ::getaddrinfo(NULL, "0", &hints, &res))
             throw CUDTException(1, 3, NET_ERROR);
    
          if (0 != ::bind(m_iSocket, res->ai_addr, res->ai_addrlen))
             throw CUDTException(1, 3, NET_ERROR);
    
          ::freeaddrinfo(res);
       }
    
       setUDPSockOpt();    //设置UDP的接收/发送缓冲区,并将UDP设置为非阻塞
    }
    
    • 提供UDP参数:void CChannel::open(UDPSOCKET udpsock)
    void CChannel::open(UDPSOCKET udpsock)
    {
       m_iSocket = udpsock;
       setUDPSockOpt();
    }
    

    int listen(UDTSOCKET u, int backlog)

    listen的启动顺序:void UDT::listen(UDTSOCKET u,int backlog) -> int CUDT::listen(UDTSOCKET u, int backlog) -> int CUDTUnited::listen(const UDTSOCKET u, int backlog) -> void CUDT::listen()
    • Order 0:int CUDTUnited::listen(const UDTSOCKET u, int backlog):将创建的UDTSOCKET作为Listener Socket
    int CUDTUnited::listen(const UDTSOCKET u, int backlog)
    {
       CUDTSocket* s = locate(u);    //从全局的map中寻找CUDTSocket实例
       if (NULL == s)
          throw CUDTException(5, 4, 0);
    
       CGuard cg(s->m_ControlLock);
    
       // 如果当前的CUDTSocket已经处于Listening状态,直接返回
       if (LISTENING == s->m_Status)
          return 0;
    
       // 如果当前的CUDTSocket还没有打开,抛出异常
       if (OPENED != s->m_Status)
          throw CUDTException(5, 5, 0);
    
       // Listener不支持交会连接模式
       if (s->m_pUDT->m_bRendezvous)
          throw CUDTException(5, 7, 0);
    
       //如果队列长度小于backlog,就直接返回  
       if (backlog <= 0)
          throw CUDTException(5, 3, 0);
    
       s->m_uiBackLog = backlog;
    
       try
       {
          s->m_pQueuedSockets = new set<UDTSOCKET>;    //创建连接已完成,未accept()的set
          s->m_pAcceptSockets = new set<UDTSOCKET>;    //创建连接已完成,并且已经accept()的set
       }
       catch (...)
       {
          delete s->m_pQueuedSockets;
          delete s->m_pAcceptSockets;
          throw CUDTException(3, 2, 0);
       }
    
       s->m_pUDT->listen();    
    
       s->m_Status = LISTENING;    //调整状态为LISTENING
    
       return 0;
    }
    
    • Order 1:void CUDT::listen():调整CUDT实例状态
    void CUDT::listen()
    {
       CGuard cg(m_ConnectionLock);
    
       //如果状态不正确,就抛出异常
       if (!m_bOpened)    
          throw CUDTException(5, 0, 0);
    
       if (m_bConnecting || m_bConnected)
          throw CUDTException(5, 2, 0);
    
       // 如果已经处于LISTENING状态,直接返回
       if (m_bListening)
          return;
    
       //将这个CUDT实例设置为Listener
       if (m_pRcvQueue->setListener(this) < 0)
          throw CUDTException(5, 11, 0);
    
       m_bListening = true;
    }
    

    UDTSOCKET accept(UDTSOCKET u, struct sockaddr* addr, int* addrlen)

    accept的处理顺序:UDTSOCKET UDT::accept(UDTSOCKET u, struct sockaddr* addr, int* addrlen) -> UDTSOCKET CUDT::accept(UDTSOCKET u, struct sockaddr* addr, int* addrlen) -> UDTSOCKET CUDTUnited::accept(const UDTSOCKET listener, sockaddr* addr, int* addrlen)
    • Order 0:UDTSOCKET CUDTUnited::accept(const UDTSOCKET listener, sockaddr* addr, int* addrlen)
    UDTSOCKET CUDTUnited::accept(const UDTSOCKET listener, sockaddr* addr, int* addrlen)
    {
       if ((NULL != addr) && (NULL == addrlen))
          throw CUDTException(5, 3, 0);
    
       CUDTSocket* ls = locate(listener);    //首先寻找Listener的CUDTSocket实例
    
       if (ls == NULL)
          throw CUDTException(5, 4, 0);
    
       // 如果Listener的状态不正确,直接返回
       if (LISTENING != ls->m_Status)
          throw CUDTException(5, 6, 0);
    
       // Listener不可能存在交会连接模式
       if (ls->m_pUDT->m_bRendezvous)
          throw CUDTException(5, 7, 0);
    
       UDTSOCKET u = CUDT::INVALID_SOCK;
       bool accepted = false;
    
       // !!only one conection can be set up each time!!
       #ifndef WINDOWS
          while (!accepted)
          {
             pthread_mutex_lock(&(ls->m_AcceptLock));
            
             //再次判断连接状态,如果不正确,退出循环   
             if ((LISTENING != ls->m_Status) || ls->m_pUDT->m_bBroken)
             {
                // This socket has been closed.
                accepted = true;
             }
             else if (ls->m_pQueuedSockets->size() > 0)    //如果此时有存在的连接,等到accept()进行处理。取出连接并将其加入已连接队列,退出循环
             {
                u = *(ls->m_pQueuedSockets->begin());    
                ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);
                ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());
                accepted = true;
             }
             else if (!ls->m_pUDT->m_bSynRecving)
             {
                accepted = true;
             }
                
             //如果状态正确,但是没有等到连接,进入等待状态,等待事件发生时被唤醒
             if (!accepted && (LISTENING == ls->m_Status))
                pthread_cond_wait(&(ls->m_AcceptCond), &(ls->m_AcceptLock));
            
             //如果等待accept()队列为空,取消关注这个Listener的可读事件   
             if (ls->m_pQueuedSockets->empty())
                m_EPoll.update_events(listener, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false);
    
             pthread_mutex_unlock(&(ls->m_AcceptLock));
          }
       #else
          while (!accepted)
          {
             WaitForSingleObject(ls->m_AcceptLock, INFINITE);
    
             if (ls->m_pQueuedSockets->size() > 0)
             {
                u = *(ls->m_pQueuedSockets->begin());
                ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);
                ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());
    
                accepted = true;
             }
             else if (!ls->m_pUDT->m_bSynRecving)
                accepted = true;
    
             ReleaseMutex(ls->m_AcceptLock);
    
             if  (!accepted & (LISTENING == ls->m_Status))
                WaitForSingleObject(ls->m_AcceptCond, INFINITE);
    
             if ((LISTENING != ls->m_Status) || ls->m_pUDT->m_bBroken)
             {
                // Send signal to other threads that are waiting to accept.
                SetEvent(ls->m_AcceptCond);
                accepted = true;
             }
    
             if (ls->m_pQueuedSockets->empty())
                m_EPoll.update_events(listener, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false);
          }
       #endif
    
       //处理收到了一个无效的SOCKET的情况 
       if (u == CUDT::INVALID_SOCK)
       {
          // non-blocking receiving, no connection available
          if (!ls->m_pUDT->m_bSynRecving)
             throw CUDTException(6, 2, 0);
    
          // listening socket is closed
          throw CUDTException(5, 6, 0);
       }
    
       //否则的话,本次获得了一个有效的UDT SOCKET,然后填充用户提供的参数 
       if ((addr != NULL) && (addrlen != NULL))
       {
          if (AF_INET == locate(u)->m_iIPversion)
             *addrlen = sizeof(sockaddr_in);
          else
             *addrlen = sizeof(sockaddr_in6);
    
          // copy address information of peer node
          memcpy(addr, locate(u)->m_pPeerAddr, *addrlen);
       }
    
       return u;
    }
    

    int connect(UDTSOCKET u, const struct sockaddr* name, int namelen)

    connect的处理顺序:int UDT::connect(UDTSOCKET u, const struct sockaddr* name, int namelen) -> int CUDT::connect(UDTSOCKET u, const struct sockaddr* name, int namelen) -> int CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, int namelen) -> void CUDT::connect(const sockaddr* serv_addr)
    • Order 0:int CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, int namelen)
    int CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, int namelen)
    {
       CUDTSocket* s = locate(u);    //获取CUDTSocket实例
       if (NULL == s)
          throw CUDTException(5, 4, 0);
    
       CGuard cg(s->m_ControlLock);
    
       //检查CUDTSocket*实例状态
       if (AF_INET == s->m_iIPversion)
       {
          if (namelen != sizeof(sockaddr_in))
             throw CUDTException(5, 3, 0);
       }
       else
       {
          if (namelen != sizeof(sockaddr_in6))
             throw CUDTException(5, 3, 0);
       }
    
       //一个UDTSocket实例只有在INIT的状态下才能进行Connect操作
       if (INIT == s->m_Status)
       {
          if (!s->m_pUDT->m_bRendezvous)    //假设不执行交汇连接操作
          {
             s->m_pUDT->open();    //首先打开,其次将这个UDT SOCKET关联到UDP资源复用器上,具体的代码分析见上述
             updateMux(s);
             s->m_Status = OPENED;    //在成功之后,将这个UDT SOCKET的状态设置为OPENED
          }
          else
             throw CUDTException(5, 8, 0);
       }
       else if (OPENED != s->m_Status)
          throw CUDTException(5, 2, 0);
        
       //然后将UDT SOCKET实例的状态更新为:CONNECTING
       s->m_Status = CONNECTING;
       try
       {
          s->m_pUDT->connect(name);    //调用CUDT中的中的实例,正式进行连接
       }
       catch (CUDTException e)
       {
          s->m_Status = OPENED;
          throw e;
       }
        
       //重新记录对方的地址 
       delete s->m_pPeerAddr;
       if (AF_INET == s->m_iIPversion)
       {
          s->m_pPeerAddr = (sockaddr*)(new sockaddr_in);
          memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in));
       }
       else
       {
          s->m_pPeerAddr = (sockaddr*)(new sockaddr_in6);
          memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in6));
       }
    
       return 0;
    }
    
    • Order 1:void CUDT::connect(const sockaddr* serv_addr):调整CUDT实例的资源
    void CUDT::connect(const sockaddr* serv_addr)
    {
       CGuard cg(m_ConnectionLock);
    
       if (!m_bOpened)    //如果目前的状态不正确,直接返回
          throw CUDTException(5, 0, 0);
    
       if (m_bListening)    //Listener不能调用connect
          throw CUDTException(5, 2, 0);
    
       if (m_bConnecting || m_bConnected)    //只调整了CUDT SOCKET的Status为CONNECTING,还没有调整CUDT实例的状态
          throw CUDTException(5, 2, 0);
    
       // 记录对方地址
       delete m_pPeerAddr;
       m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*)new sockaddr_in : (sockaddr*)new sockaddr_in6;
       memcpy(m_pPeerAddr, serv_addr, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6));
    
       //这个作用是在交会连接队列中等待HandleShake Packet,等待事件是1S。在我们的实现中可以不考虑
       uint64_t ttl = 3000000;
       if (m_bRendezvous)
          ttl *= 10;
       ttl += CTimer::getTime();
       m_pRcvQueue->registerConnector(m_SocketID, this, m_iIPversion, serv_addr, ttl);
    
       //因为是主动发起连接,填充想要发送的握手包
       m_ConnReq.m_iVersion = m_iVersion;    
       m_ConnReq.m_iType = m_iSockType;
       m_ConnReq.m_iMSS = m_iMSS;
       m_ConnReq.m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize)? m_iRcvBufSize : m_iFlightFlagSize;
       m_ConnReq.m_iReqType = (!m_bRendezvous) ? 1 : 0;
       m_ConnReq.m_iID = m_SocketID;
       CIPAddress::ntop(serv_addr, m_ConnReq.m_piPeerIP, m_iIPversion);
    
       //在发送握手包的时候随机化一个ISN
       srand((unsigned int)CTimer::getTime());
       m_iISN = m_ConnReq.m_iISN = (int32_t)(CSeqNo::m_iMaxSeqNo * (double(rand()) / RAND_MAX));
    
       //根据这个ISN,初始化CUDT实例中的信息
       m_iLastDecSeq = m_iISN - 1;    //最后一次发送的序列号
       m_iSndLastAck = m_iISN;    //上一次收到的ACK
       m_iSndLastDataAck = m_iISN;        //最后一次用于更新发送缓冲区的ACK
       m_iSndCurrSeqNo = m_iISN - 1;    //已发送的最大的ACK
       m_iSndLastAck2 = m_iISN;    //最后送回的ACK2
       m_ullSndLastAck2Time = CTimer::getTime();    //最后送回的ACK2的时间
    
       //打包一个握手包,ID:0是握手包的标志
       CPacket request;
       char* reqdata = new char [m_iPayloadSize];
       request.pack(0, NULL, reqdata, m_iPayloadSize);
       request.m_iID = 0;
    
       int hs_size = m_iPayloadSize;
       m_ConnReq.serialize(reqdata, hs_size);    //在向已经打包的Packet中添加握手的数据
       request.setLength(hs_size);    //更新握手包的长度
       m_pSndQueue->sendto(serv_addr, request);    //调用发送队列,发送握手包
       m_llLastReqTime = CTimer::getTime();    //修改最后发送数据的时间
    
       m_bConnecting = true;    //此时还没有收到回应的数据包,所以处于连接建立过程
    
       // 如果是异步连接,直接返回,在收到回应包的时候对状态进行调整
       if (!m_bSynRecving)
       {
          delete [] reqdata;
          return;
       }
    
       // 同步连接,等待来自对方的回应,直到连接完成才能返回。不过在这块将回应包进行打包是什么意思??
       CPacket response;
       char* resdata = new char [m_iPayloadSize];
       response.pack(0, NULL, resdata, m_iPayloadSize);
    
       CUDTException e(0, 0);
    
       //在这个循环中等待对方的回应
       while (!m_bClosing)    
       {
          //如果距离上一次发送请求的时间已经过去了250ms,再次发送请求 
          if (CTimer::getTime() - m_llLastReqTime > 250000)
          {
             m_ConnReq.serialize(reqdata, hs_size);
             request.setLength(hs_size);
             if (m_bRendezvous)
                request.m_iID = m_ConnRes.m_iID;
             m_pSndQueue->sendto(serv_addr, request);
             m_llLastReqTime = CTimer::getTime();
          }
    
          response.setLength(m_iPayloadSize);
          if (m_pRcvQueue->recvfrom(m_SocketID, response) > 0)
          {
             if (connect(response) <= 0)    //用于三次握手的第二次,处理收到的响应SYN的数据包
                break;    //返回0是处理成功,<0是处理错误,>0是包丢失需要重新处理
    
             //新的请求或者回应在收到回复后立即发出 
             m_llLastReqTime = 0;
          }
    
          if (CTimer::getTime() > ttl)    //如果处理这个Connection花费了太多的时间,抛出异常
          {
             e = CUDTException(1, 1, 0);
             break;
          }
       }
    
       delete [] reqdata;
       delete [] resdata;
    
       //根据不同的情况处理异常 
       if (e.getErrorCode() == 0)
       {
          if (m_bClosing)                                                 // if the socket is closed before connection...
             e = CUDTException(1);
          else if (1002 == m_ConnRes.m_iReqType)                          // connection request rejected
             e = CUDTException(1, 2, 0);
          else if ((!m_bRendezvous) && (m_iISN != m_ConnRes.m_iISN))      // secuity check
             e = CUDTException(1, 4, 0);
       }
    
       if (e.getErrorCode() != 0)
          throw e;
    }
    
    • Order 2:int CUDT::connect(const CPacket& response):发送三次握手的第二次Packet,相应收到的SYN。但是,没见发送ACK2 Packet啊...不过肯定会有,之后在处理Packet的模块进行详解..
    int CUDT::connect(const CPacket& response)
    {
       // 如果处理成功,就返回0;失败返回-1;返回1 OR 2意味着连接正在进行,但是出现了丢包,需要更多的握手包
    
       if (!m_bConnecting)    //如果此时的状态没有处于Connecting状态,直接返回错误
          return -1;
    
       if (m_bRendezvous && ((0 == response.getFlag()) || (1 == response.getType())) && (0 != m_ConnRes.m_iType))
       {
          // 数据包或者保活包的到来,意味着连接已经完成,在这种情况中,意味着连接已经完成
          goto POST_CONNECT;
       }
        
       //如果收到的回应包的类型不正确,直接返回错误 
       if ((1 != response.getFlag()) || (0 != response.getType()))
          return -1;
    
       m_ConnRes.deserialize(response.m_pcData, response.getLength());    //从回应包中将回应的数据填充到本地
    
       //判断连接模式,反正正常的连接不会在这个步骤进行额外的操作 
       if (m_bRendezvous)
       {
          // rendezvous connect require 3-way handshake
          if (1 == m_ConnRes.m_iReqType)
             return -1;
    
          if ((0 == m_ConnReq.m_iReqType) || (0 == m_ConnRes.m_iReqType))
          {
             m_ConnReq.m_iReqType = -1;
             // the request time must be updated so that the next handshake can be sent out immediately.
             m_llLastReqTime = 0;
             return 1;
          }
       }
       else
       {
          // set cookie
          if (1 == m_ConnRes.m_iReqType)    //如果这是一个Keep-Alive Packet,意味着包已经丢失,需要重新发送请求包
          {
             m_ConnReq.m_iReqType = -1;
             m_ConnReq.m_iCookie = m_ConnRes.m_iCookie;
             m_llLastReqTime = 0;
             return 1;
          }
       }
    
    POST_CONNECT:
       //从交会连接队列中移除这个UDT SOCKET
       m_pRcvQueue->removeConnector(m_SocketID);
    
       // 根据协商的值重新填充数据,因为收到了一个SYN,就意味着需要调整ISN
       m_iMSS = m_ConnRes.m_iMSS;
       m_iFlowWindowSize = m_ConnRes.m_iFlightFlagSize;
       m_iPktSize = m_iMSS - 28;
       m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;
       m_iPeerISN = m_ConnRes.m_iISN;
       m_iRcvLastAck = m_ConnRes.m_iISN;
       m_iRcvLastAckAck = m_ConnRes.m_iISN;
       m_iRcvCurrSeqNo = m_ConnRes.m_iISN - 1;
       m_PeerID = m_ConnRes.m_iID;
       memcpy(m_piSelfIP, m_ConnRes.m_piPeerIP, 16);
    
       //连接将要完成,创建所有需要的数据结构
       try
       {
          m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize);
          m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize);
          m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2);
          m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize);
          m_pACKWindow = new CACKWindow(1024);
          m_pRcvTimeWindow = new CPktTimeWindow(16, 64);
          m_pSndTimeWindow = new CPktTimeWindow();
       }
       catch (...)
       {
          throw CUDTException(3, 2, 0);
       }
        
       //在Cache中记录这个连接的信息 
       CInfoBlock ib;
       ib.m_iIPversion = m_iIPversion;
       CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);
       if (m_pCache->lookup(&ib) >= 0)
       {
          m_iRTT = ib.m_iRTT;
          m_iBandwidth = ib.m_iBandwidth;
       }
    
       //针对这个连接提供拥塞控制算法,以后详谈
       m_pCC = m_pCCFactory->create();
       m_pCC->m_UDT = m_SocketID;
       m_pCC->setMSS(m_iMSS);
       m_pCC->setMaxCWndSize(m_iFlowWindowSize);
       m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo);
       m_pCC->setRcvRate(m_iDeliveryRate);
       m_pCC->setRTT(m_iRTT);
       m_pCC->setBandwidth(m_iBandwidth);
       m_pCC->init();
    
       //根据拥塞控制类型填充拥塞窗口大小
       m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
       m_dCongestionWindow = m_pCC->m_dCWndSize;
    
       // 此时正式进入连接状态
       m_bConnecting = false;    
       m_bConnected = true;
    
       //将这个UDT SOCKET置于接收队列上面,之后就可以从连接队列上接收数据了
       m_pRNode->m_bOnList = true;
       m_pRcvQueue->setNewEntry(this);
    
       //更新与这个SOCKET ID相关的事件
       s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);
    
       // 调整UDT SOCKET实例的状态为:Connected
       s_UDTUnited.connect_complete(m_SocketID);
    
       return 0;
    }
    

    int flush(UDTSOCKET u)

    flush的处理流程:int UDT::flush(UDTSOCKET u) -> int CUDT::flush(UDTSOCKET u) -> int CUDTUnited::flush(const UDTSOCKET u) -> void CUDT::flush()
    • Order 0:int CUDTUnited::flush(const UDTSOCKET u):交由GC线程回收发送缓存区与接收缓冲区的数据,因为设置了Linger,所以需要判断是否需要一定的延迟
    int CUDTUnited::flush(const UDTSOCKET u)
    {
       CUDTSocket* s = locate(u);
       if (NULL == s)
          throw CUDTException(5, 4, 0);
    
       s->m_pUDT->flush();
    
       return 0;
    }
    
    • Order 1:void CUDT::flush()
    void CUDT::flush()
    {
       uint64_t entertime = CTimer::getTime();
    
       while (!m_bBroken && m_bConnected && (m_pSndBuffer->getCurrBufSize() > 0) && (CTimer::getTime() - entertime < m_Linger.l_linger * 1000000ULL))
       {
          // linger has been checked by previous close() call and has expired
          if (m_ullLingerExpiration >= entertime)
             break;
    
          if (!m_bSynSending)
          {
             // if this socket enables asynchronous sending, return immediately and let GC to close it later
             if (0 == m_ullLingerExpiration)
                m_ullLingerExpiration = entertime + m_Linger.l_linger * 1000000ULL;
    
             return;
          }
    
          #ifndef WINDOWS
             timespec ts;
             ts.tv_sec = 0;
             ts.tv_nsec = 1000000;
             nanosleep(&ts, NULL);
          #else
             Sleep(1);
          #endif
       }
    }
    

    int close(UDTSOCKET u)

    close的处理流程: int UDT::close(UDTSOCKET u) -> int CUDT::close(UDTSOCKET u) -> int CUDTUnited::close(const UDTSOCKET u) -> void CUDT::close()
    • Order 0:int CUDTUnited::close(const UDTSOCKET u)
    int CUDTUnited::close(const UDTSOCKET u)
    {
       CUDTSocket* s = locate(u);    //获取CUDTSocket实例
       if (NULL == s)
          throw CUDTException(5, 4, 0);
    
       CGuard socket_cg(s->m_ControlLock);
    
       if (s->m_Status == LISTENING)    //如果是Listener
       {
          if (s->m_pUDT->m_bBroken)    //如果目前CUDT实例的状态已经损害,直接退出,等待GC线程回收资源
             return 0;
    
          s->m_TimeStamp = CTimer::getTime();    //更新UDT SOCKET关闭时间
          s->m_pUDT->m_bBroken = true;    //然后还是调整CUDT的状态为损坏
    
          // broadcast all "accept" waiting
          #ifndef WINDOWS
             pthread_mutex_lock(&(s->m_AcceptLock));
             pthread_cond_broadcast(&(s->m_AcceptCond));    //唤醒所有等待accept()的线程
             pthread_mutex_unlock(&(s->m_AcceptLock));
          #else
             SetEvent(s->m_AcceptCond);
          #endif
    
          return 0;    //调整Listener之后就可以直接退出了
       }
    
       s->m_pUDT->close();    //调用连接CUDT中的close()调整CUDT的状态
    
       // synchronize with garbage collection.
       CGuard manager_cg(m_ControlLock);
    
       // since "s" is located before m_ControlLock, locate it again in case it became invalid
       map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u);    //在全局的map中寻找UDTSocket实例
       if ((i == m_Sockets.end()) || (i->second->m_Status == CLOSED))    //如果没有找到或者状态为已经关闭,也可以直接退出
          return 0;
       s = i->second;    //获得CUDTSocket实例
    
       s->m_Status = CLOSED;    //调整CUDTSocket的状态为CLOSED
    
       // UDT SOCKET在关闭的时候不会立刻被移除,以防止其他的回调函数访问无效地址,定时器启动,资源会在1S之后被删除
       s->m_TimeStamp = CTimer::getTime();    //调整UDTSocket关闭的事件
    
       m_Sockets.erase(s->m_SocketID);    //从全局的map中删除
       m_ClosedSockets.insert(pair<UDTSOCKET, CUDTSocket*>(s->m_SocketID, s));    //将这个UDTSocket加入Closed队列中等待删除
    
       CTimer::triggerEvent();
    
       return 0;
    }
    
    • Order 1:void CUDT::close():调整CUDT实例的状态
    void CUDT::close()
    {
       if (!m_bOpened)    //如果CUDT已经关闭,直接返回
          return;
    
       if (0 != m_Linger.l_onoff)    //如果设置了稍后关闭,那么在稍后的时候再进行处理
       {
          flush();
       }
    
       //从接收数据的队列中移除数据
       if (m_bConnected)
          m_pSndQueue->m_pSndUList->remove(this);
    
       // 清理与这个UDTSocket实例相关的事件
       s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
       s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false);
       s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_ERR, true);
    
       // 将这个UDT SOCKET ID从所有注册的Epoll中移除
       try
       {
          for (set<int>::iterator i = m_sPollID.begin(); i != m_sPollID.end(); ++ i)
             s_UDTUnited.m_EPoll.remove_usock(*i, m_SocketID);
       }
       catch (...)
       {
       }
    
       if (!m_bOpened)    //再次检查状态
          return;
    
       // 目前正处于关闭过程中
       m_bClosing = true;
    
       CGuard cg(m_ConnectionLock);
    
       // 如果发送者和接受者还在等待数据,发送信号通知他们退出,pthread_cond_signal
       releaseSynch();
    
       if (m_bListening)    //如果是Listener
       {
          m_bListening = false;    //调整Listener的状态
          m_pRcvQueue->removeListener(this);    //从接收队列中移除这个Listener,不再负责接收的事务处理
       }
       else if (m_bConnecting)    //如果正处于CONNETING
       {
          m_pRcvQueue->removeConnector(m_SocketID);    //从交汇连接队列中移除这个UDT SOCKET
       }
    
       if (m_bConnected)    //如果连接已经完成
       {
          if (!m_bShutdown)    //如果对方还没有发送ShutDown
             sendCtrl(5);    //name我们就向对方发送ShutDown
    
          m_pCC->close();    //关闭拥塞控制
    
          // 在Cache中存储并更新这个连接的信息
          CInfoBlock ib;
          ib.m_iIPversion = m_iIPversion;
          CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);
          ib.m_iRTT = m_iRTT;
          ib.m_iBandwidth = m_iBandwidth;
          m_pCache->update(&ib);
    
          m_bConnected = false;    //调整状态为关闭
       }
    
       // 等待Send和Recv的停止
       CGuard sendguard(m_SendLock);    //如果在发送数据或者接收数据时,Buffer不够用的时候,一般会使用pthread_cond_wait睡眠一会
       CGuard recvguard(m_RecvLock);
    
       // CLOSED.
       m_bOpened = false;    //这个UDT SOCKET已经关闭
    }
    

    int sendmsg(UDTSOCKET u, const char* buf, int len, int ttl = -1, bool inorder = false)

    send的处理流程:int UDT::sendmsg(UDTSOCKET u, const char* buf, int len, int ttl = -1, bool inorder = false) -> int CUDT::sendmsg(UDTSOCKET u, const char* buf, int len, int ttl = -1, bool inorder = false) -> int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder)
    • Order 0:int CUDT::send(const char* data, int len):处理发送数据报的情况,仅适用于UDP
    int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder)
    {
       if (UDT_STREAM == m_iSockType)    //如果不是UDP类型直接返回
          throw CUDTException(5, 9, 0);
    
       // 如果当前的状态不正确,要记得退出
       if (m_bBroken || m_bClosing)
          throw CUDTException(2, 1, 0);
       else if (!m_bConnected)
          throw CUDTException(2, 2, 0);
    
       if (len <= 0)    //需要发送的数据长度不正确
          return 0;
    
       if (len > m_iSndBufSize * m_iPayloadSize)    //如果需要发送的数据  > 发送缓冲×有效载荷,记得退出
          throw CUDTException(5, 12, 0);
    
       CGuard sendguard(m_SendLock);
    
       if (m_pSndBuffer->getCurrBufSize() == 0)    //如果当前的缓冲区中没有空间,稍微延迟一会
       {
          uint64_t currtime;
          CTimer::rdtsc(currtime);
          m_ullLastRspTime = currtime;    //记录延迟计时器,避免延迟严重
       }
    
       //如果需要发送的数据 > 已经使用的空间,也就是说空间还是不够用  
       if ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len)
       {
          if (!m_bSynSending)    //如果没有设置异步发送标志,要抛出异常
             throw CUDTException(6, 1, 0);
          else
          {
             // wait here during a blocking sending
             #ifndef WINDOWS
                pthread_mutex_lock(&m_SendBlockLock);    //拿到睡眠阻塞锁
                if (m_iSndTimeOut < 0)    //如果此时还没有到超时时间
                {    //再次经过判断空间还是不够用,那就得睡眠一会喽
                   while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len))    
                      pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock);
                }
                else    //如果已经超时
                {
                   uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL;    再给一次机会睡眠一会
                   timespec locktime;
    
                   locktime.tv_sec = exptime / 1000000;
                   locktime.tv_nsec = (exptime % 1000000) * 1000;
    
                   while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) && (CTimer::getTime() < exptime))
                      pthread_cond_timedwait(&m_SendBlockCond, &m_SendBlockLock, &locktime);
                }
                pthread_mutex_unlock(&m_SendBlockLock);
             #else
                if (m_iSndTimeOut < 0)
                {
                   while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len))
                      WaitForSingleObject(m_SendBlockCond, INFINITE);
                }
                else
                {
                   uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL;
    
                   while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) && (CTimer::getTime() < exptime))
                      WaitForSingleObject(m_SendBlockCond, DWORD((exptime - CTimer::getTime()) / 1000));
                }
             #endif
    
             // check the connection status
             if (m_bBroken || m_bClosing)
                throw CUDTException(2, 1, 0);
             else if (!m_bConnected)
                throw CUDTException(2, 2, 0);
          }
       }
    
       if ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len)    //如果Buffer还是不够用,并且已经超时,要抛出异常
       {
          if (m_iSndTimeOut >= 0)
             throw CUDTException(6, 3, 0);
    
          return 0;    //空间不够用但是没有超时,返回0,告诉用户数据没有发送
       }
    
       // 记录这一次发送数据的时间
       if (0 == m_pSndBuffer->getCurrBufSize())    //此时已经有了可以发送数据的空间
          m_llSndDurationCounter = CTimer::getTime();
    
       // 将用户提供的数据添加到UDT SOCKET的空间中
       m_pSndBuffer->addBuffer(data, len, msttl, inorder);
    
       // 如果这个UDT SOCKET的发送链表中,将这个UDT添加到UDP的发送链表中
       m_pSndQueue->m_pSndUList->update(this, false);
    
       //如果发送缓冲区中没有足够可用的数据,取消可以写入事件标志 
       if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())
       {
          s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false);
       }
    
       return len;
    }
    

    int recvmsg(UDTSOCKET u, char* buf, int len)

    recvmsg的处理流程:int UDT::recvmsg(UDTSOCKET u, char* buf, int len) -> int CUDT::recvmsg(UDTSOCKET u, char* buf, int len) -> int CUDT::recvmsg(char* data, int len)
    • Order 1:int CUDT::recvmsg(char* data, int len):处理UDP数据报的接收请求
    int CUDT::recvmsg(char* data, int len)
    {
       if (UDT_STREAM == m_iSockType)    //如果是TCP类型的数据报,直接退出
          throw CUDTException(5, 9, 0);
    
       //如果状态不正确,直接退出
       if (!m_bConnected)
          throw CUDTException(2, 2, 0);
    
       //如果想要获取len<=0,直接返回
       if (len <= 0)
          return 0;
    
       CGuard recvguard(m_RecvLock);
    
       if (m_bBroken || m_bClosing)    //如果目前处于关闭/损坏的状态
       {
          int res = m_pRcvBuffer->readMsg(data, len);    //依旧尝试从Buffer中读取数据
    
          if (m_pRcvBuffer->getRcvMsgNum() <= 0)    //如果UDT Queue中已经没有数据了,取消此UDT的可读取事件
          {
             // read is not available any more
             s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
          }
    
          if (0 == res)    //如果读取失败,抛出异常,否则返回读取的字节数
             throw CUDTException(2, 1, 0);
          else
             return res;
       }
    
       if (!m_bSynRecving)    //如果没有设置异步读取机制,没有读取到数据时要抛出异常,读到了就直接返回
       {
          int res = m_pRcvBuffer->readMsg(data, len);
          if (0 == res)
             throw CUDTException(6, 2, 0);
          else
             return res;
       }
    
       int res = 0;    //异步读取的话可以延迟一小会
       bool timeout = false;
    
       do
       {
          #ifndef WINDOWS
             pthread_mutex_lock(&m_RecvDataLock);
    
             if (m_iRcvTimeOut < 0)    //还没有到超时时间,那么小憩一会
             {
                while (!m_bBroken && m_bConnected && !m_bClosing && (0 == (res = m_pRcvBuffer->readMsg(data, len))))
                   pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock);
             }
             else    //已经到了超时时间,但是再给你一次机会,等待一会
             {
                uint64_t exptime = CTimer::getTime() + m_iRcvTimeOut * 1000ULL;
                timespec locktime;
    
                locktime.tv_sec = exptime / 1000000;
                locktime.tv_nsec = (exptime % 1000000) * 1000;
    
                if (pthread_cond_timedwait(&m_RecvDataCond, &m_RecvDataLock, &locktime) == ETIMEDOUT)
                   timeout = true;
    
                res = m_pRcvBuffer->readMsg(data, len);    //等待一会之后,再次尝试读取数据
             }
             pthread_mutex_unlock(&m_RecvDataLock);
          #else
             if (m_iRcvTimeOut < 0)
             {
                while (!m_bBroken && m_bConnected && !m_bClosing && (0 == (res = m_pRcvBuffer->readMsg(data, len))))
                   WaitForSingleObject(m_RecvDataCond, INFINITE);
             }
             else
             {
                if (WaitForSingleObject(m_RecvDataCond, DWORD(m_iRcvTimeOut)) == WAIT_TIMEOUT)
                   timeout = true;
    
                res = m_pRcvBuffer->readMsg(data, len);
             }
          #endif
    
          if (m_bBroken || m_bClosing)    //如果状态不正确,抛出异常
             throw CUDTException(2, 1, 0);
          else if (!m_bConnected)
             throw CUDTException(2, 2, 0);
       } while ((0 == res) && !timeout);    //今天读不到数据,我就赖着不走了
    
       //如果接收缓冲区中确实没有数据可读,那么必须要取消关注的可读事件 
       if (m_pRcvBuffer->getRcvMsgNum() <= 0)
       {
          // read is not available any more
          s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
       }
    
       if ((res <= 0) && (m_iRcvTimeOut >= 0))    //如果没有读到数据,还超时了,抛出异常
          throw CUDTException(6, 3, 0);
    
       return res;
    }
    
  • 相关阅读:
    C# 文件类的操作---删除
    C#实现Zip压缩解压实例
    UVALIVE 2431 Binary Stirling Numbers
    UVA 10570 meeting with aliens
    UVA 306 Cipher
    UVA 10994 Simple Addition
    UVA 696 How Many Knights
    UVA 10205 Stack 'em Up
    UVA 11125 Arrange Some Marbles
    UVA 10912 Simple Minded Hashing
  • 原文地址:https://www.cnblogs.com/ukernel/p/9191048.html
Copyright © 2011-2022 走看看