zoukankan      html  css  js  c++  java
  • 使用epoll实现异步的tcp请求客户端

    公司有一个需求,要求客户端能够单线程异步发送请求到服务端。
    当在不引入过多第三方库的情况下,可以使用下列的方式发送请求。

    当然,C++11有很多异步的库,代码示例用的C++11仅是为了简化线程模型,省去自己封装线程池的麻烦。

    #include <stdio.h>
    #include <string.h>
    #include <stdlib.h>
    #include <netdb.h>
    #include <errno.h>
    #include <fcntl.h>
    #include <unistd.h>
    #include <sys/socket.h>
    #include <arpa/inet.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <sys/types.h>
    #include <sys/wait.h>
    #include <sys/time.h>
    #include <sys/resource.h>
    #include <poll.h>
    #include <sys/epoll.h>
    #include <iostream>
    #include <sstream>
    #include <thread>
    #include <mutex>
    #include <list>
    
    using namespace std;
    
    #define IPADDR "192.168.15.135"
    #define PORT 19090
    #define MAXLINE 1024
    
    typedef unsigned long long UInt64;
    
    enum FD_STEP
    {
            STEP_NONE,
            STEP_CONNECT,
            STEP_RW
    };
    
    typedef struct SReqNode{
        int fd;
        int key;
        unsigned int seq;
        FD_STEP step;
        std::string host;
        SReqNode()
        {
            fd  = -1;
            seq = 0;
            key = 0;
            host = "";
            step = STEP_NONE;
        }
        SReqNode(int fd,int key,unsigned int seq)
        {
            this->fd  = fd;
            this->key = key;
            this->seq = seq;
        }
    } SReqNode;
    
    bool Resolve( const char* szHost, in_addr* phost )
    {
        bool bRet = false;
        bool bIsName = false;
    
        if( NULL == szHost ) return false;
    
        const char* p = szHost;
        while( *p )
        {
            if( (*p < '0' || *p > '9') && *p != '.' )
            {
                bIsName = true;
                break;
            }
            p++;
        }
    
        if( bIsName )
        {
            hostent* phent;
            phent = gethostbyname( szHost );
            if( phent )
            {
                memcpy( phost, phent->h_addr, sizeof(in_addr) );
                bRet = true;
            }
        }
        else
        {
            bRet = ( 0 != inet_aton( szHost, phost ) );
        }
    
        return bRet;
    }
    
    class CTCPClient{
    
    public:
        CTCPClient()
        {
            m_ptrThread = NULL;
        }
        ~CTCPClient()
        {
           if (m_ptrThread){
               m_ptrThread->join();
               delete m_ptrThread;
           }
        }
        
        static void _entry(void* ptr)
        {
           ( (CTCPClient*)ptr)->Run();
        }
    
        void Initialize()
        {
            m_epfd = epoll_create(256);
            m_globalCnt = 1;
            m_ptrThread = new thread(CTCPClient::_entry, this);
        }
        void AddMsg(int key)
        {
            m_msgListMtx.lock();
            m_msgList.push_back(key);
            m_msgListMtx.unlock();
        }
        void dealNewMsg()
        {
            std::list<int>  msgList;
            m_msgListMtx.lock();
            msgList.swap(m_msgList);
            m_msgListMtx.unlock();
            std::list<int>::iterator iter = msgList.begin();
            for (;iter != msgList.end(); iter++)
            {
                dealConnect(*iter);
            }
        }
        void dealConnect(int key)
        {
            struct sockaddr_in servaddr;
            int sock_cli = socket(AF_INET, SOCK_STREAM, 0);
            servaddr.sin_family = AF_INET;
            servaddr.sin_port = htons(PORT);
            
            if ( Resolve(IPADDR,&(servaddr.sin_addr)) )
            {
                std::string tmpIp = std::string(inet_ntoa(servaddr.sin_addr));
                cout << tmpIp << endl;
            }else{
                cout << "Resolve " << IPADDR << " ERROR." << endl;
            }
            
            //servaddr.sin_addr.s_addr = inet_addr(IPADDR);
            int flags = fcntl(sock_cli, F_GETFL, 0);
            fcntl(sock_cli, F_SETFL, flags|O_NONBLOCK);
            
            connect(sock_cli, (struct sockaddr *)&servaddr, sizeof(servaddr));
            SReqNode* ptr = new SReqNode(sock_cli, key, m_globalCnt++);
            ptr->step = STEP_CONNECT;
            ptr->host = std::string(IPADDR) + ":" + std::to_string(PORT);
            struct epoll_event ev;
            ev.data.ptr = (void* )ptr;
            ev.events=EPOLLOUT|EPOLLET;
            epoll_ctl(m_epfd, EPOLL_CTL_ADD, sock_cli, &ev);
        }
        void Run()
        {
            int i , maxi, connfd , sockfd, nfds;
            int n;
            char buf[MAXLINE];
            struct epoll_event ev, events[20];  
            socklen_t clilen;
    
            for(;;)
            {
                dealNewMsg();
                nfds=epoll_wait(m_epfd,events,20,500); 
                for(i=0; i<nfds; i++)
                {
                    SReqNode* ptr = (SReqNode*)events[i].data.ptr;
                    if ( events[i].events&EPOLLOUT)
                    {
                        if ( ptr->step != STEP_CONNECT )
                        {
                            cout << "EPOLLOUT But not connect will release." << endl;
                            epoll_ctl(m_epfd, EPOLL_CTL_DEL, ptr->fd, &events[i]);
                            close(ptr->fd);
                            delete ptr;
                        }
                        ostringstream rspStr;
                        rspStr << "GET /vinput?key=" << ptr->key << "& HTTP/1.1
    ";
                        rspStr << "Host: " + ptr->host + "
    ";
                        rspStr << "Connection: close
    ";
                        rspStr << "User-Agent: Mozilla/5.0
    ";
                        rspStr << "Accept-Language: zh-CN,zh;q=0.9
    
    ";
                        
                        ptr->step = STEP_RW;
                        ev.data.ptr = ptr; 
                        ev.events=EPOLLIN|EPOLLET;
                        epoll_ctl(m_epfd, EPOLL_CTL_MOD, ptr->fd, &ev);
                        send(ptr->fd, rspStr.str().c_str(), rspStr.str().length(), 0);
                    }
                    else if (events[i].events & EPOLLIN)
                    {
                        SReqNode* tmpPrt = (SReqNode*)events[i].data.ptr;
                        sockfd = tmpPrt->fd;
                        if ( sockfd < 0)  
                            continue;  
                        n = recv(sockfd,buf,MAXLINE,0);
                        if (n <= 0)   
                        {    
                            close(sockfd);  
                            events[i].data.fd = -1;  
                        }
                        else
                        {
                            buf[n]='';
                            printf("Socket %d said : %s
    ",sockfd,buf);
                        }
                        epoll_ctl(m_epfd, EPOLL_CTL_DEL, sockfd, &events[i]);
                        close(sockfd);
                        delete tmpPrt;
                    }
                    else 
                    {
                        printf("This is not avaible!");
                    }
                }
            }
            close(m_epfd);
        }
    
    private:
        int m_epfd;
        std::mutex m_msgListMtx;
        std::list<int>  m_msgList;
        int m_globalCnt;
        thread* m_ptrThread;
    };
    
    int main()
    {
        CTCPClient tcpClient;
        tcpClient.Initialize();
        
        int input;
        while(1)
        {
            cout << "input:";
            cin >> input;
            if (input == 3306)
            {
                break;
            }
            tcpClient.AddMsg(input);
        }
        return 0;
    }
    
    /***********************
    //编译方法:
    $ g++ tcpclient.cpp -g -lpthread -std=c++11 -o tcpclient
    $ ./tcpclient
    input:3
    Socket 5 said : HTTP/1.1 200 OK
    Server: Apache-Coyote/1.1
    Content-Type: text/plain;charset=UTF-8
    Connection: close
    Content-Length: 6
    
    Recv:3
    ************************/
    
    转载请注明来源:https://www.cnblogs.com/bugutian/
  • 相关阅读:
    Anaconda使用命令
    排序算法3--插入排序--希尔排序(缩小增量排序)
    排序算法2--插入排序--折半插入排序
    排序算法1--插入排序--直接插入排序
    排序总结---常用的排序算法总结,java和js实现
    前端兼容性问题
    js对象的几种创建方式和js实现继承的方式[转]
    js原型和原型链[转]
    性能优化的方法
    Http状态码
  • 原文地址:https://www.cnblogs.com/bugutian/p/15324211.html
Copyright © 2011-2022 走看看