zoukankan      html  css  js  c++  java
  • 一个ACE 架构的 Socket Client

    .h

    /**************************************************************
     *  Filename:    TcpClient.h
     *  Copyright:   Shanghai X Co., Ltd.
     *
     *  Description: TcpClient头文件.
     *
     *  @author:     w
     *  @version     10/28/2016  @Reviser  Initial Version
     **************************************************************/
    
    #ifndef _TCPCLIENT_
    #define _TCPCLIENT_
    
    #include <string>
    #include <ace/Svc_Handler.h>
    #include <ace/Connector.h>
    #include <ace/SOCK_Connector.h>
    #include <ace/Task.h>
     
    using namespace std; 
    
    //连接状态改变时回调
    typedef void (__stdcall *pfnConnectChangeCallBack)(bool);
    //接收到数据时回调
    typedef void (__stdcall *pfnReceiveCallBack)(char*, const int); 
    
    class CTcpClient : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH>
    {
    public:
        // 是否退出的标识
        long    m_lStop;                    
    
    public:
        // 是否允许重连 
        bool    m_nReconnect;                
        // 通信超时ms 
        int        m_nCommunicateTimeOut;        
    
    public:    
        //typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> Base;    
        CTcpClient();
        virtual ~CTcpClient();
        
    public:
        /**
         *  设置连接参数.
         *         
         *  @param        -[in]  char* szHost: [主链接地址]; 
         *  @param        -[in]  char* szBackup: [备连接地址]; 
         *  @param        -[in]  int nRemotePort: [目标端口号]; 
         *  @param        -[in]  int nLocalPort: [本地端口号]; 
         *  @return        int.  
         *  @version    10/28/2016    w   Initial Version 
         */
        virtual long SetConnectParam(char* szHost, char* szBackup, int nRemotePort, int nLocalPort = 0);
        /**
         *  首次连接. 
         *
         *  @return        int. 
         *  @version    10/28/2016    w   Initial Version 
         */
        virtual int Connect();
        
        /**
        *  断开连接. 
        *           
        *  @return        int. 
        *  @version        10/28/2016    w   Initial Version 
        */
        virtual int Reconnect();
        /**
        *  断开连接. 
        *
        *  @return        int.  
        *  @version        10/28/2016    w   Initial Version 
        */
        virtual int Disconnect();
    
        /**
         *  发送数据.
         *
         *  @param        -[in,out]  char* szSend: [数据]
         *  @param        -[in]  char* lSendSize: [大小]
         *  @param        -[in]  int nCommunicateTimeOut: [超时ms] 
         *  @return        int.  
         *  @version    10/28/2016    w   Initial Version 
         */
        virtual long Send(const char* szSend, long lSendSize, int nCommunicateTimeOut = COMMUNICATE_TIMEOUT);
    
        /**
         *  接收数据.
         *
         *  @param        -[in,out]  char* szReceive: [数据]
         *  @param        -[in]  long lReceiveSize: [大小]
         *  @param        -[in]  int nCommunicateTimeOut: [超时ms]  
         *  @version    10/28/2016    w   Initial Version 
         */
        virtual long Receive(char* szReceive, long lReceiveSize, int nCommunicateTimeOut = COMMUNICATE_TIMEOUT);
        //
        virtual bool IsConnected() { return m_nIsConnected; } 
        /**
        *  设置连接改变回调函数.  
        *
        *  @version    10/28/2016    w   Initial Version 
        */
        void SetOnConnectChangeCallBack(pfnConnectChangeCallBack func);
        /**
        *  设置数据接收回调函数.  
        *
        *  @version    10/28/2016    w   Initial Version 
        */
        void SetOnReceiveCallBack(pfnReceiveCallBack func);
    
    public:      
        /**
         *  建立连接时被调用.
         *
         *  @param        -[in,out]  char* param: [参数] 
         *  @return        int.  
         *  @version    10/28/2016    w   Initial Version 
         */
        int open(void* param = 0);    
        
        /**
         *  当有输入时该函数被调用.
         *
         *  @param        -[in]  ACE_HANDLE: [参数] 
         *  @return        int.  
         *  @version    10/28/2016    w   Initial Version 
         */
        int handle_input(ACE_HANDLE handle = ACE_INVALID_HANDLE);     
        
        /**
         *  当有输出时该函数被调用.
         *
         *  @param        -[in]  ACE_HANDLE handle: [参数] 
         *  @return        int.  
         *  @version    10/28/2016    w   Initial Version 
         */
        virtual int handle_output(ACE_HANDLE handle = ACE_INVALID_HANDLE);    
        
        /**
         *  当SockHandler从ACE_Reactor中移除时该函数被调用.
         *
         *  @param        -[in]  ACE_HANDLE handle: [参数]
         *  @param        -[in]  ACE_HANDLE closeMask: [参数] 
         *  @return        int 
         *  @version    10/28/2016    w   Initial Version 
         */
        int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask closeMask);    
        
        /**
         *  任务的主流程.
         *        1.激活事件
         *
         *  @return        int. 
         *  @version    10/10/2016    w   Initial Version
         */
        int    svc();
    
    
    protected: 
        /**
         *  触发连接改变回调函数.
         *         
         *  @param        -[in]  bool nIsConnected: [是否已连接]
         *
         *  @version    10/28/2016    w   Initial Version 
         */
        void OnConnectChange(bool nIsConnected);
    
        /**
         *  触发数据接收回调函数.
         *         
         *  @param        -[in,out]  char* pszReceive: [接收的数据区]
         *  @param        -[in]  const int nReceiveSize: [数据大小] 
         *  @version    10/28/2016    w   Initial Version 
         */
        void OnReceive(char* pszReceive, const int nReceiveSize); 
    
    protected:  
        // 是否已经连接
        bool    m_nIsConnected;                
        // 当前连接IP地址
        string    m_strConnectIPAddress;        
        // 主线连接IP地址
        string    m_strHostIPAddress;        
        // 备线连接IP地址
        string    m_strBackupIPAddress;        
        // 远程连接端口号
        unsigned short m_nRemotePort;        
        // 本地连接端口号
        unsigned short m_nLocalPort;        
         
        pfnConnectChangeCallBack m_pfnOnConnectChange;
        pfnReceiveCallBack m_pfnOnReceive; 
        
    protected:
        // 最后一次连接时间
        time_t    m_tmLastConnect;            
    
    private:
        /**
        *  关闭Socket. 
        *
        *  @return        int.  
        *  @version        10/28/2016    w   Initial Version 
        */
        int CloseSocket();                     
    
    };
    //typedef ACE_Connector<CTcpClient, ACE_SOCK_CONNECTOR>  CONNECTOR;
    
    #endif // !_TCPCLIENT_

    .cpp

    /**************************************************************
     *  Filename:    TcpClient.cpp
     *  Copyright:   Shanghai X Co., Ltd.
     *
     *  Description: TcpClient源文件.
     *
     *  @author:     w
     *  @version     10/28/2016  @Reviser  Initial Version
     **************************************************************/
     
    #include "TcpClient.h" 
    #include <iostream>
    #include <string>
     
    #include <ace/ACE.h>
    #include <ace/OS_NS_sys_socket.h>
    #include <ace/OS_NS_strings.h>
    
    using namespace std;
    
    //ctor
    CTcpClient::CTcpClient()
    { 
        m_lStop = true; 
        m_nReconnect = true;
        m_nIsConnected = false; 
    
        m_pfnOnConnectChange = NULL;
        m_pfnOnReceive = NULL;
    
        m_strConnectIPAddress = "";    
        m_strHostIPAddress = "";        
        m_strBackupIPAddress = "";    
        m_nRemotePort = 0;    
        m_nLocalPort = 0;         
    
        m_tmLastConnect = 0;     
    }
    //dctor
    CTcpClient::~CTcpClient()
    {
        m_lStop = true; 
        wait();
        //
        close();
        m_pfnOnConnectChange = NULL;
        m_pfnOnReceive = NULL;
    }  
    
    
    long CTcpClient::SetConnectParam(char* szHost, char* szBackup, int nRemotePort, int nLocalPort)
    { 
        m_strConnectIPAddress =    m_strHostIPAddress = szHost; 
        m_strBackupIPAddress = szBackup; 
        m_nRemotePort = nRemotePort; 
        m_nLocalPort = nLocalPort;
        return 0;
    }
    
    void CTcpClient::SetOnConnectChangeCallBack(pfnConnectChangeCallBack func)
     {
         this->m_pfnOnConnectChange = func; 
     }
    
    void CTcpClient::SetOnReceiveCallBack(pfnReceiveCallBack func)
     {
         this->m_pfnOnReceive = func; 
     }
    
    void CTcpClient::OnConnectChange(bool nIsConnected)
    {  
        m_nIsConnected = nIsConnected; 
        if(!m_nIsConnected)
            Log(LOGLEVEL_ERROR, "Disconnect from(%s:%d).", m_strConnectIPAddress.c_str(), m_nRemotePort);
        else
            Log(LOGLEVEL_NOTICE, "Connected to(%s:%d).", m_strConnectIPAddress.c_str(), m_nRemotePort);    
        //
        if(m_pfnOnConnectChange)
            m_pfnOnConnectChange(m_nIsConnected);
    }
    
    void CTcpClient::OnReceive(char* pszReceive, const int nReceiveSize)
    {        
        ACE_Message_Block *pFrame = new ACE_Message_Block(nReceiveSize);
        memcpy(pFrame->wr_ptr(), pszReceive, nReceiveSize);
        pFrame->wr_ptr(nReceiveSize); 
        this->putq(pFrame);
        /*if(m_pfnOnReceive)
            m_pfnOnReceive(pszReceive, nReceiveSize);*/
        delete[] pszReceive;
    }
    
    int CTcpClient::Disconnect()
    {
        CloseSocket();
        OnConnectChange(false);
        return 0;
    }
    
    int CTcpClient::CloseSocket()
    {
        //ACE_OS::shutdown(get_handle(), ACE_SHUTDOWN_BOTH);
        //int nRet = ACE_OS::closesocket(m_sockHandler.get_handle()); 
        this->peer().close();
        set_handle(ACE_INVALID_HANDLE);
        return 0;
    }
    
    int CTcpClient::Reconnect()
    {    
        //已连接
        if (IsConnected())
            return 0; 
        //未设置重连机制
        if(!m_nReconnect)
        {
            Log(LOGLEVEL_INFO, "Reconnect is disabled.");
            return -1;
        }        
        //小于超时时间3s不能重连
        time_t tmNow;
        time(&tmNow); 
        if(abs(tmNow - m_tmLastConnect) <= CONNECTION_TIMEOUT)
            return -1;
        //清理Socket
        CloseSocket();
        return Connect();
    }
     
    int CTcpClient::Connect()
    { 
        //与服务器建立连接 
        CTcpClient *pSockHandler = this;
        //创建连接器
        ACE_Connector<CTcpClient, ACE_SOCK_CONNECTOR> connector;
        //设置默认连接超时
        ACE_Time_Value connTimeOut(CONNECTION_TIMEOUT);
        ACE_Synch_Options synch_option(ACE_Synch_Options::USE_TIMEOUT, connTimeOut);    
        //远程端点
        ACE_INET_Addr remoteEP(m_nRemotePort, m_strConnectIPAddress.c_str()); 
        Log(LOGLEVEL_INFO, "Connecting to(%s:%d) ...", remoteEP.get_host_addr(), remoteEP.get_port_number());
        //更新当前连接时间戳
        time(&m_tmLastConnect);
        int nRet = 0;
        if (m_nLocalPort > 0)
        {
            //绑定本地固定端口号
            ACE_INET_Addr localEP(m_nLocalPort);
            nRet = connector.connect(pSockHandler, remoteEP, synch_option, localEP);  
        }
        else
        {
            //绑定本地随机端口号
            nRet = connector.connect(pSockHandler, remoteEP, synch_option);
        }
        //连接失败
        if(nRet == -1)
        {
            //轮询切换连接主备服务器(存在)
            if(!m_strBackupIPAddress.empty() && m_strBackupIPAddress.compare(m_strHostIPAddress) != 0)
            {
                m_strConnectIPAddress = 
                    m_strConnectIPAddress.compare(m_strHostIPAddress) == 0 ? m_strBackupIPAddress : m_strHostIPAddress;
            }  
            OnConnectChange(false);
            return -1;
        }
        //启动接收事件(OneTime)
        if(!nRet && m_lStop)
        {
            m_lStop = false; 
            this->activate(THR_NEW_LWP | THR_JOINABLE |THR_INHERIT_SCHED);
        } 
        OnConnectChange(true);
        return 0;
    }
    
    int CTcpClient::svc()
    {
        //接收
        while(!m_lStop)
        { 
            ACE_Time_Value tvSleep;
            tvSleep.msec(TASK_NAP_TIME_VALUE);
            ACE_OS::sleep(tvSleep);    
            ACE_Time_Value tvWaite(0, TASK_NAP_TIME_VALUE); 
            //BLOCKED
            this->reactor()->handle_events(&tvWaite);
        }
        return 0;    
    }
    
    int CTcpClient::open(void* param)
    {
        return this->reactor()->register_handler(this, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::WRITE_MASK);
        //if (Base::open(param) == -1)
        //{
        //    Log(LOGLEVEL_ERROR, "open() Failied.");
        //    return -1;
        //}
        //return 0;
    }
    
    int CTcpClient::handle_input(ACE_HANDLE)
    {    
        char *szBuffer = new char[DEFAULT_BUFFER_SIZE];
        //接收数据
        ssize_t length = this->peer().recv(szBuffer, DEFAULT_BUFFER_SIZE);
        //连接断开接收失败
        if(length <= 0) 
        {
            delete[] szBuffer; 
            return -1;//implicit call handle_close() clear up
        }
        OnReceive(szBuffer, length);
        return 0;
    }
    
    int CTcpClient::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask closeMask)
    {    
        int nRet = ACE_Event_Handler::handle_close(handle, closeMask);
        Disconnect();
        return nRet;
    }
    
    int CTcpClient::handle_output(ACE_HANDLE handle /* = ACE_INVALID_HANDLE */)
    {
        //调用一次
        return 0;
    }
    
    long CTcpClient::Receive(char* szReceive, long lReceiveSize, int nCommunicateTimeOut)
    { 
        //Confirmed
        //implicit call handle_close() clear up
        if(!IsConnected())
            return -1;
        
        ACE_Time_Value tvTimeout(0, nCommunicateTimeOut); 
        //return this->peer().recv((void *)szReceive, lReceiveSize, &tvTimeout);
        return this->peer().recv_n((void *)szReceive, lReceiveSize, &tvTimeout);
    } 
    
    long CTcpClient::Send(const char* szSend, long lSendSize, int nCommunicateTimeOut)
    {
        //Uncertainty
        //implicit call handle_close() clear up
        if(!IsConnected())
            return -1;
    
        ACE_Time_Value tvTimeout(0, nCommunicateTimeOut);
        ssize_t length = this->peer().send_n(szSend, lSendSize, &tvTimeout);
        return length;
    }
    View Code
  • 相关阅读:
    SQL语句之奇形怪状的冷门函数
    计算累计收益
    关于SQL表字段值缺失的处理办法
    虚拟机移植到另一台机器
    分分钟搞懂rank() over(partition by)的使用
    分分钟搞懂union与union all
    【转】10分钟就能学会的.NET Core配置
    【转】依赖注入的威力,.NET Core的魅力:解决MVC视图中的中文被html编码的问题
    【转】Asp.Net Core2.0获取客户IP地址,及解决发布到Ubuntu服务器获取不到正确IP解决办法
    【转】在.net Core 中像以前那样的使用HttpContext.Current
  • 原文地址:https://www.cnblogs.com/wjshan0808/p/6612394.html
Copyright © 2011-2022 走看看