zoukankan      html  css  js  c++  java
  • UDT源码剖析(四)之Socket函数

    UDTSOCKET socket(int af, int type, int protocol)

    UDT SOCKET的创建顺序:UDTSOCKET UDT::socket(int af,int type,int protocol) -> UDTSOCKET CUDT::socket(int af,int tyoe,int proctol) -> UDTSOCKET CUDTUnited::newSocket(int af, int type)
    • Order 0:UDTSOCKET CUDTUnited::newSocket(int af, int type):创建UDT SOCKET
    UDTSOCKET CUDTUnited::newSocket(int af, int type)
    {
       if ((type != SOCK_STREAM) && (type != SOCK_DGRAM))    //如果参数不正确,直接返回
          throw CUDTException(5, 3, 0);
    
       CUDTSocket* ns = NULL;
    
       try
       {
          ns = new CUDTSocket;    //new一个CUDTSocket
          ns->m_pUDT = new CUDT;    //紧接着new一个CUDT
          if (AF_INET == af)    //根据IPV4 OR IPV6 ,更新本地地址,并将端口预设为0
          {
             ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in);
             ((sockaddr_in*)(ns->m_pSelfAddr))->sin_port = 0;
          }
          else
          {
             ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in6);
             ((sockaddr_in6*)(ns->m_pSelfAddr))->sin6_port = 0;
          }
       }
       catch (...)
       {
          delete ns;
          throw CUDTException(3, 2, 0);
       }
    
       CGuard::enterCS(m_IDLock);
       ns->m_SocketID = -- m_SocketID;    //在初始化的时候会随机一个UDT SOCKET,之后新创建的UDT SOCKET在此基础上累加或者累减就可以了
       CGuard::leaveCS(m_IDLock);
    
       ns->m_Status = INIT;    //调整UDTSocket的状态为INIT
       ns->m_ListenSocket = 0;    //初始化Listen Socket ID为0
       ns->m_pUDT->m_SocketID = ns->m_SocketID;    //将刚刚获得ID注册到CUDT中
       ns->m_pUDT->m_iSockType = (SOCK_STREAM == type) ? UDT_STREAM : UDT_DGRAM;    //确定CUDT的类型
       ns->m_pUDT->m_iIPversion = ns->m_iIPversion = af;
       ns->m_pUDT->m_pCache = m_pCache;    //CUDT与CUDTUnited共用一个CCache
    
       // protect the m_Sockets structure.
       CGuard::enterCS(m_ControlLock);
       try
       {
          m_Sockets[ns->m_SocketID] = ns;    //在全局的map中保存CUDTSocket*
       }
       catch (...)
       {
          //failure and rollback
          CGuard::leaveCS(m_ControlLock);
          delete ns;
          ns = NULL;
       }
       CGuard::leaveCS(m_ControlLock);
    
       if (NULL == ns)
          throw CUDTException(3, 2, 0);
    
       return ns->m_SocketID;
    }
    

    int bind(UDTSOCKET u, const struct sockaddr* name, int namelen)

    bind的关联顺序:int UDT::bind(UDTSOCKET u, const struct sockaddr* name, int namelen) -> int CUDT::bind(UDTSOCKET u, const struct sockaddr* name, int namelen) -> int CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, int namelen) -> void CUDT::open() -> void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)
    • Order 0:int CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, int namelen)
    int CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, int namelen):将UDT SOCKET与某一个地址相关联
    {
       CUDTSocket* s = locate(u);    //获取这个UDT SOCKET的CUDTSocket*
       if (NULL == s)
          throw CUDTException(5, 4, 0);
    
       CGuard cg(s->m_ControlLock);
    
       // cannot bind a socket more than once
       if (INIT != s->m_Status)
          throw CUDTException(5, 0, 0);
    
       // check the size of SOCKADDR structure
       if (AF_INET == s->m_iIPversion)
       {
          if (namelen != sizeof(sockaddr_in))
             throw CUDTException(5, 3, 0);
       }
       else
       {
          if (namelen != sizeof(sockaddr_in6))
             throw CUDTException(5, 3, 0);
       }
    
       s->m_pUDT->open();    //调用CUDT*中的open()并修改CUDT实例的状态
       updateMux(s, name);
       s->m_Status = OPENED;    //调整CUDTSocket的状态为Opened
    
       // copy address information of local node
       s->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr);
    
       return 0;
    }
    
    • Order 1:void CUDT::open():继续填充CUDT中的选项
    void CUDT::open()
    {
       CGuard cg(m_ConnectionLock);
        
       //初始化有效载荷的大小 
       m_iPktSize = m_iMSS - 28;    //packet size = MSS - 28
       m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;    //payload size = MSS - 28 - 16
    
       m_iEXPCount = 1;    //异常计数器设置为1
       m_iBandwidth = 1;    //估计带宽为1 packet / S
       m_iDeliveryRate = 16;    //对方的接收速率为16 packet / S
       m_iAckSeqNo = 0;    //上一次收到的ACK为0
       m_ullLastAckTime = 0;        //上一次接收ACK的事件为0
    
       m_StartTime = CTimer::getTime();    //初始时间为当前时间
       m_llSentTotal = m_llRecvTotal = m_iSndLossTotal = m_iRcvLossTotal = m_iRetransTotal = m_iSentACKTotal = m_iRecvACKTotal = m_iSentNAKTotal = m_iRecvNAKTotal = 0;    //统计信息初始化
       m_LastSampleTime = CTimer::getTime();    //最后的性能采样时间为现在
       m_llTraceSent = m_llTraceRecv = m_iTraceSndLoss = m_iTraceRcvLoss = m_iTraceRetrans = m_iSentACK = m_iRecvACK = m_iSentNAK = m_iRecvNAK = 0;    //统计信息为0
       m_llSndDuration = m_llSndDurationTotal = 0;    
    
       if (NULL == m_pSNode)    //初始化发送链表,如果不存在,就new一个
          m_pSNode = new CSNode;
       m_pSNode->m_pUDT = this;    //确定Send List Node中的CUDT*的指向
       m_pSNode->m_llTimeStamp = 1;    //将堆的比较值初始化为1
       m_pSNode->m_iHeapLoc = -1;    //目前不存在于堆中
    
       if (NULL == m_pRNode)    //初始化接收链表,如果不存在就new一个
          m_pRNode = new CRNode;
       m_pRNode->m_pUDT = this;    //确定Recv List Node中的CUDT*的指向
       m_pRNode->m_llTimeStamp = 1;    //将初始化位置信息为1
       m_pRNode->m_pPrev = m_pRNode->m_pNext = NULL;    //暂时不存在于堆中
       m_pRNode->m_bOnList = false;
    
       m_iRTT = 10 * m_iSYNInterval;    //RTT为 10 * 10000个CPU时钟周期
       m_iRTTVar = m_iRTT >> 1;    //RTT方差为RTT >> 1
       m_ullCPUFrequency = CTimer::getCPUFrequency();    //获得CPU的时钟周期
    
       // set up the timers
       m_ullSYNInt = m_iSYNInterval * m_ullCPUFrequency;    //初始化SYN发送的时间间隔为10000个CPU时钟周期
    
       //设置NAK超时下限与超时下限阀值为100ms,这300000个时钟周期就是100ms??
       m_ullMinNakInt = 300000 * m_ullCPUFrequency;
       m_ullMinExpInt = 300000 * m_ullCPUFrequency;
    
        //ACK与NAK的发送间隔与SYN的发送间隔相同
       m_ullACKInt = m_ullSYNInt;    
       m_ullNAKInt = m_ullMinNakInt;
    
       uint64_t currtime;    //获得当前的时间,gettimeofday()
       CTimer::rdtsc(currtime);
       m_ullLastRspTime = currtime;    //上一次的请求连接时间为现在
       m_ullNextACKTime = currtime + m_ullSYNInt;    //确定下一次ACK与SYN的发送时间
       m_ullNextNAKTime = currtime + m_ullNAKInt;
    
       m_iPktCount = 0;    //收到的Packet计数为0
       m_iLightACKCount = 1;    //收到的ACK计数为1
    
       m_ullTargetTime = 0;    //下一个数据包的预计发送时间为0
       m_ullTimeDiff = 0;    //两个数据包发送间隔为0,之后要根据RTT估算
    
       // Now UDT is opened.
       m_bOpened = true;    
    }
    
    • Order 1:void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)
    void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)
    {
       CGuard cg(m_ControlLock);
    
       if ((s->m_pUDT->m_bReuseAddr) && (NULL != addr))
       {
           //如果还没有关联到某一个CMultiplexer上,先获得想要关联的port 
          int port = (AF_INET == s->m_pUDT->m_iIPversion) ? ntohs(((sockaddr_in*)addr)->sin_port) : ntohs(((sockaddr_in6*)addr)->sin6_port);
    
          // 根据获得port在CMultiplexer中进行寻找
          for (map<int, CMultiplexer>::iterator i = m_mMultiplexer.begin(); i != m_mMultiplexer.end(); ++ i)
          {
             if ((i->second.m_iIPversion == s->m_pUDT->m_iIPversion) && (i->second.m_iMSS == s->m_pUDT->m_iMSS) && i->second.m_bReusable)
             {
                //获得相应的CMultiplexer
                if (i->second.m_iPort == port)
                {
                   // reuse the existing multiplexer
                   ++ i->second.m_iRefCount;    //首先叠加引用计数
                   s->m_pUDT->m_pSndQueue = i->second.m_pSndQueue;    //享用Send Queue
                   s->m_pUDT->m_pRcvQueue = i->second.m_pRcvQueue;    //享用Recv Queue
                   s->m_iMuxID = i->second.m_iID;    //顺便填充ID,为下一次查找提供便利
                   return;
                }
             }
          }
       }
    
       // 如果没有找到,就意味着需要创建一个新的CMultiplexer,并将新创建的CMultiplexer与CUDT相关联
       CMultiplexer m;
       m.m_iMSS = s->m_pUDT->m_iMSS;
       m.m_iIPversion = s->m_pUDT->m_iIPversion;
       m.m_iRefCount = 1;
       m.m_bReusable = s->m_pUDT->m_bReuseAddr;
       m.m_iID = s->m_SocketID;
    
       m.m_pChannel = new CChannel(s->m_pUDT->m_iIPversion);    //由CMultiplexer管理CChannel
       m.m_pChannel->setSndBufSize(s->m_pUDT->m_iUDPSndBufSize);
       m.m_pChannel->setRcvBufSize(s->m_pUDT->m_iUDPRcvBufSize);
    
       try
       {
          if (NULL != udpsock)
             m.m_pChannel->open(*udpsock);    //打开这个UDP SOCKET,如果提供Port,就与相关的Port关联,如果不提供,就随机选择Port
          else
             m.m_pChannel->open(addr);
       }
       catch (CUDTException& e)
       {
          m.m_pChannel->close();
          delete m.m_pChannel;
          throw e;
       }
    
       //全部都是初始化,将CUDT与CMultiplexer相关联,并将新创建的CMultiplexer添加到其中
       sockaddr* sa = (AF_INET == s->m_pUDT->m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;
       m.m_pChannel->getSockAddr(sa);
       m.m_iPort = (AF_INET == s->m_pUDT->m_iIPversion) ? ntohs(((sockaddr_in*)sa)->sin_port) : ntohs(((sockaddr_in6*)sa)->sin6_port);
       if (AF_INET == s->m_pUDT->m_iIPversion) delete (sockaddr_in*)sa; else delete (sockaddr_in6*)sa;
    
       m.m_pTimer = new CTimer;
    
       //Send Queue与Recv Queue共享Timer
       m.m_pSndQueue = new CSndQueue;
       m.m_pSndQueue->init(m.m_pChannel, m.m_pTimer);
       m.m_pRcvQueue = new CRcvQueue;
       m.m_pRcvQueue->init(32, s->m_pUDT->m_iPayloadSize, m.m_iIPversion, 1024, m.m_pChannel, m.m_pTimer);
    
       m_mMultiplexer[m.m_iID] = m;
    
       s->m_pUDT->m_pSndQueue = m.m_pSndQueue;
       s->m_pUDT->m_pRcvQueue = m.m_pRcvQueue;
       s->m_iMuxID = m.m_iID;
    }
    

    int bind2(UDTSOCKET u, UDPSOCKET udpsock)

    bind2的关联顺序:int UDT::bind2(UDTSOCKET u, UDPSOCKET udpsock) -> int CUDT::bind(UDTSOCKET u, UDPSOCKET udpsock) -> int CUDTUnited::bind(UDTSOCKET u, UDPSOCKET udpsock) -> void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)

    注意:bind的情况基本一样,唯一的不同时UDP端口的选择,如果没有提供UDP端口,会进行随机的选择。

    • Order 0:int CUDTUnited::bind(UDTSOCKET u, UDPSOCKET udpsock):将UDT Socket与UDP Socket关联起来
    int CUDTUnited::bind(UDTSOCKET u, UDPSOCKET udpsock)
    {
       CUDTSocket* s = locate(u);    //获取CUDTSocket实例
       if (NULL == s)
          throw CUDTException(5, 4, 0);
    
       CGuard cg(s->m_ControlLock);
    
       // cannot bind a socket more than once
       if (INIT != s->m_Status)
          throw CUDTException(5, 0, 0);
    
       sockaddr_in name4;
       sockaddr_in6 name6;
       sockaddr* name;
       socklen_t namelen;
    
       if (AF_INET == s->m_iIPversion)
       {
          namelen = sizeof(sockaddr_in);
          name = (sockaddr*)&name4;
       }
       else
       {
          namelen = sizeof(sockaddr_in6);
          name = (sockaddr*)&name6;
       }
    
       if (-1 == ::getsockname(udpsock, name, &namelen))
          throw CUDTException(5, 3);
    
       s->m_pUDT->open();
       updateMux(s, name, &udpsock);
       s->m_Status = OPENED;
    
       // copy address information of local node
       s->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr);
    
       return 0;
    }
    
    • 没有提供UDP参数:void CChannel::open(const sockaddr* addr)
    void CChannel::open(const sockaddr* addr)
    {
       // 创建UDP Socket
       m_iSocket = ::socket(m_iIPversion, SOCK_DGRAM, 0);
    
       #ifdef WINDOWS
          if (INVALID_SOCKET == m_iSocket)
       #else
          if (m_iSocket < 0)
       #endif
          throw CUDTException(1, 0, NET_ERROR);
    
       //如果有提供地址,将提供的地址与创建的UDT关联。如果没有提供地址,调用函数从本地获取地址,然后与创建的Socket关联 
       if (NULL != addr)
       {
          socklen_t namelen = m_iSockAddrSize;
    
          //将addr与UDP Socket关联  
          if (0 != ::bind(m_iSocket, addr, namelen))
             throw CUDTException(1, 3, NET_ERROR);
       }
       else
       {
          //sendto or WSASendTo will also automatically bind the socket
          addrinfo hints;
          addrinfo* res;
    
          memset(&hints, 0, sizeof(struct addrinfo));
    
          hints.ai_flags = AI_PASSIVE;
          hints.ai_family = m_iIPversion;
          hints.ai_socktype = SOCK_DGRAM;
    
          if (0 != ::getaddrinfo(NULL, "0", &hints, &res))
             throw CUDTException(1, 3, NET_ERROR);
    
          if (0 != ::bind(m_iSocket, res->ai_addr, res->ai_addrlen))
             throw CUDTException(1, 3, NET_ERROR);
    
          ::freeaddrinfo(res);
       }
    
       setUDPSockOpt();    //设置UDP的接收/发送缓冲区,并将UDP设置为非阻塞
    }
    
    • 提供UDP参数:void CChannel::open(UDPSOCKET udpsock)
    void CChannel::open(UDPSOCKET udpsock)
    {
       m_iSocket = udpsock;
       setUDPSockOpt();
    }
    

    int listen(UDTSOCKET u, int backlog)

    listen的启动顺序:void UDT::listen(UDTSOCKET u,int backlog) -> int CUDT::listen(UDTSOCKET u, int backlog) -> int CUDTUnited::listen(const UDTSOCKET u, int backlog) -> void CUDT::listen()
    • Order 0:int CUDTUnited::listen(const UDTSOCKET u, int backlog):将创建的UDTSOCKET作为Listener Socket
    int CUDTUnited::listen(const UDTSOCKET u, int backlog)
    {
       CUDTSocket* s = locate(u);    //从全局的map中寻找CUDTSocket实例
       if (NULL == s)
          throw CUDTException(5, 4, 0);
    
       CGuard cg(s->m_ControlLock);
    
       // 如果当前的CUDTSocket已经处于Listening状态,直接返回
       if (LISTENING == s->m_Status)
          return 0;
    
       // 如果当前的CUDTSocket还没有打开,抛出异常
       if (OPENED != s->m_Status)
          throw CUDTException(5, 5, 0);
    
       // Listener不支持交会连接模式
       if (s->m_pUDT->m_bRendezvous)
          throw CUDTException(5, 7, 0);
    
       //如果队列长度小于backlog,就直接返回  
       if (backlog <= 0)
          throw CUDTException(5, 3, 0);
    
       s->m_uiBackLog = backlog;
    
       try
       {
          s->m_pQueuedSockets = new set<UDTSOCKET>;    //创建连接已完成,未accept()的set
          s->m_pAcceptSockets = new set<UDTSOCKET>;    //创建连接已完成,并且已经accept()的set
       }
       catch (...)
       {
          delete s->m_pQueuedSockets;
          delete s->m_pAcceptSockets;
          throw CUDTException(3, 2, 0);
       }
    
       s->m_pUDT->listen();    
    
       s->m_Status = LISTENING;    //调整状态为LISTENING
    
       return 0;
    }
    
    • Order 1:void CUDT::listen():调整CUDT实例状态
    void CUDT::listen()
    {
       CGuard cg(m_ConnectionLock);
    
       //如果状态不正确,就抛出异常
       if (!m_bOpened)    
          throw CUDTException(5, 0, 0);
    
       if (m_bConnecting || m_bConnected)
          throw CUDTException(5, 2, 0);
    
       // 如果已经处于LISTENING状态,直接返回
       if (m_bListening)
          return;
    
       //将这个CUDT实例设置为Listener
       if (m_pRcvQueue->setListener(this) < 0)
          throw CUDTException(5, 11, 0);
    
       m_bListening = true;
    }
    

    UDTSOCKET accept(UDTSOCKET u, struct sockaddr* addr, int* addrlen)

    accept的处理顺序:UDTSOCKET UDT::accept(UDTSOCKET u, struct sockaddr* addr, int* addrlen) -> UDTSOCKET CUDT::accept(UDTSOCKET u, struct sockaddr* addr, int* addrlen) -> UDTSOCKET CUDTUnited::accept(const UDTSOCKET listener, sockaddr* addr, int* addrlen)
    • Order 0:UDTSOCKET CUDTUnited::accept(const UDTSOCKET listener, sockaddr* addr, int* addrlen)
    UDTSOCKET CUDTUnited::accept(const UDTSOCKET listener, sockaddr* addr, int* addrlen)
    {
       if ((NULL != addr) && (NULL == addrlen))
          throw CUDTException(5, 3, 0);
    
       CUDTSocket* ls = locate(listener);    //首先寻找Listener的CUDTSocket实例
    
       if (ls == NULL)
          throw CUDTException(5, 4, 0);
    
       // 如果Listener的状态不正确,直接返回
       if (LISTENING != ls->m_Status)
          throw CUDTException(5, 6, 0);
    
       // Listener不可能存在交会连接模式
       if (ls->m_pUDT->m_bRendezvous)
          throw CUDTException(5, 7, 0);
    
       UDTSOCKET u = CUDT::INVALID_SOCK;
       bool accepted = false;
    
       // !!only one conection can be set up each time!!
       #ifndef WINDOWS
          while (!accepted)
          {
             pthread_mutex_lock(&(ls->m_AcceptLock));
            
             //再次判断连接状态,如果不正确,退出循环   
             if ((LISTENING != ls->m_Status) || ls->m_pUDT->m_bBroken)
             {
                // This socket has been closed.
                accepted = true;
             }
             else if (ls->m_pQueuedSockets->size() > 0)    //如果此时有存在的连接,等到accept()进行处理。取出连接并将其加入已连接队列,退出循环
             {
                u = *(ls->m_pQueuedSockets->begin());    
                ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);
                ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());
                accepted = true;
             }
             else if (!ls->m_pUDT->m_bSynRecving)
             {
                accepted = true;
             }
                
             //如果状态正确,但是没有等到连接,进入等待状态,等待事件发生时被唤醒
             if (!accepted && (LISTENING == ls->m_Status))
                pthread_cond_wait(&(ls->m_AcceptCond), &(ls->m_AcceptLock));
            
             //如果等待accept()队列为空,取消关注这个Listener的可读事件   
             if (ls->m_pQueuedSockets->empty())
                m_EPoll.update_events(listener, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false);
    
             pthread_mutex_unlock(&(ls->m_AcceptLock));
          }
       #else
          while (!accepted)
          {
             WaitForSingleObject(ls->m_AcceptLock, INFINITE);
    
             if (ls->m_pQueuedSockets->size() > 0)
             {
                u = *(ls->m_pQueuedSockets->begin());
                ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);
                ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());
    
                accepted = true;
             }
             else if (!ls->m_pUDT->m_bSynRecving)
                accepted = true;
    
             ReleaseMutex(ls->m_AcceptLock);
    
             if  (!accepted & (LISTENING == ls->m_Status))
                WaitForSingleObject(ls->m_AcceptCond, INFINITE);
    
             if ((LISTENING != ls->m_Status) || ls->m_pUDT->m_bBroken)
             {
                // Send signal to other threads that are waiting to accept.
                SetEvent(ls->m_AcceptCond);
                accepted = true;
             }
    
             if (ls->m_pQueuedSockets->empty())
                m_EPoll.update_events(listener, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false);
          }
       #endif
    
       //处理收到了一个无效的SOCKET的情况 
       if (u == CUDT::INVALID_SOCK)
       {
          // non-blocking receiving, no connection available
          if (!ls->m_pUDT->m_bSynRecving)
             throw CUDTException(6, 2, 0);
    
          // listening socket is closed
          throw CUDTException(5, 6, 0);
       }
    
       //否则的话,本次获得了一个有效的UDT SOCKET,然后填充用户提供的参数 
       if ((addr != NULL) && (addrlen != NULL))
       {
          if (AF_INET == locate(u)->m_iIPversion)
             *addrlen = sizeof(sockaddr_in);
          else
             *addrlen = sizeof(sockaddr_in6);
    
          // copy address information of peer node
          memcpy(addr, locate(u)->m_pPeerAddr, *addrlen);
       }
    
       return u;
    }
    

    int connect(UDTSOCKET u, const struct sockaddr* name, int namelen)

    connect的处理顺序:int UDT::connect(UDTSOCKET u, const struct sockaddr* name, int namelen) -> int CUDT::connect(UDTSOCKET u, const struct sockaddr* name, int namelen) -> int CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, int namelen) -> void CUDT::connect(const sockaddr* serv_addr)
    • Order 0:int CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, int namelen)
    int CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, int namelen)
    {
       CUDTSocket* s = locate(u);    //获取CUDTSocket实例
       if (NULL == s)
          throw CUDTException(5, 4, 0);
    
       CGuard cg(s->m_ControlLock);
    
       //检查CUDTSocket*实例状态
       if (AF_INET == s->m_iIPversion)
       {
          if (namelen != sizeof(sockaddr_in))
             throw CUDTException(5, 3, 0);
       }
       else
       {
          if (namelen != sizeof(sockaddr_in6))
             throw CUDTException(5, 3, 0);
       }
    
       //一个UDTSocket实例只有在INIT的状态下才能进行Connect操作
       if (INIT == s->m_Status)
       {
          if (!s->m_pUDT->m_bRendezvous)    //假设不执行交汇连接操作
          {
             s->m_pUDT->open();    //首先打开,其次将这个UDT SOCKET关联到UDP资源复用器上,具体的代码分析见上述
             updateMux(s);
             s->m_Status = OPENED;    //在成功之后,将这个UDT SOCKET的状态设置为OPENED
          }
          else
             throw CUDTException(5, 8, 0);
       }
       else if (OPENED != s->m_Status)
          throw CUDTException(5, 2, 0);
        
       //然后将UDT SOCKET实例的状态更新为:CONNECTING
       s->m_Status = CONNECTING;
       try
       {
          s->m_pUDT->connect(name);    //调用CUDT中的中的实例,正式进行连接
       }
       catch (CUDTException e)
       {
          s->m_Status = OPENED;
          throw e;
       }
        
       //重新记录对方的地址 
       delete s->m_pPeerAddr;
       if (AF_INET == s->m_iIPversion)
       {
          s->m_pPeerAddr = (sockaddr*)(new sockaddr_in);
          memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in));
       }
       else
       {
          s->m_pPeerAddr = (sockaddr*)(new sockaddr_in6);
          memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in6));
       }
    
       return 0;
    }
    
    • Order 1:void CUDT::connect(const sockaddr* serv_addr):调整CUDT实例的资源
    void CUDT::connect(const sockaddr* serv_addr)
    {
       CGuard cg(m_ConnectionLock);
    
       if (!m_bOpened)    //如果目前的状态不正确,直接返回
          throw CUDTException(5, 0, 0);
    
       if (m_bListening)    //Listener不能调用connect
          throw CUDTException(5, 2, 0);
    
       if (m_bConnecting || m_bConnected)    //只调整了CUDT SOCKET的Status为CONNECTING,还没有调整CUDT实例的状态
          throw CUDTException(5, 2, 0);
    
       // 记录对方地址
       delete m_pPeerAddr;
       m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*)new sockaddr_in : (sockaddr*)new sockaddr_in6;
       memcpy(m_pPeerAddr, serv_addr, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6));
    
       //这个作用是在交会连接队列中等待HandleShake Packet,等待事件是1S。在我们的实现中可以不考虑
       uint64_t ttl = 3000000;
       if (m_bRendezvous)
          ttl *= 10;
       ttl += CTimer::getTime();
       m_pRcvQueue->registerConnector(m_SocketID, this, m_iIPversion, serv_addr, ttl);
    
       //因为是主动发起连接,填充想要发送的握手包
       m_ConnReq.m_iVersion = m_iVersion;    
       m_ConnReq.m_iType = m_iSockType;
       m_ConnReq.m_iMSS = m_iMSS;
       m_ConnReq.m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize)? m_iRcvBufSize : m_iFlightFlagSize;
       m_ConnReq.m_iReqType = (!m_bRendezvous) ? 1 : 0;
       m_ConnReq.m_iID = m_SocketID;
       CIPAddress::ntop(serv_addr, m_ConnReq.m_piPeerIP, m_iIPversion);
    
       //在发送握手包的时候随机化一个ISN
       srand((unsigned int)CTimer::getTime());
       m_iISN = m_ConnReq.m_iISN = (int32_t)(CSeqNo::m_iMaxSeqNo * (double(rand()) / RAND_MAX));
    
       //根据这个ISN,初始化CUDT实例中的信息
       m_iLastDecSeq = m_iISN - 1;    //最后一次发送的序列号
       m_iSndLastAck = m_iISN;    //上一次收到的ACK
       m_iSndLastDataAck = m_iISN;        //最后一次用于更新发送缓冲区的ACK
       m_iSndCurrSeqNo = m_iISN - 1;    //已发送的最大的ACK
       m_iSndLastAck2 = m_iISN;    //最后送回的ACK2
       m_ullSndLastAck2Time = CTimer::getTime();    //最后送回的ACK2的时间
    
       //打包一个握手包,ID:0是握手包的标志
       CPacket request;
       char* reqdata = new char [m_iPayloadSize];
       request.pack(0, NULL, reqdata, m_iPayloadSize);
       request.m_iID = 0;
    
       int hs_size = m_iPayloadSize;
       m_ConnReq.serialize(reqdata, hs_size);    //在向已经打包的Packet中添加握手的数据
       request.setLength(hs_size);    //更新握手包的长度
       m_pSndQueue->sendto(serv_addr, request);    //调用发送队列,发送握手包
       m_llLastReqTime = CTimer::getTime();    //修改最后发送数据的时间
    
       m_bConnecting = true;    //此时还没有收到回应的数据包,所以处于连接建立过程
    
       // 如果是异步连接,直接返回,在收到回应包的时候对状态进行调整
       if (!m_bSynRecving)
       {
          delete [] reqdata;
          return;
       }
    
       // 同步连接,等待来自对方的回应,直到连接完成才能返回。不过在这块将回应包进行打包是什么意思??
       CPacket response;
       char* resdata = new char [m_iPayloadSize];
       response.pack(0, NULL, resdata, m_iPayloadSize);
    
       CUDTException e(0, 0);
    
       //在这个循环中等待对方的回应
       while (!m_bClosing)    
       {
          //如果距离上一次发送请求的时间已经过去了250ms,再次发送请求 
          if (CTimer::getTime() - m_llLastReqTime > 250000)
          {
             m_ConnReq.serialize(reqdata, hs_size);
             request.setLength(hs_size);
             if (m_bRendezvous)
                request.m_iID = m_ConnRes.m_iID;
             m_pSndQueue->sendto(serv_addr, request);
             m_llLastReqTime = CTimer::getTime();
          }
    
          response.setLength(m_iPayloadSize);
          if (m_pRcvQueue->recvfrom(m_SocketID, response) > 0)
          {
             if (connect(response) <= 0)    //用于三次握手的第二次,处理收到的响应SYN的数据包
                break;    //返回0是处理成功,<0是处理错误,>0是包丢失需要重新处理
    
             //新的请求或者回应在收到回复后立即发出 
             m_llLastReqTime = 0;
          }
    
          if (CTimer::getTime() > ttl)    //如果处理这个Connection花费了太多的时间,抛出异常
          {
             e = CUDTException(1, 1, 0);
             break;
          }
       }
    
       delete [] reqdata;
       delete [] resdata;
    
       //根据不同的情况处理异常 
       if (e.getErrorCode() == 0)
       {
          if (m_bClosing)                                                 // if the socket is closed before connection...
             e = CUDTException(1);
          else if (1002 == m_ConnRes.m_iReqType)                          // connection request rejected
             e = CUDTException(1, 2, 0);
          else if ((!m_bRendezvous) && (m_iISN != m_ConnRes.m_iISN))      // secuity check
             e = CUDTException(1, 4, 0);
       }
    
       if (e.getErrorCode() != 0)
          throw e;
    }
    
    • Order 2:int CUDT::connect(const CPacket& response):发送三次握手的第二次Packet,相应收到的SYN。但是,没见发送ACK2 Packet啊...不过肯定会有,之后在处理Packet的模块进行详解..
    int CUDT::connect(const CPacket& response)
    {
       // 如果处理成功,就返回0;失败返回-1;返回1 OR 2意味着连接正在进行,但是出现了丢包,需要更多的握手包
    
       if (!m_bConnecting)    //如果此时的状态没有处于Connecting状态,直接返回错误
          return -1;
    
       if (m_bRendezvous && ((0 == response.getFlag()) || (1 == response.getType())) && (0 != m_ConnRes.m_iType))
       {
          // 数据包或者保活包的到来,意味着连接已经完成,在这种情况中,意味着连接已经完成
          goto POST_CONNECT;
       }
        
       //如果收到的回应包的类型不正确,直接返回错误 
       if ((1 != response.getFlag()) || (0 != response.getType()))
          return -1;
    
       m_ConnRes.deserialize(response.m_pcData, response.getLength());    //从回应包中将回应的数据填充到本地
    
       //判断连接模式,反正正常的连接不会在这个步骤进行额外的操作 
       if (m_bRendezvous)
       {
          // rendezvous connect require 3-way handshake
          if (1 == m_ConnRes.m_iReqType)
             return -1;
    
          if ((0 == m_ConnReq.m_iReqType) || (0 == m_ConnRes.m_iReqType))
          {
             m_ConnReq.m_iReqType = -1;
             // the request time must be updated so that the next handshake can be sent out immediately.
             m_llLastReqTime = 0;
             return 1;
          }
       }
       else
       {
          // set cookie
          if (1 == m_ConnRes.m_iReqType)    //如果这是一个Keep-Alive Packet,意味着包已经丢失,需要重新发送请求包
          {
             m_ConnReq.m_iReqType = -1;
             m_ConnReq.m_iCookie = m_ConnRes.m_iCookie;
             m_llLastReqTime = 0;
             return 1;
          }
       }
    
    POST_CONNECT:
       //从交会连接队列中移除这个UDT SOCKET
       m_pRcvQueue->removeConnector(m_SocketID);
    
       // 根据协商的值重新填充数据,因为收到了一个SYN,就意味着需要调整ISN
       m_iMSS = m_ConnRes.m_iMSS;
       m_iFlowWindowSize = m_ConnRes.m_iFlightFlagSize;
       m_iPktSize = m_iMSS - 28;
       m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;
       m_iPeerISN = m_ConnRes.m_iISN;
       m_iRcvLastAck = m_ConnRes.m_iISN;
       m_iRcvLastAckAck = m_ConnRes.m_iISN;
       m_iRcvCurrSeqNo = m_ConnRes.m_iISN - 1;
       m_PeerID = m_ConnRes.m_iID;
       memcpy(m_piSelfIP, m_ConnRes.m_piPeerIP, 16);
    
       //连接将要完成,创建所有需要的数据结构
       try
       {
          m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize);
          m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize);
          m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2);
          m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize);
          m_pACKWindow = new CACKWindow(1024);
          m_pRcvTimeWindow = new CPktTimeWindow(16, 64);
          m_pSndTimeWindow = new CPktTimeWindow();
       }
       catch (...)
       {
          throw CUDTException(3, 2, 0);
       }
        
       //在Cache中记录这个连接的信息 
       CInfoBlock ib;
       ib.m_iIPversion = m_iIPversion;
       CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);
       if (m_pCache->lookup(&ib) >= 0)
       {
          m_iRTT = ib.m_iRTT;
          m_iBandwidth = ib.m_iBandwidth;
       }
    
       //针对这个连接提供拥塞控制算法,以后详谈
       m_pCC = m_pCCFactory->create();
       m_pCC->m_UDT = m_SocketID;
       m_pCC->setMSS(m_iMSS);
       m_pCC->setMaxCWndSize(m_iFlowWindowSize);
       m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo);
       m_pCC->setRcvRate(m_iDeliveryRate);
       m_pCC->setRTT(m_iRTT);
       m_pCC->setBandwidth(m_iBandwidth);
       m_pCC->init();
    
       //根据拥塞控制类型填充拥塞窗口大小
       m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
       m_dCongestionWindow = m_pCC->m_dCWndSize;
    
       // 此时正式进入连接状态
       m_bConnecting = false;    
       m_bConnected = true;
    
       //将这个UDT SOCKET置于接收队列上面,之后就可以从连接队列上接收数据了
       m_pRNode->m_bOnList = true;
       m_pRcvQueue->setNewEntry(this);
    
       //更新与这个SOCKET ID相关的事件
       s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);
    
       // 调整UDT SOCKET实例的状态为:Connected
       s_UDTUnited.connect_complete(m_SocketID);
    
       return 0;
    }
    

    int flush(UDTSOCKET u)

    flush的处理流程:int UDT::flush(UDTSOCKET u) -> int CUDT::flush(UDTSOCKET u) -> int CUDTUnited::flush(const UDTSOCKET u) -> void CUDT::flush()
    • Order 0:int CUDTUnited::flush(const UDTSOCKET u):交由GC线程回收发送缓存区与接收缓冲区的数据,因为设置了Linger,所以需要判断是否需要一定的延迟
    int CUDTUnited::flush(const UDTSOCKET u)
    {
       CUDTSocket* s = locate(u);
       if (NULL == s)
          throw CUDTException(5, 4, 0);
    
       s->m_pUDT->flush();
    
       return 0;
    }
    
    • Order 1:void CUDT::flush()
    void CUDT::flush()
    {
       uint64_t entertime = CTimer::getTime();
    
       while (!m_bBroken && m_bConnected && (m_pSndBuffer->getCurrBufSize() > 0) && (CTimer::getTime() - entertime < m_Linger.l_linger * 1000000ULL))
       {
          // linger has been checked by previous close() call and has expired
          if (m_ullLingerExpiration >= entertime)
             break;
    
          if (!m_bSynSending)
          {
             // if this socket enables asynchronous sending, return immediately and let GC to close it later
             if (0 == m_ullLingerExpiration)
                m_ullLingerExpiration = entertime + m_Linger.l_linger * 1000000ULL;
    
             return;
          }
    
          #ifndef WINDOWS
             timespec ts;
             ts.tv_sec = 0;
             ts.tv_nsec = 1000000;
             nanosleep(&ts, NULL);
          #else
             Sleep(1);
          #endif
       }
    }
    

    int close(UDTSOCKET u)

    close的处理流程: int UDT::close(UDTSOCKET u) -> int CUDT::close(UDTSOCKET u) -> int CUDTUnited::close(const UDTSOCKET u) -> void CUDT::close()
    • Order 0:int CUDTUnited::close(const UDTSOCKET u)
    int CUDTUnited::close(const UDTSOCKET u)
    {
       CUDTSocket* s = locate(u);    //获取CUDTSocket实例
       if (NULL == s)
          throw CUDTException(5, 4, 0);
    
       CGuard socket_cg(s->m_ControlLock);
    
       if (s->m_Status == LISTENING)    //如果是Listener
       {
          if (s->m_pUDT->m_bBroken)    //如果目前CUDT实例的状态已经损害,直接退出,等待GC线程回收资源
             return 0;
    
          s->m_TimeStamp = CTimer::getTime();    //更新UDT SOCKET关闭时间
          s->m_pUDT->m_bBroken = true;    //然后还是调整CUDT的状态为损坏
    
          // broadcast all "accept" waiting
          #ifndef WINDOWS
             pthread_mutex_lock(&(s->m_AcceptLock));
             pthread_cond_broadcast(&(s->m_AcceptCond));    //唤醒所有等待accept()的线程
             pthread_mutex_unlock(&(s->m_AcceptLock));
          #else
             SetEvent(s->m_AcceptCond);
          #endif
    
          return 0;    //调整Listener之后就可以直接退出了
       }
    
       s->m_pUDT->close();    //调用连接CUDT中的close()调整CUDT的状态
    
       // synchronize with garbage collection.
       CGuard manager_cg(m_ControlLock);
    
       // since "s" is located before m_ControlLock, locate it again in case it became invalid
       map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u);    //在全局的map中寻找UDTSocket实例
       if ((i == m_Sockets.end()) || (i->second->m_Status == CLOSED))    //如果没有找到或者状态为已经关闭,也可以直接退出
          return 0;
       s = i->second;    //获得CUDTSocket实例
    
       s->m_Status = CLOSED;    //调整CUDTSocket的状态为CLOSED
    
       // UDT SOCKET在关闭的时候不会立刻被移除,以防止其他的回调函数访问无效地址,定时器启动,资源会在1S之后被删除
       s->m_TimeStamp = CTimer::getTime();    //调整UDTSocket关闭的事件
    
       m_Sockets.erase(s->m_SocketID);    //从全局的map中删除
       m_ClosedSockets.insert(pair<UDTSOCKET, CUDTSocket*>(s->m_SocketID, s));    //将这个UDTSocket加入Closed队列中等待删除
    
       CTimer::triggerEvent();
    
       return 0;
    }
    
    • Order 1:void CUDT::close():调整CUDT实例的状态
    void CUDT::close()
    {
       if (!m_bOpened)    //如果CUDT已经关闭,直接返回
          return;
    
       if (0 != m_Linger.l_onoff)    //如果设置了稍后关闭,那么在稍后的时候再进行处理
       {
          flush();
       }
    
       //从接收数据的队列中移除数据
       if (m_bConnected)
          m_pSndQueue->m_pSndUList->remove(this);
    
       // 清理与这个UDTSocket实例相关的事件
       s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
       s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false);
       s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_ERR, true);
    
       // 将这个UDT SOCKET ID从所有注册的Epoll中移除
       try
       {
          for (set<int>::iterator i = m_sPollID.begin(); i != m_sPollID.end(); ++ i)
             s_UDTUnited.m_EPoll.remove_usock(*i, m_SocketID);
       }
       catch (...)
       {
       }
    
       if (!m_bOpened)    //再次检查状态
          return;
    
       // 目前正处于关闭过程中
       m_bClosing = true;
    
       CGuard cg(m_ConnectionLock);
    
       // 如果发送者和接受者还在等待数据,发送信号通知他们退出,pthread_cond_signal
       releaseSynch();
    
       if (m_bListening)    //如果是Listener
       {
          m_bListening = false;    //调整Listener的状态
          m_pRcvQueue->removeListener(this);    //从接收队列中移除这个Listener,不再负责接收的事务处理
       }
       else if (m_bConnecting)    //如果正处于CONNETING
       {
          m_pRcvQueue->removeConnector(m_SocketID);    //从交汇连接队列中移除这个UDT SOCKET
       }
    
       if (m_bConnected)    //如果连接已经完成
       {
          if (!m_bShutdown)    //如果对方还没有发送ShutDown
             sendCtrl(5);    //name我们就向对方发送ShutDown
    
          m_pCC->close();    //关闭拥塞控制
    
          // 在Cache中存储并更新这个连接的信息
          CInfoBlock ib;
          ib.m_iIPversion = m_iIPversion;
          CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);
          ib.m_iRTT = m_iRTT;
          ib.m_iBandwidth = m_iBandwidth;
          m_pCache->update(&ib);
    
          m_bConnected = false;    //调整状态为关闭
       }
    
       // 等待Send和Recv的停止
       CGuard sendguard(m_SendLock);    //如果在发送数据或者接收数据时,Buffer不够用的时候,一般会使用pthread_cond_wait睡眠一会
       CGuard recvguard(m_RecvLock);
    
       // CLOSED.
       m_bOpened = false;    //这个UDT SOCKET已经关闭
    }
    

    int sendmsg(UDTSOCKET u, const char* buf, int len, int ttl = -1, bool inorder = false)

    send的处理流程:int UDT::sendmsg(UDTSOCKET u, const char* buf, int len, int ttl = -1, bool inorder = false) -> int CUDT::sendmsg(UDTSOCKET u, const char* buf, int len, int ttl = -1, bool inorder = false) -> int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder)
    • Order 0:int CUDT::send(const char* data, int len):处理发送数据报的情况,仅适用于UDP
    int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder)
    {
       if (UDT_STREAM == m_iSockType)    //如果不是UDP类型直接返回
          throw CUDTException(5, 9, 0);
    
       // 如果当前的状态不正确,要记得退出
       if (m_bBroken || m_bClosing)
          throw CUDTException(2, 1, 0);
       else if (!m_bConnected)
          throw CUDTException(2, 2, 0);
    
       if (len <= 0)    //需要发送的数据长度不正确
          return 0;
    
       if (len > m_iSndBufSize * m_iPayloadSize)    //如果需要发送的数据  > 发送缓冲×有效载荷,记得退出
          throw CUDTException(5, 12, 0);
    
       CGuard sendguard(m_SendLock);
    
       if (m_pSndBuffer->getCurrBufSize() == 0)    //如果当前的缓冲区中没有空间,稍微延迟一会
       {
          uint64_t currtime;
          CTimer::rdtsc(currtime);
          m_ullLastRspTime = currtime;    //记录延迟计时器,避免延迟严重
       }
    
       //如果需要发送的数据 > 已经使用的空间,也就是说空间还是不够用  
       if ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len)
       {
          if (!m_bSynSending)    //如果没有设置异步发送标志,要抛出异常
             throw CUDTException(6, 1, 0);
          else
          {
             // wait here during a blocking sending
             #ifndef WINDOWS
                pthread_mutex_lock(&m_SendBlockLock);    //拿到睡眠阻塞锁
                if (m_iSndTimeOut < 0)    //如果此时还没有到超时时间
                {    //再次经过判断空间还是不够用,那就得睡眠一会喽
                   while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len))    
                      pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock);
                }
                else    //如果已经超时
                {
                   uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL;    再给一次机会睡眠一会
                   timespec locktime;
    
                   locktime.tv_sec = exptime / 1000000;
                   locktime.tv_nsec = (exptime % 1000000) * 1000;
    
                   while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) && (CTimer::getTime() < exptime))
                      pthread_cond_timedwait(&m_SendBlockCond, &m_SendBlockLock, &locktime);
                }
                pthread_mutex_unlock(&m_SendBlockLock);
             #else
                if (m_iSndTimeOut < 0)
                {
                   while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len))
                      WaitForSingleObject(m_SendBlockCond, INFINITE);
                }
                else
                {
                   uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL;
    
                   while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) && (CTimer::getTime() < exptime))
                      WaitForSingleObject(m_SendBlockCond, DWORD((exptime - CTimer::getTime()) / 1000));
                }
             #endif
    
             // check the connection status
             if (m_bBroken || m_bClosing)
                throw CUDTException(2, 1, 0);
             else if (!m_bConnected)
                throw CUDTException(2, 2, 0);
          }
       }
    
       if ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len)    //如果Buffer还是不够用,并且已经超时,要抛出异常
       {
          if (m_iSndTimeOut >= 0)
             throw CUDTException(6, 3, 0);
    
          return 0;    //空间不够用但是没有超时,返回0,告诉用户数据没有发送
       }
    
       // 记录这一次发送数据的时间
       if (0 == m_pSndBuffer->getCurrBufSize())    //此时已经有了可以发送数据的空间
          m_llSndDurationCounter = CTimer::getTime();
    
       // 将用户提供的数据添加到UDT SOCKET的空间中
       m_pSndBuffer->addBuffer(data, len, msttl, inorder);
    
       // 如果这个UDT SOCKET的发送链表中,将这个UDT添加到UDP的发送链表中
       m_pSndQueue->m_pSndUList->update(this, false);
    
       //如果发送缓冲区中没有足够可用的数据,取消可以写入事件标志 
       if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())
       {
          s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false);
       }
    
       return len;
    }
    

    int recvmsg(UDTSOCKET u, char* buf, int len)

    recvmsg的处理流程:int UDT::recvmsg(UDTSOCKET u, char* buf, int len) -> int CUDT::recvmsg(UDTSOCKET u, char* buf, int len) -> int CUDT::recvmsg(char* data, int len)
    • Order 1:int CUDT::recvmsg(char* data, int len):处理UDP数据报的接收请求
    int CUDT::recvmsg(char* data, int len)
    {
       if (UDT_STREAM == m_iSockType)    //如果是TCP类型的数据报,直接退出
          throw CUDTException(5, 9, 0);
    
       //如果状态不正确,直接退出
       if (!m_bConnected)
          throw CUDTException(2, 2, 0);
    
       //如果想要获取len<=0,直接返回
       if (len <= 0)
          return 0;
    
       CGuard recvguard(m_RecvLock);
    
       if (m_bBroken || m_bClosing)    //如果目前处于关闭/损坏的状态
       {
          int res = m_pRcvBuffer->readMsg(data, len);    //依旧尝试从Buffer中读取数据
    
          if (m_pRcvBuffer->getRcvMsgNum() <= 0)    //如果UDT Queue中已经没有数据了,取消此UDT的可读取事件
          {
             // read is not available any more
             s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
          }
    
          if (0 == res)    //如果读取失败,抛出异常,否则返回读取的字节数
             throw CUDTException(2, 1, 0);
          else
             return res;
       }
    
       if (!m_bSynRecving)    //如果没有设置异步读取机制,没有读取到数据时要抛出异常,读到了就直接返回
       {
          int res = m_pRcvBuffer->readMsg(data, len);
          if (0 == res)
             throw CUDTException(6, 2, 0);
          else
             return res;
       }
    
       int res = 0;    //异步读取的话可以延迟一小会
       bool timeout = false;
    
       do
       {
          #ifndef WINDOWS
             pthread_mutex_lock(&m_RecvDataLock);
    
             if (m_iRcvTimeOut < 0)    //还没有到超时时间,那么小憩一会
             {
                while (!m_bBroken && m_bConnected && !m_bClosing && (0 == (res = m_pRcvBuffer->readMsg(data, len))))
                   pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock);
             }
             else    //已经到了超时时间,但是再给你一次机会,等待一会
             {
                uint64_t exptime = CTimer::getTime() + m_iRcvTimeOut * 1000ULL;
                timespec locktime;
    
                locktime.tv_sec = exptime / 1000000;
                locktime.tv_nsec = (exptime % 1000000) * 1000;
    
                if (pthread_cond_timedwait(&m_RecvDataCond, &m_RecvDataLock, &locktime) == ETIMEDOUT)
                   timeout = true;
    
                res = m_pRcvBuffer->readMsg(data, len);    //等待一会之后,再次尝试读取数据
             }
             pthread_mutex_unlock(&m_RecvDataLock);
          #else
             if (m_iRcvTimeOut < 0)
             {
                while (!m_bBroken && m_bConnected && !m_bClosing && (0 == (res = m_pRcvBuffer->readMsg(data, len))))
                   WaitForSingleObject(m_RecvDataCond, INFINITE);
             }
             else
             {
                if (WaitForSingleObject(m_RecvDataCond, DWORD(m_iRcvTimeOut)) == WAIT_TIMEOUT)
                   timeout = true;
    
                res = m_pRcvBuffer->readMsg(data, len);
             }
          #endif
    
          if (m_bBroken || m_bClosing)    //如果状态不正确,抛出异常
             throw CUDTException(2, 1, 0);
          else if (!m_bConnected)
             throw CUDTException(2, 2, 0);
       } while ((0 == res) && !timeout);    //今天读不到数据,我就赖着不走了
    
       //如果接收缓冲区中确实没有数据可读,那么必须要取消关注的可读事件 
       if (m_pRcvBuffer->getRcvMsgNum() <= 0)
       {
          // read is not available any more
          s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
       }
    
       if ((res <= 0) && (m_iRcvTimeOut >= 0))    //如果没有读到数据,还超时了,抛出异常
          throw CUDTException(6, 3, 0);
    
       return res;
    }
    
  • 相关阅读:
    双camera景深计算
    解决单反出片发灰难题 教你让照片变得通透
    增强画面纵深感的几个小技巧
    双目视觉算法简介
    Android系统源代码的下载与编译
    android 7.0 (nougat)的编译优化-ninja
    神奇的图像处理算法
    【老戴说镜头】浅谈双摄镜头技术
    [Android编程心得] Camera(OpenCV)自动对焦和触摸对焦的实现
    关于DLL模块导出函数
  • 原文地址:https://www.cnblogs.com/ukernel/p/9191048.html
Copyright © 2011-2022 走看看