zoukankan      html  css  js  c++  java
  • diy数据库(二)--网络通信类

    一、首先,我们先实现OSS层的ossSocket类。供数据库client和数据库引擎进行通信

    友情提示:相应上面的类图的头文件和源码附在了本文的最以下。

    int _fd ;//socket的文件描写叙述符

    socklen_t _addressLen ;//地址长度

    socklen_t _peerAddressLen ;//对方地址的长度

    struct sockaddr_in _sockAddress ;//本地的socket地址 ipv4

    struct sockaddr_in _peerAddress ;//对方的socket地址 ipv4

    bool _init ;//代表是否已经初始化

    int _timeout ;//代表超时时间,包含读和写的超时


    这是一个基础类,都是对socket的通用的操作,实现方法也比較常见。

    这里主要注意的有几点:

    1.读写sockt时要注意信号中断带来的错误ENTRY,对于中断。我们要手动处理。

    2.因为数据库需的操作须要有实时性。我们须要设置TCP_NODELAY套接字选项,来关闭TCP的Nagle算法

    3.我们的ossSocket类中的读写函数有一个特点就是添加了时间限制。即我们要求在给定时间參数的范围类得带可读或可写的socket,不然推断读写超时

    4.为了防止socket进入timewait状态,我们须要设置SO_REUSEADDR套接字选项

    5.当向一个已经收到RST的套接字运行写操作时,内核会向用户进程发送一个SIGPIPE信号。

    该信号的默认行为是终止进程。

    send()函数的最后一个參数能够设MSG_NOSIGNAL,来禁止内核发送SIGPIPE信号。



    附录:

    下面是头文件:ossSocket.hpp

    #ifndef OSSNETWORK_HPP_
    #define OSSNETWORK_HPP_
    
    #include "core.hpp"
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <unistd.h>
    #include <netdb.h>
    #include <netinet/tcp.h>
    #include <string.h>
    #include <stdlib.h>
    #include <stdio.h>
    #define SOCKET_GETLASTERROR errno//系统的错误码
    #define OSS_EAGAIN	EAGAIN
    #define OSS_EINTR	EINTR
    #define closesocket	close
    
    // by default 10ms timeout
    #define OSS_SOCKET_DFT_TIMEOUT 10000//默认10毫秒的超时
    
    // max hostname
    #define OSS_MAX_HOSTNAME NI_MAXHOST//hostname的
    #define OSS_MAX_SERVICENAME NI_MAXSERV
    
    
    #include "core.hpp"
    
    
    typedef int		SOCKET;
    
    class _ossSocket
    {
    private ://全部
       int _fd ;//socket的文件描写叙述符
       socklen_t _addressLen ;//地址长度
       socklen_t _peerAddressLen ;//对方地址的长度
       struct sockaddr_in  _sockAddress ;//本地的socket地址 ipv4
       struct sockaddr_in  _peerAddress ;//对方的socket地址 ipv4
    
       bool _init ;//代表是否已经初始化
       int _timeout ;//代表超时时间,包含读和写的超时
    protected:
       unsigned int _getPort ( sockaddr_in *addr ) ;//得到端口
       int _getAddress ( sockaddr_in *addr, char *pAddress, unsigned int length ) ;//得到地址
    
    public :
       int setSocketLi ( int lOnOff, int linger ) ;//设置close连接时对发送缓冲区中数据的操作
       void setAddress(const char * pHostName, unsigned int port );
       // Create a listening socket
       _ossSocket();//设置默认的socket的属性
       _ossSocket ( unsigned int port, int timeout = 0 ) ;//设置socket的属性
       _ossSocket ( const char *pHostname, unsigned int port, int timeout = 0 ) ;<span style="font-family: Arial, Helvetica, sans-serif;">// 创建一个clientsocket</span>
       _ossSocket ( int *sock, int timeout = 0 ) ;<span style="font-family: Arial, Helvetica, sans-serif;">//通过accept的到的socket的fd来构建</span>
    
       ~_ossSocket ()
       {
          close () ;
       }
       int initSocket () ;//初始化函数
       int bind_listen () ;//将socket绑定到一个端口
       bool isConnected () ;//推断连接状态
       int send ( const char *pMsg, int len,
                    int timeout = OSS_SOCKET_DFT_TIMEOUT,
                    int flags = 0 ) ;
       int recv ( char *pMsg, int len,
                    int timeout = OSS_SOCKET_DFT_TIMEOUT,
                    int flags = 0 ) ;//收到固定长度的数据才返回(timeout=0表示永远不会超时)
       int recvNF ( char *pMsg, int &len,
                      int timeout = OSS_SOCKET_DFT_TIMEOUT ) ;//不用收到固定长度的数据
    
       int connect () ;
       void close () ;
       int accept ( int *sock, struct sockaddr *addr, socklen_t *addrlen,
                      int timeout = OSS_SOCKET_DFT_TIMEOUT ) ;
       //一下是几个辅助函数
       /*当我们发送小包的时候。tcp会把几个小包组合成一个大包来发送。这个功能能够关闭*/
       int disableNagle () ;//关闭打包发送特性,关闭tcp中的Nagle功能,小包的发送会更实时
    
       unsigned int getPeerPort () ;
       int getPeerAddress ( char *pAddress, unsigned int length ) ;
    
       unsigned int getLocalPort () ;
       int getLocalAddress ( char *pAddress, unsigned int length ) ;
    
       int setTimeout ( int seconds ) ;//设置超时
    
       static int getHostName ( char *pName, int nameLen ) ;//得到域名
       static int getPort ( const char *pServiceName, unsigned short &port ) ;//把服务名转化为端口号。全部的服务都在/etc/server文件中面
       int setAnsyn () ;
    };
    
    typedef class _ossSocket ossSocket ;
    
    #endif


    下面是实现ossSockt.cpp


    #include "ossSocket.hpp"
    #include "pd.hpp"
    
    //创建一个监听的socket
    _ossSocket::_ossSocket ( unsigned int port, int timeout )
    {//设置监听套接字的各种属性
       _init = false ;
       _fd = 0 ;
       _timeout = timeout ;
       memset ( &_sockAddress, 0, sizeof(sockaddr_in) ) ;
       memset ( &_peerAddress, 0, sizeof(sockaddr_in) ) ;
       _peerAddressLen = sizeof(_peerAddress) ;
       _addressLen = sizeof( _sockAddress ) ;
       //监听套接字特有
       _sockAddress.sin_family = AF_INET ;
       _sockAddress.sin_addr.s_addr = htonl ( INADDR_ANY ) ;
       _sockAddress.sin_port = htons ( port ) ;
       
    }
    
    // Create a socket
    _ossSocket::_ossSocket ()
    {
       _init = false ;
       _fd = 0 ;
       _timeout = 0 ;
       memset ( &_sockAddress, 0, sizeof(sockaddr_in) ) ;
       memset ( &_peerAddress, 0, sizeof(sockaddr_in) ) ;
       _peerAddressLen = sizeof(_peerAddress) ;
       _addressLen = sizeof( _sockAddress ) ;
    }
    
    // 创建一个client的套接字
    //pHostname是对方机器的机器名(假设不是机器名就当做对方机器的ip地址)。port是对方机器的端口号
    _ossSocket::_ossSocket ( const char *pHostname, unsigned int port, int timeout )
    {
       struct hostent *hp ;
       _init = false ;
       _timeout = timeout ;
       _fd = 0 ;
       memset ( &_sockAddress, 0, sizeof(sockaddr_in) ) ;
       memset ( &_peerAddress, 0, sizeof(sockaddr_in) ) ;
       _peerAddressLen = sizeof(_peerAddress) ;
       _sockAddress.sin_family = AF_INET ;
       if ( (hp = gethostbyname ( pHostname ) ) )//
          _sockAddress.sin_addr.s_addr = *((int*)hp->h_addr_list[0] ) ;
       else
          _sockAddress.sin_addr.s_addr = inet_addr ( pHostname ) ;
       _sockAddress.sin_port = htons ( port ) ;
       _addressLen = sizeof(_sockAddress ) ;
    }
    
    // 由已经得到的套接字来创建socket
    _ossSocket::_ossSocket ( int *sock, int timeout )//SOCKET就是int类型
    {
       int rc = DIY_OK ;
       _fd = *sock ;
       _init = true ;
       _timeout = timeout ;
       _addressLen = sizeof( _sockAddress ) ;
       memset ( &_peerAddress, 0, sizeof(sockaddr_in) ) ;
       _peerAddressLen = sizeof(_peerAddress) ;
       rc = getsockname ( _fd, (sockaddr*)&_sockAddress, &_addressLen ) ;//得到本地的ip地址
       if ( rc )
       {
          //PD_LOG ( PDERROR, "Failed to get sock name, error = %d",
            //       SOCKET_GETLASTERROR ) ;
          _init = false ;
       }
       else
       {
          rc = getpeername ( _fd, (sockaddr*)&_peerAddress, &_peerAddressLen ) ;//得到远端的ip地址
          //PD_RC_CHECK ( rc, PDERROR, "Failed to get peer name, error = %d",
                        //SOCKET_GETLASTERROR ) ;
       }
    done :
       return ;
    error :
       goto done ;
    }
    
    int _ossSocket::initSocket ()
    {
       int rc = DIY_OK ;
       if ( _init )
       {
          goto done ;
       }
       memset ( &_peerAddress, 0, sizeof(sockaddr_in) ) ;
       _peerAddressLen = sizeof(_peerAddress) ;
       _fd =socket ( AF_INET, SOCK_STREAM, IPPROTO_TCP ) ;//创建socket
       if ( -1 == _fd )
       {
          //PD_RC_CHECK ( DIY_NETWORK, PDERROR,
           //             "Failed to initialize socket, error = %d",
             //           SOCKET_GETLASTERROR ) ;
             goto error;
       }
       _init =true ;
       // set timeout
       setTimeout ( _timeout ) ;
    done :
       return rc ;
    error :
       goto done ;
    }
    
    
    //设置SO_LINGER选项
    int _ossSocket::setSocketLi ( int lOnOff, int linger )
    {
       int rc = DIY_OK ;
       struct linger _linger ;
       _linger.l_onoff = lOnOff ;
       _linger.l_linger = linger ;
       rc = setsockopt ( _fd, SOL_SOCKET, SO_LINGER,
                         (const char*)&_linger, sizeof(_linger) ) ;
       return rc ;
    }
    
    void _ossSocket::setAddress(const char *pHostname, unsigned int port )
    {
       struct hostent *hp ;
       memset ( &_sockAddress, 0, sizeof(sockaddr_in ) ) ;
       memset ( &_peerAddress, 0, sizeof(sockaddr_in) ) ;
       _peerAddressLen = sizeof(_peerAddress) ;
       _sockAddress.sin_family = AF_INET ;
       if ( ( hp = gethostbyname ( pHostname )))
          _sockAddress.sin_addr.s_addr = *((int*)hp->h_addr_list[0] ) ;
       else
          _sockAddress.sin_addr.s_addr = inet_addr ( pHostname ) ;
    
       _sockAddress.sin_port = htons ( port ) ;
       _addressLen = sizeof(_sockAddress ) ;
    }
    
    
    //监听端口
    int _ossSocket::bind_listen ()
    {
       int rc = DIY_OK ;
       int temp = 1 ;
       rc = setsockopt ( _fd, SOL_SOCKET,
                         SO_REUSEADDR, (char*)&temp, sizeof(int) ) ;//SO_REUSEADDR确定这个端口能够反复使用
       if ( rc )
       {
          //PD_LOG ( PDWARNING, "Failed to setsockopt SO_REUSEADDR, rc = %d",
          //         SOCKET_GETLASTERROR ) ;
          goto error;
       }
       rc = setSocketLi ( 1, 30 ) ;
       if ( rc )
       {
          //PD_LOG ( PDWARNING, "Failed to setsockopt SO_LINGER, rc = %d",
           //        SOCKET_GETLASTERROR ) ;
           goto error;
       }
       rc = ::bind ( _fd, (struct sockaddr*)&_sockAddress, _addressLen ) ;
       if ( rc )
       {
          //PD_RC_CHECK( DIY_NETWORK, PDERROR,
           //            "Failed to bind socket, rc = %d", SOCKET_GETLASTERROR ) ;
           goto error;
       }
       rc = listen ( _fd, SOMAXCONN ) ;
       if ( rc )
       {
          //PD_RC_CHECK( DIY_NETWORK, PDERROR,
           //            "Failed to listen socket, rc = %d", SOCKET_GETLASTERROR ) ;
           goto error;
       }
    done :
       return rc ;
    error :
       close () ;
       goto done ;
    }
    
    
    
    //发送函数
    //发送函数也有超时timeout
    int _ossSocket::send ( const char *pMsg, int len,
                          int timeout, int flags )
    {
       int rc = DIY_OK ;
       int	maxFD = _fd ;
       struct timeval maxSelectTime ;
       fd_set fds ;
    
       maxSelectTime.tv_sec = timeout / 1000000 ;
       maxSelectTime.tv_usec = timeout % 1000000 ;
       // if len == 0, then let's just return
       if ( 0 == len )
       {
          return DIY_OK ;
       }
       //监測在时间范围类socket是否可写
       while ( true )
       {
          FD_ZERO ( &fds ) ;
          FD_SET ( _fd, &fds ) ;
          rc = select ( (int)(maxFD + 1), NULL, &fds, NULL,
                        timeout>=0?&maxSelectTime:NULL ) ;
          if ( 0 == rc )//rc=0表示超时
          {
             // timeout
             rc = DIY_TIMEOUT ;//超时
             goto done ;
          }
          // if < 0, something wrong
          if ( 0 > rc )//rc<0表示有错误
          {
             rc = error ;
             if ( EINTR == rc )//假设失败是由于收到中断,继续
             {
                continue ;
             }
             //PD_RC_CHECK ( DIY_NETWORK, PDERROR,
             //              "Failed to select from socket, rc = %d", rc ) ;
          }
          if ( FD_ISSET ( _fd, &fds ) )
          {
             break ;
          }
       }
     
       while ( len > 0 )
       {
          rc = ::send ( _fd, pMsg, len, MSG_NOSIGNAL | flags ) ;//禁止send()函数向系统发送异常消息,不收到sigpipe
          if ( rc <= 0 )
    	  {
    		if(rc<0&&errno==EINTR)
    			rc=0;//假设失败是由于中断。继续
    		else
            {
             //PD_RC_CHECK ( DIY_NETWORK, PDERROR,
              //             "Failed to send, rc = %d", SOCKET_GETLASTERROR ) ;
              rc=-1;
    		  goto error;
            }
    	  }
          len -= rc ;
          pMsg += rc ;
       }
       rc = DIY_OK ;
    done :
       return rc ;
    error :
       goto done ;
    }
    
    bool _ossSocket::isConnected ()
    {
       int rc = DIY_OK ;
       rc = ::send ( _fd, "", 0, MSG_NOSIGNAL ) ;//尝试发送一个零字节的信息
       if ( 0 > rc )
          return false ;
       return true ;
    }
    
    int _ossSocket::recv ( char *pMsg, int len,
                          int timeout, int flags )
    {
       int rc = DIY_OK ;
       int retries = 0 ;
       int maxFD = (int)_fd ;
       struct timeval maxSelectTime ;
       fd_set fds ;
    
       if ( 0 ==len )
          return DIY_OK ;
       maxSelectTime.tv_sec = timeout / 1000000 ;
       maxSelectTime.tv_usec = timeout % 1000000 ;
       while ( true )
       {
          FD_ZERO ( &fds ) ;
          FD_SET ( _fd, &fds ) ;
          rc = select ( maxFD+1, &fds, NULL, NULL,
                        timeout>=0?

    &maxSelectTime:NULL ) ; // 0 means timeout if ( 0 == rc ) { rc = DIY_TIMEOUT ; goto done ; } // if < 0 something wrong if ( 0 > rc ) { rc = error ; if ( EINTR==rc ) { continue ; } //PD_RC_CHECK ( DIY_NETWORK, PDERROR, // "Failed to select from socket, rc = %d", rc ) ; rc=DIY_NETWORK; goto error; } if ( FD_ISSET ( _fd, &fds ) ) { break ; } } while ( len > 0 ) { rc = ::recv ( _fd, pMsg, len, MSG_NOSIGNAL|flags ) ; if ( rc > 0 ) { if ( flags & MSG_PEEK ) { goto done ; } len -= rc ; pMsg += rc ; } else if ( rc == 0 )//网络关闭。收到fin { //PD_RC_CHECK ( DIY_NETWORK_CLOSE, PDWARNING, // "Peer unexpected shutdown" ) ; goto error; } else { rc = error ; if ( EINTR == rc ) { continue ; } //PD_RC_CHECK ( DIY_NETWORK, PDERROR, // "Recv() Failed: rc = %d", rc ) ; goto error; } } rc = DIY_OK ; done : return rc ; error : goto done ; } int _ossSocket::recvNF ( char *pMsg, int &len, int timeout ) { int rc = DIY_OK ; int retries = 0 ; int maxFD = (int)_fd ; struct timeval maxSelectTime ; fd_set fds ; // if we don't expect to receive anything, no need to continue if ( 0 == len ) return DIY_OK ; maxSelectTime.tv_sec = timeout / 1000000 ; maxSelectTime.tv_usec = timeout % 1000000 ; // wait loop until either we timeout or get a message while ( true ) { FD_ZERO ( &fds ) ; FD_SET ( _fd, &fds ) ; rc = select ( maxFD + 1, &fds, NULL, NULL, timeout>=0?

    &maxSelectTime:NULL ) ; // 0 means timeout if ( 0 == rc ) { rc = DIY_TIMEOUT ; goto done ; } // if < 0, means something wrong if ( 0 > rc ) { rc = error ; // if we failed due to interrupt, let's continue if ( EINTR == rc ) { continue ; } //PD_RC_CHECK ( DIY_NETWORK, PDERROR, // "Failed to select from socket, rc = %d", rc ) ; goto error; } if ( FD_ISSET ( _fd, &fds ) ) { break ; } } rc = ::recv ( _fd, pMsg, len, MSG_NOSIGNAL ) ; if ( rc > 0 ) { len = rc ; } else if ( rc == 0 ) { //PD_RC_CHECK ( DIY_NETWORK_CLOSE, PDWARNING, // "Peer unexpected shutdown" ) ; goto error; } else { rc = error ; goto error; } // Everything is fine when get here rc = DIY_OK ; done : return rc ; error : goto done ; } int _ossSocket::connect () { int rc = DIY_OK ; rc = ::connect ( _fd, (struct sockaddr *)&_sockAddress, _addressLen ) ; if ( rc ) { //PD_RC_CHECK ( DIY_NETWORK, PDERROR, // "Failed to connect, rc = %d", SOCKET_GETLASTERROR ) ; goto error; } rc = getsockname ( _fd, (sockaddr*)&_sockAddress, &_addressLen ) ; if ( rc ) { //PD_RC_CHECK ( DIY_NETWORK, PDERROR, // "Failed to get local address, rc = %d", rc ) ; goto error; } // get peer address rc = getpeername ( _fd, (sockaddr*)&_peerAddress, &_peerAddressLen ) ; if ( rc ) { //PD_RC_CHECK ( DIY_NETWORK, PDERROR, // "Failed to get peer address, rc = %d", rc ) ; goto error; } done : return rc ; error : goto done ; } void _ossSocket::close () { if ( _init ) { int i = 0 ; //i = ::closesocket ( _fd ) ; i = ::close ( _fd ) ; _init = false ; } } int _ossSocket::accept ( int *sock, struct sockaddr *addr, socklen_t *addrlen, int timeout ) { int rc = DIY_OK ; int maxFD = (int)_fd ; struct timeval maxSelectTime ; fd_set fds ; maxSelectTime.tv_sec = timeout / 1000000 ; maxSelectTime.tv_usec = timeout % 1000000 ; while ( true ) { FD_ZERO ( &fds ) ; FD_SET ( _fd, &fds ) ; rc = select ( maxFD + 1, &fds, NULL, NULL, timeout>=0?&maxSelectTime:NULL ) ; // 0 means timeout if ( 0 == rc ) { *sock = 0 ; rc = DIY_TIMEOUT ; goto done ; } // if < 0, means something wrong if ( 0 > rc ) { rc = SOCKET_GETLASTERROR ; // if we failed due to interrupt, let's continue if ( EINTR == rc ) { continue ; } //PD_RC_CHECK ( DIY_NETWORK, PDERROR, // "Failed to select from socket, rc = %d", // SOCKET_GETLASTERROR); goto error; } // if the socket we interested is not receiving anything, let's continue if ( FD_ISSET ( _fd, &fds ) ) { break ; } } rc = DIY_OK ; *sock = ::accept ( _fd, addr, addrlen ); if ( -1 == *sock ) { //PD_RC_CHECK ( DIY_NETWORK, PDERROR, // "Failed to accept socket, rc = %d", // SOCKET_GETLASTERROR ) ; goto error; } done : return rc ; error : close () ; goto done ; } int _ossSocket::disableNagle () { int rc = DIY_OK ; int temp = 1 ; rc = setsockopt ( _fd, IPPROTO_TCP, TCP_NODELAY, (char *) &temp, sizeof ( int ) ) ; if ( rc ) { //PD_LOG ( PDWARNING, "Failed to setsockopt, rc = %d", SOCKET_GETLASTERROR ) ; return rc; } rc = setsockopt ( _fd, SOL_SOCKET, SO_KEEPALIVE, (char *) &temp, sizeof ( int ) ) ; if ( rc ) { //PD_LOG ( PDWARNING, "Failed to setsockopt, rc = %d", SOCKET_GETLASTERROR ) ; return rc; } return rc ; } unsigned int _ossSocket::_getPort ( sockaddr_in *addr ) { return ntohs ( addr->sin_port ) ; } int _ossSocket::_getAddress ( sockaddr_in *addr, char *pAddress, unsigned int length ) { int rc = DIY_OK ; length = length < NI_MAXHOST ?

    length : NI_MAXHOST ; rc = getnameinfo ( (struct sockaddr *)addr, sizeof(sockaddr), pAddress, length, NULL, 0, NI_NUMERICHOST ) ; if ( rc ) { //PD_RC_CHECK ( DIY_NETWORK, PDERROR, // "Failed to getnameinfo, rc = %d", SOCKET_GETLASTERROR ) ; goto error; } done : return rc ; error : goto done ; } int _ossSocket::setAnsyn () //异步非堵塞 { return fcntl(_fd, F_SETFL, O_NONBLOCK | fcntl(_fd, F_GETFL, 0)); } unsigned int _ossSocket::getLocalPort () { return _getPort ( &_sockAddress ) ; } unsigned int _ossSocket::getPeerPort () { return _getPort ( &_peerAddress ) ; } int _ossSocket::getLocalAddress ( char * pAddress, unsigned int length ) { return _getAddress ( &_sockAddress, pAddress, length ) ; } int _ossSocket::getPeerAddress ( char * pAddress, unsigned int length ) { return _getAddress ( &_peerAddress, pAddress, length ) ; } int _ossSocket::setTimeout ( int seconds ) { int rc = DIY_OK ; struct timeval tv ; tv.tv_sec = seconds ; tv.tv_usec = 0 ; // windows take milliseconds as parameter // but linux takes timeval as input rc = setsockopt ( _fd, SOL_SOCKET, SO_RCVTIMEO, ( char* ) &tv, sizeof ( tv ) ) ; if ( rc ) { //PD_LOG ( PDWARNING, "Failed to setsockopt, rc = %d", // SOCKET_GETLASTERROR ) ; return rc; } rc = setsockopt ( _fd, SOL_SOCKET, SO_SNDTIMEO, ( char* ) &tv, sizeof ( tv ) ) ; if ( rc ) { //PD_LOG ( PDWARNING, "Failed to setsockopt, rc = %d", // SOCKET_GETLASTERROR ) ; return rc; } return rc ; } int _ossSocket::getHostName ( char *pName, int nameLen ) { return gethostname ( pName, nameLen ) ; } int _ossSocket::getPort ( const char *pServiceName, unsigned short &port ) { int rc = DIY_OK ; struct servent *servinfo ; servinfo = getservbyname ( pServiceName, "tcp" ) ; if ( !servinfo ) port = atoi ( pServiceName ) ; else port = (unsigned short)ntohs(servinfo->s_port) ; return rc ; }




  • 相关阅读:
    python自动化测试学习路线-python设计语言sys模块argv参数用法
    python自动化测试学习路线-python设计语言serial模块调用方法
    【考研复习】线性代数矩阵部分-题解
    【考研复习
    Windows提权
    Hash算法——加密解密说明
    AES 加密算法的原理详解
    sqlmap常用命令
    curl的使用
    DOM XSS详解
  • 原文地址:https://www.cnblogs.com/jzssuanfa/p/7224475.html
Copyright © 2011-2022 走看看