zoukankan      html  css  js  c++  java
  • 网络编程非阻塞connect

    非阻塞connect

    ​ 看到Redis源码中主从复制的源码,对某些逻辑不是很确定。梳理了Redis非阻塞connect的大概实现之后,自己写了一个简单的版本。

    需要理解的是:非阻塞connect的完成被认为是使响应套接字可写。(UNP Ch6.10)

    一、主要流程:

    • 创建非阻塞socket,socket(...., SOCK_NONBLOCK, ...)
    • 检查connect(fd, ...)返回是否为0
    • 如果为-1,检查errno是否为EINPROGRESS,如果connect失败且错误不为EINPROGRESS,返回错误。
    • 返回fd,并利用IO多路复用阻塞,监听POLLOUT事件。
    • getsockopt(fd, SOL_SOCKET, SO_ERROR, ...)检查socket状态
    • 成功
    #include <arpa/inet.h>
    #include <assert.h>
    #include <errno.h>
    #include <fcntl.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <strings.h>
    #include <sys/epoll.h>
    #include <sys/socket.h>
    #include <sys/types.h>
    #include <unistd.h>
    
    #define AF_ERR -1
    #define MAX_EVENTS 1024
    
    static void epoll_ctl_add(int epfd, int fd, int evts) {
        struct epoll_event ev;
        ev.events = evts;
        ev.data.fd = fd;
        int err = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
        assert(!err);
    }
    
    /* check socket status*/
    void connectionEstablished(int fd) {
        int sockerr = 0;
        socklen_t errlen = sizeof(sockerr);
        getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen);
        assert(sockerr == 0);
        printf("connection done.\n");
    }
    
    void handle_events(struct epoll_event* e, int epfd) {
        printf("events %d: ", e->data.fd);
        if (e->events & EPOLLOUT) {
            printf("EPOLLOUT ");
            connectionEstablished(e->data.fd);
        }
    }
    
    /* non-blocking-connect */
    int connect(const char* ip, int port) {
        struct sockaddr_in address;
        bzero(&address, sizeof(address));
        address.sin_family = AF_INET;
        inet_pton(AF_INET, ip, &address.sin_addr);
        address.sin_port = htons(port);
        int s = socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
        if (connect(s, (struct sockaddr*)&address, sizeof(address)) == -1) {
            if (errno == EINPROGRESS) {
                goto end;;
            }
            close(s);
            s = AF_ERR;
        }
    end:
        return s;
    }
    
    
    int main() {
        int fd = connect("127.0.0.1", 8888);
        if (fd == -1) {
            printf("connect failed\n");
            return 1;
        }
        int epfd;
        struct epoll_event events[MAX_EVENTS];
        epfd = epoll_create1(0);
        assert(epfd != -1);
        epoll_ctl_add(epfd, fd, EPOLLOUT);
        int n = epoll_wait(epfd, events, MAX_EVENTS, -1);
        assert(n != -1);
        for (int i = 0; i < n; i++) {
            handle_events(&events[i], epfd);
        }
        close(fd);
        return 0;
    }
    

    使用nc -l 8888当服务端,测试发现确实是可以通过监听POLLOUT事件来判断connect成功的


    二、Redis源码:

    // src
    
    int connectWithMaster(void) {
        int fd;
    
        /* 从服务器作为client,执行connect(2)连接到master */
        fd = anetTcpNonBlockBindConnect(NULL,
            server.masterhost,server.masterport,REDIS_BIND_ADDR);
        if (fd == -1) {
            redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
                strerror(errno));
            return REDIS_ERR;
        }
    
        /* 监听读写事件,设置事件处理回调函数为syncWithMaster */
        if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
                AE_ERR)
        {
            close(fd);
            redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
            return REDIS_ERR;
        }
    
        server.repl_transfer_lastio = server.unixtime;
        server.repl_transfer_s = fd;
        server.repl_state = REDIS_REPL_CONNECTING;
        return REDIS_OK;
    }
    
    
    
    void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
        char tmpfile[256], *err;
        int dfd, maxtries = 5;
        int sockerr = 0, psync_result;
        socklen_t errlen = sizeof(sockerr);
        REDIS_NOTUSED(el);
        REDIS_NOTUSED(privdata);
        REDIS_NOTUSED(mask);
    
        /* If this event fired after the user turned the instance into a master
         * with SLAVEOF NO ONE we must just return ASAP. */
        if (server.repl_state == REDIS_REPL_NONE) {
            close(fd);
            return;
        }
    
        /* Check for errors in the socket. */
        /* 检查socket状态 */
        if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
            sockerr = errno;
        if (sockerr) {
            aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
            redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
                strerror(sockerr));
            goto error;
        }
    
        /* If we were connecting, it's time to send a non blocking PING, we want to
         * make sure the master is able to reply before going into the actual
         * replication process where we have long timeouts in the order of
         * seconds (in the meantime the slave would block). */
        /* 建立连接后首先给master发PING,确保两端读写正常和master可以正确处理命令
           因为从服务器注册了RD and WR,而非阻塞connect(2)会触发EPOLLOUT,所以会执行第一步
         */
        if (server.repl_state == REDIS_REPL_CONNECTING) {
            redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
            /* Delete the writable event so that the readable event remains
             * registered and we can wait for the PONG reply. */
            /* 这一步之后WR事件就可以取消 */
            aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
            server.repl_state = REDIS_REPL_RECEIVE_PONG;
            /* Send the PING, don't check for errors at all, we have the timeout
             * that will take care about this. */
            syncWrite(fd,"PING\r\n",6,100);
            return;
        }
    	
     	/* 对读事件的监听 */
        /* Receive the PONG command. */
        if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
            char buf[1024];
            // ...
        }
    }
    

    继续看一下Redis非阻塞IO的实现:

    // src/netdb.h
    
    #define ANET_CONNECT_NONE 0
    #define ANET_CONNECT_NONBLOCK 1
    static int anetTcpGenericConnect(char *err, char *addr, int port,
                                     char *source_addr, int flags)
    {
        // ...
        for (p = servinfo; p != NULL; p = p->ai_next) {
            if (connect(s,p->ai_addr,p->ai_addrlen) == -1) {
                /* If the socket is non-blocking, it is ok for connect() to
                     * return an EINPROGRESS error here. */
                if (errno == EINPROGRESS && flags & ANET_CONNECT_NONBLOCK)
                    goto end;
                close(s);
                s = ANET_ERR;
                continue;
        	}
            goto end;
        // ...
    

    参考:

    1. c - Linux, sockets, non-blocking connect - Stack Overflow

    2. Linux高性能服务器编程[Ch9.5] 游双

    3. UNP[Ch6]

  • 相关阅读:
    四种访问修饰符详解(推荐)
    三层架构中DAL层Sqlhelper怎样快速掌握?(常用)
    ASP.NET中最常用的验证控件使用方法(推荐)
    .NetFrom验证方便的webconfig 配置及前台使用(推荐)
    CefSharp访问需要认证网页或接口(在Request的Headers中添加认证Token)
    CentOS7中配置vsftpd
    CentOS7下安装RabbitMQ
    CentOS7下让Asp.Net Core的网站自动运行
    Winform下的Combox根据值来选中项
    golang简单实现jwt验证(beego、xorm、jwt)
  • 原文地址:https://www.cnblogs.com/macguz/p/15848558.html
Copyright © 2011-2022 走看看