zoukankan      html  css  js  c++  java
  • Acceptor 和 Connector

    非阻塞Accept和Connect的封装器,WINDOWS版本

    acceptor

    #ifndef _ACCEPTOR_H
    #define _ACCEPTOR_H
    
    typedef struct acceptor* acceptor_t;
    typedef void (*on_accept)(SOCKET);
    acceptor_t create_acceptor(const char *ip,unsigned long port,on_accept accept_callback);
    void       destroy_acceptor(acceptor_t*);
    void       acceptor_run(acceptor_t,int ms);
    
    #endif
    #include <winsock2.h>
    #include <WinBase.h>
    #include <Winerror.h>
    #include <stdio.h>
    #include "Acceptor.h"
    
    struct acceptor
    {
        on_accept accept_callback;
        SOCKET    sock;
        FD_SET Set;
    };
    
    acceptor_t create_acceptor(const char *ip,unsigned long port,on_accept accept_callback)
    {
        SOCKET ListenSocket;
        struct sockaddr_in    addr;
        int                         optval=1;                        //Socket属性值
        unsigned long               ul=1;
        acceptor_t a;
        ListenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    
        if (INVALID_SOCKET == ListenSocket) 
        {
            printf("\nError occurred while opening socket: %d.", WSAGetLastError());
            return NULL;
        }
    
        addr.sin_family        = AF_INET;
        addr.sin_port        = htons((u_short)port);
        addr.sin_addr.S_un.S_addr = inet_addr(ip);
    
        if ((bind(ListenSocket, (struct sockaddr *)&addr, sizeof( struct sockaddr_in))) == SOCKET_ERROR)
        {
            closesocket(ListenSocket);
            return NULL;
        }
    
        if((listen(ListenSocket, 5)) == SOCKET_ERROR)
        {
            closesocket(ListenSocket);
            return NULL;
        }
        ioctlsocket(ListenSocket,FIONBIO,(unsigned long*)&ul);
    
        a = malloc(sizeof(*a));
        a->sock = ListenSocket;
        a->accept_callback = accept_callback;
        FD_ZERO(&a->Set);
        FD_SET(a->sock,&a->Set);
        return a;
    }
    
    void destroy_acceptor(acceptor_t *a)
    {
        closesocket((*a)->sock);
        free(*a);
        *a = NULL;
    }
    
    void acceptor_run(acceptor_t a,int ms)
    {
        struct timeval timeout;
        SOCKET client;
        struct sockaddr_in ClientAddress;
        int nClientLength = sizeof(ClientAddress);
        timeout.tv_sec = 0;
        timeout.tv_usec = 1000*ms;
        if(select(0, &a->Set,NULL, NULL, &timeout) >0 )
        {
            for(;;)
            {
                client = accept(a->sock, (struct sockaddr*)&ClientAddress, &nClientLength);
                if (INVALID_SOCKET == client)
                    break;
                else
                {
                    a->accept_callback(client);
                }
            }
        }
    
        FD_ZERO(&a->Set);
        FD_SET(a->sock,&a->Set);
    
    }

    connector

    #ifndef _CONNECTOR_H
    #define _CONNECTOR_H
    
    
    typedef struct connector *connector_t;
    typedef void (*on_connect)(SOCKET,const char *ip,unsigned long port);
    
    connector_t connector_create();
    void        connector_destroy(connector_t*);
    int         connector_connect(connector_t,const char *ip,unsigned long port,on_connect,unsigned long ms);
    void        connector_run(connector_t,unsigned long ms);
    
    #endif
    #include <winsock2.h>
    #include <WinBase.h>
    #include <Winerror.h>
    #include "Connector.h"
    #include <stdio.h>
    #include "link_list.h"
    typedef struct pending_connect
    {
        list_node  lnode;
        const char *ip;
        unsigned long port;
        SOCKET   sock;
        on_connect call_back;
        unsigned long timeout;
    };
    
    struct connector
    {
        FD_SET Set;
        struct link_list *_pending_connect;
        unsigned long fd_seisize;
    };
    
    connector_t connector_create()
    {
        connector_t c = malloc(sizeof(*c));
        c->fd_seisize = 0;
        FD_ZERO(&c->Set);
        c->_pending_connect = create_list();
        return c;
    }
    
    void connector_destroy(connector_t *c)
    {
        struct pending_connect *pc;
        while(pc = LIST_POP(struct pending_connect*,(*c)->_pending_connect))
            free(pc);
        free(*c);
        *c = 0;
    }
    
    int connector_connect(connector_t c,const char *ip,unsigned long port,on_connect call_back,unsigned long ms)
    {
        struct sockaddr_in remote;
        ULONG NonBlock = 1; 
        SOCKET sock;
        struct pending_connect *pc;
        int slot = -1;
        if(c->fd_seisize >= FD_SETSIZE)
            return -1;
        sock =socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
        if (sock == INVALID_SOCKET)
        {
            printf("\nError occurred while opening socket: %d.", WSAGetLastError());
            return -1;
        }
        remote.sin_family = AF_INET;
        remote.sin_port = htons((u_short)port);
        remote.sin_addr.s_addr = inet_addr(ip);
        if(ms>0)
        {
            if (ioctlsocket(sock, FIONBIO, &NonBlock) == SOCKET_ERROR)
            {
                closesocket(sock);
                printf("ioctlsocket() failed with error %d\n", WSAGetLastError());
                return -1;
            }
        }
        if(connect(sock, (struct sockaddr *)&remote, sizeof(remote)) >=0 )
        {
            //连接成功,无需要后续处理了,直接调用回调函数
            call_back(sock,ip,port);
            return 0;
        }
    
        pc = malloc(sizeof(*pc));
        pc->lnode.next = NULL;
        pc->sock = sock;
        pc->ip = ip;
        pc->port = port;
        pc->call_back = call_back;
        pc->timeout = GetTickCount() + ms;
        FD_SET(sock,&c->Set);
        LIST_PUSH_BACK(c->_pending_connect,pc);
        ++c->fd_seisize;
        return 0;
    }
    
    void connector_run(connector_t c,unsigned long ms)
    {
        int i = 0;
        DWORD tick;
        int size;
        int total;
        struct pending_connect *pc;
        struct timeval timeout;
        timeout.tv_sec = 0;
        timeout.tv_usec = 1000*ms;
        size = list_size(c->_pending_connect);
        if(size == 0)
            return;
        if((total = select(0, NULL,&c->Set, NULL, &timeout)) >0 )
        {
            for(; i < size; ++i)
            {
                pc = LIST_POP(struct pending_connect*,c->_pending_connect);
                if(pc)
                {
                    if(FD_ISSET(pc->sock, &c->Set))
                    {
                        pc->call_back(pc->sock,pc->ip,pc->port);
                        free(pc);
                        --c->fd_seisize;
                    }
                    else
                        LIST_PUSH_BACK(c->_pending_connect,pc);
                }
            }
        }
        FD_ZERO(&c->Set);
        tick = GetTickCount();
        size = list_size(c->_pending_connect);
        i = 0;
        for(; i < (int)size; ++i)
        {
            pc = LIST_POP(struct pending_connect*,c->_pending_connect);
            if(tick >= pc->timeout)
            {
                //超时了
                pc->call_back(INVALID_SOCKET,pc->ip,pc->port);
                free(pc);
                --c->fd_seisize;
            }
            else
            {
                LIST_PUSH_BACK(c->_pending_connect,pc);
                FD_SET(pc->sock,&c->Set);
            }
        }
    }

    使用方式

    //acceptor
    acceptor_t a = create_acceptor("192.168.6.13",8010,&accept_callback);
    while(1)
        acceptor_run(a,10);
    
    //connector
    connector_t c = NULL;
    int ret;
    int i = 0;
    c =  connector_create();
    for( ; i < 100;++i)
    {
            ret = connector_connect(c,"192.168.6.13",8010,on_connect_callback,1000*20);
            Sleep(1);
    }
    while(1)
    {
        connector_run(c,0);
    }
  • 相关阅读:
    关于cmake、make、make install
    windows开启ip_forwarding功能
    最新devstack安装(ussuri)
    【rabbitmq】之业务封装
    【rabbitmq】之过期和死信队列
    【rabbitmq】之confirm和return机制
    【rabbitmq】之消费端手动ack
    java短网址服务
    详解druid打印SQL日志
    logback配置文件拆分,抽取公共配置
  • 原文地址:https://www.cnblogs.com/sniperHW/p/2490371.html
Copyright © 2011-2022 走看看