zoukankan      html  css  js  c++  java
  • UDT源码剖析(十)之Channel

    封装UDP SOCKET,使其成为UDT数据的发送通道,所有的UDT SOCKET都通过这种类型来进行数据发送。为了完整性,咱们接着分析哈~

    CChannel

    • 基础数据结构:
    class CChannel
    {
    private:
       int m_iIPversion;                    // IP version
       int m_iSockAddrSize;                 // socket address structure size (pre-defined to avoid run-time test)
    
       UDPSOCKET m_iSocket;                 // socket descriptor
    
       int m_iSndBufSize;                   // UDP sending buffer size
       int m_iRcvBufSize;                   // UDP receiving buffer size
    };
    
    • 初始化:CChannel::CChannel()
    CChannel::CChannel():
    m_iIPversion(AF_INET),
    m_iSockAddrSize(sizeof(sockaddr_in)),
    m_iSocket(),
    m_iSndBufSize(65536),    //唯一能得到的信息就是将UDP SOCKET的Buffer设置为65536Byte
    m_iRcvBufSize(65536)
    {
    }
    
    CChannel::CChannel(int version):
    m_iIPversion(version),
    m_iSocket(),
    m_iSndBufSize(65536),
    m_iRcvBufSize(65536)
    {
       m_iSockAddrSize = (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6);
    }
    
    • 打开通道:void CChannel::open(const sockaddr* addr)
    void CChannel::open(const sockaddr* addr)
    {
       m_iSocket = ::socket(m_iIPversion, SOCK_DGRAM, 0);    //创建一个UDP SOCKET
    
       #ifdef WINDOWS
          if (INVALID_SOCKET == m_iSocket)
       #else
          if (m_iSocket < 0)
       #endif
          throw CUDTException(1, 0, NET_ERROR);
    
       if (NULL != addr)    //如果提供地址,将创建的UDPSOCKET与Address关联
       {
          socklen_t namelen = m_iSockAddrSize;
    
          if (0 != ::bind(m_iSocket, addr, namelen))
             throw CUDTException(1, 3, NET_ERROR);
       }
       else     //如果不提供地址,获取地址也要给UDP和Address关联在一起
       {
          //sendto or WSASendTo will also automatically bind the socket
          addrinfo hints;
          addrinfo* res;
    
          memset(&hints, 0, sizeof(struct addrinfo));
    
          hints.ai_flags = AI_PASSIVE;
          hints.ai_family = m_iIPversion;
          hints.ai_socktype = SOCK_DGRAM;
    
          if (0 != ::getaddrinfo(NULL, "0", &hints, &res))
             throw CUDTException(1, 3, NET_ERROR);
    
          if (0 != ::bind(m_iSocket, res->ai_addr, res->ai_addrlen))
             throw CUDTException(1, 3, NET_ERROR);
    
          ::freeaddrinfo(res);
       }
    
       setUDPSockOpt();    //根据初始化时提供的信息,设置这个UDP SOCKET的选项
    }
    
    • 关闭通道:void CChannel::close() const
    void CChannel::close() const
    {
       #ifndef WINDOWS
          ::close(m_iSocket);
       #else
          ::closesocket(m_iSocket);
       #endif
    }
    
    • 设置UDP选项:void CChannel::setUDPSockOpt()
    void CChannel::setUDPSockOpt()
    {
        //设置接收与发送缓冲区
       #if defined(BSD) || defined(MACOSX)
          // BSD system will fail setsockopt if the requested buffer size exceeds system maximum value
          int maxsize = 64000;
          if (0 != ::setsockopt(m_iSocket, SOL_SOCKET, SO_RCVBUF, (char*)&m_iRcvBufSize, sizeof(int)))
             ::setsockopt(m_iSocket, SOL_SOCKET, SO_RCVBUF, (char*)&maxsize, sizeof(int));
          if (0 != ::setsockopt(m_iSocket, SOL_SOCKET, SO_SNDBUF, (char*)&m_iSndBufSize, sizeof(int)))
             ::setsockopt(m_iSocket, SOL_SOCKET, SO_SNDBUF, (char*)&maxsize, sizeof(int));
       #else
          // for other systems, if requested is greated than maximum, the maximum value will be automactally used
          if ((0 != ::setsockopt(m_iSocket, SOL_SOCKET, SO_RCVBUF, (char*)&m_iRcvBufSize, sizeof(int))) ||
              (0 != ::setsockopt(m_iSocket, SOL_SOCKET, SO_SNDBUF, (char*)&m_iSndBufSize, sizeof(int))))
             throw CUDTException(1, 3, NET_ERROR);
       #endif
    
       timeval tv;
       tv.tv_sec = 0;
       #if defined (BSD) || defined (MACOSX)
          // Known BSD bug as the day I wrote this code.
          // A small time out value will cause the socket to block forever.
          tv.tv_usec = 10000;
       #else
          tv.tv_usec = 100;
       #endif
    
          //顺便将UDPSOCKET设置成非阻塞模式
       #ifdef UNIX
          // Set non-blocking I/O
          // UNIX does not support SO_RCVTIMEO
          int opts = ::fcntl(m_iSocket, F_GETFL);
          if (-1 == ::fcntl(m_iSocket, F_SETFL, opts | O_NONBLOCK))
             throw CUDTException(1, 3, NET_ERROR);
       #elif WINDOWS
          DWORD ot = 1; //milliseconds
          if (0 != ::setsockopt(m_iSocket, SOL_SOCKET, SO_RCVTIMEO, (char *)&ot, sizeof(DWORD)))
             throw CUDTException(1, 3, NET_ERROR);
       #else
          // Set receiving time-out value
          if (0 != ::setsockopt(m_iSocket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(timeval)))
             throw CUDTException(1, 3, NET_ERROR);
       #endif
    }
    
    • 获取与设置UDP信息:int CChannel::getSndBufSize()int CChannel::getRcvBufSize()void CChannel::setSndBufSize(int size)void CChannel::setRcvBufSize(int size)void CChannel::getSockAddr(sockaddr* addr) constvoid CChannel::getPeerAddr(sockaddr* addr) const
    int CChannel::getSndBufSize()
    {
       socklen_t size = sizeof(socklen_t);
       ::getsockopt(m_iSocket, SOL_SOCKET, SO_SNDBUF, (char *)&m_iSndBufSize, &size);
       return m_iSndBufSize;
    }
    
    int CChannel::getRcvBufSize()
    {
       socklen_t size = sizeof(socklen_t);
       ::getsockopt(m_iSocket, SOL_SOCKET, SO_RCVBUF, (char *)&m_iRcvBufSize, &size);
       return m_iRcvBufSize;
    }
    
    void CChannel::setSndBufSize(int size)
    {
       m_iSndBufSize = size;
    }
    
    void CChannel::setRcvBufSize(int size)
    {
       m_iRcvBufSize = size;
    }
    
    void CChannel::getSockAddr(sockaddr* addr) const
    {
       socklen_t namelen = m_iSockAddrSize;
       ::getsockname(m_iSocket, addr, &namelen);
    }
    
    void CChannel::getPeerAddr(sockaddr* addr) const
    {
       socklen_t namelen = m_iSockAddrSize;
       ::getpeername(m_iSocket, addr, &namelen);
    }
    
    • 发送数据包:int CChannel::sendto(const sockaddr* addr, CPacket& packet) const
    int CChannel::sendto(const sockaddr* addr, CPacket& packet) const
    {
       // 先确定是数据包还是控制包,如果是控制包,将控制信息转换为网络字节序
       if (packet.getFlag())
          for (int i = 0, n = packet.getLength() / 4; i < n; ++ i)
             *((uint32_t *)packet.m_pcData + i) = htonl(*((uint32_t *)packet.m_pcData + i));
    
    
       //  将packet的header转换为网络字节序
       uint32_t* p = packet.m_nHeader;
       for (int j = 0; j < 4; ++ j)
       {
          *p = htonl(*p);
          ++ p;
       }
    
       #ifndef WINDOWS
          msghdr mh;        //使用这种方式将数据重组,然后发送
          mh.msg_name = (sockaddr*)addr;
          mh.msg_namelen = m_iSockAddrSize;
          mh.msg_iov = (iovec*)packet.m_PacketVector;
          mh.msg_iovlen = 2;
          mh.msg_control = NULL;
          mh.msg_controllen = 0;
          mh.msg_flags = 0;
    
          int res = ::sendmsg(m_iSocket, &mh, 0);
       #else
          DWORD size = CPacket::m_iPktHdrSize + packet.getLength();
          int addrsize = m_iSockAddrSize;
          int res = ::WSASendTo(m_iSocket, (LPWSABUF)packet.m_PacketVector, 2, &size, 0, addr, addrsize, NULL, NULL);
          res = (0 == res) ? size : -1;
       #endif
    
       //在发送结束之后,将头部的信息转化为local字节序   
       p = packet.m_nHeader;
       for (int k = 0; k < 4; ++ k)
       {
          *p = ntohl(*p);
           ++ p;
       }
    
       //如果是控制包,将额外的控制信息的打data转化为loacl字节序
       if (packet.getFlag())
       {
          for (int l = 0, n = packet.getLength() / 4; l < n; ++ l)
             *((uint32_t *)packet.m_pcData + l) = ntohl(*((uint32_t *)packet.m_pcData + l));
       }
    
       return res;
    }
    
    • 接收数据包:int CChannel::recvfrom(sockaddr* addr, CPacket& packet) const
    int CChannel::recvfrom(sockaddr* addr, CPacket& packet) const
    {
       #ifndef WINDOWS
          msghdr mh;
          mh.msg_name = addr;
          mh.msg_namelen = m_iSockAddrSize;
          mh.msg_iov = packet.m_PacketVector;
          mh.msg_iovlen = 2;
          mh.msg_control = NULL;
          mh.msg_controllen = 0;
          mh.msg_flags = 0;
    
          #ifdef UNIX
             fd_set set;
             timeval tv;
             FD_ZERO(&set);
             FD_SET(m_iSocket, &set);
             tv.tv_sec = 0;
             tv.tv_usec = 10000;
             ::select(m_iSocket+1, &set, NULL, &set, &tv);
          #endif
    
          int res = ::recvmsg(m_iSocket, &mh, 0);
       #else
          DWORD size = CPacket::m_iPktHdrSize + packet.getLength();
          DWORD flag = 0;
          int addrsize = m_iSockAddrSize;
    
          int res = ::WSARecvFrom(m_iSocket, (LPWSABUF)packet.m_PacketVector, 2, &size, &flag, addr, &addrsize, NULL, NULL);
          res = (0 == res) ? size : -1;
       #endif
    
          //如果报文的读取错误,将报文的长度设置为-1
       if (res <= 0)
       {
          packet.setLength(-1);
          return -1;
       }
    
       //否则,将报文的长度设置为减去头部长度(-16)
       packet.setLength(res - CPacket::m_iPktHdrSize);
    
       // convert back into local host order
       //for (int i = 0; i < 4; ++ i)
       //   packet.m_nHeader[i] = ntohl(packet.m_nHeader[i]);
       uint32_t* p = packet.m_nHeader;
       for (int i = 0; i < 4; ++ i) //转化报文的头部的信息为loacl字节序
       {
          *p = ntohl(*p);
          ++ p;
       }
    
       //如果是控制包,将控制信息转化为local字节序
       if (packet.getFlag())
       {
          for (int j = 0, n = packet.getLength() / 4; j < n; ++ j)
             *((uint32_t *)packet.m_pcData + j) = ntohl(*((uint32_t *)packet.m_pcData + j));
       }
    
       return packet.getLength();
    }
    
  • 相关阅读:
    深入理解Azure自动扩展集VMSS(1)
    使用ARM和VMSS创建自动扩展的web集群
    使用ARM模板部署自动扩展的Linux VMSS(2)
    使用ARM模板部署自动扩展的Linux VMSS(1)
    Azure上A/D系列虚拟机到DS系列迁移(2)
    ORM进阶操作
    Django之中间件
    restful十项规范
    同源策略与跨域请求
    Django之CSRF问题
  • 原文地址:https://www.cnblogs.com/ukernel/p/9191067.html
Copyright © 2011-2022 走看看