  • Socket accept: a brief analysis

    accept() takes the first connection off the given socket's completed-connection queue and returns a new socket for communicating with that client. The prototypes are as follows:

    #include <sys/types.h> /* See NOTES */
    #include <sys/socket.h>
    int accept(int sockfd, struct sockaddr *addr,  socklen_t *addrlen);
    int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);

    The parameters are:
    ·int sockfd: a socket in the listening state.
    ·struct sockaddr *addr: buffer that receives the peer's address.
    ·socklen_t *addrlen: an in/out value. The caller initializes it to the size of
    the addr buffer; on return, accept sets it to the actual size of the address.
    ·int flags: flag bits for the newer accept4 system call; currently
    SOCK_NONBLOCK and SOCK_CLOEXEC are supported.
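
    A minimal sketch of typical usage (not from the original post): a blocking server that accepts one connection via accept4(). Error handling is abbreviated, and the port number 8080 is an arbitrary choice for illustration:

    #define _GNU_SOURCE             /* accept4() is Linux-specific */
    #include <string.h>
    #include <unistd.h>
    #include <sys/socket.h>
    #include <netinet/in.h>

    int main(void)
    {
        int listenfd = socket(AF_INET, SOCK_STREAM, 0);
        int on = 1;
        setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

        struct sockaddr_in addr;
        memset(&addr, 0, sizeof(addr));
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = htonl(INADDR_ANY);
        addr.sin_port = htons(8080);
        bind(listenfd, (struct sockaddr *)&addr, sizeof(addr));
        listen(listenfd, 128);      /* 128 hints the accept-queue length */

        struct sockaddr_in peer;
        socklen_t peerlen = sizeof(peer);  /* in: buffer size; out: address size */
        /* SOCK_CLOEXEC marks the new fd close-on-exec atomically */
        int connfd = accept4(listenfd, (struct sockaddr *)&peer,
                             &peerlen, SOCK_CLOEXEC);
        if (connfd >= 0) {
            write(connfd, "hello\n", 6);
            close(connfd);
        }
        close(listenfd);
        return 0;
    }

    In the kernel, accept4 is implemented as follows: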

    /*
     *	For accept, we attempt to create a new socket, set up the link
     *	with the client, wake up the client, then return the new
     *	connected fd. We collect the address of the connector in kernel
     *	space and move it to user at the very end. This is unclean because
     *	we open the socket then return an error.
     *
     *	1003.1g adds the ability to recvmsg() to query connection pending
     *	status to recvmsg. We need to add that support in a way that's
     *	clean when we restructure accept also.
     */
    
    SYSCALL_DEFINE4(accept4, int, fd, struct sockaddr __user *, upeer_sockaddr,
    		int __user *, upeer_addrlen, int, flags)
    {
    	struct socket *sock, *newsock;
    	struct file *newfile;
    	int err, len, newfd, fput_needed;
    	struct sockaddr_storage address;
     /* Only these two flags are permitted */
    	if (flags & ~(SOCK_CLOEXEC | SOCK_NONBLOCK))
    		return -EINVAL;
    
    	if (SOCK_NONBLOCK != O_NONBLOCK && (flags & SOCK_NONBLOCK))
    		flags = (flags & ~SOCK_NONBLOCK) | O_NONBLOCK;
     /* Map the file descriptor fd to its socket:
         * use fd as an index into the current process's file descriptor
         * table (files_struct) to find the corresponding file instance,
         * then take the socket instance from the file's private_data.
         */
    	sock = sockfd_lookup_light(fd, &err, &fput_needed);
    	if (!sock)
    		goto out;
    
    	err = -ENFILE;
    	newsock = sock_alloc();
    	if (!newsock)
    		goto out_put;
     /* The new socket inherits the listener's type and socket-layer ops */
    	newsock->type = sock->type;
    	newsock->ops = sock->ops;
    
    	/*
    	 * We don't need try_module_get here, as the listening socket (sock)
    	 * has the protocol module (sock->ops->owner) held.
    	 * Socket-layer ops: for SOCK_STREAM this is inet_stream_ops; its reference count is bumped here.
    	 */
    	__module_get(newsock->ops->owner);
     /* Allocate an unused file descriptor */
    	newfd = get_unused_fd_flags(flags);
    	if (unlikely(newfd < 0)) {
    		err = newfd;
    		sock_release(newsock);
    		goto out_put;
    	}
    /* Create the file structure for the new socket and link it as sock->file */
    	newfile = sock_alloc_file(newsock, flags, sock->sk->sk_prot_creator->name);
    	if (IS_ERR(newfile)) {
    		err = PTR_ERR(newfile);
    		put_unused_fd(newfd);
    		sock_release(newsock);
    		goto out_put;
    	}
    
    	err = security_socket_accept(sock, newsock);
    	if (err)
    		goto out_fd;
    /* Socket-layer operations: for SOCK_STREAM, proto_ops is inet_stream_ops,
         * so the call below goes to inet_accept().
         */
    	err = sock->ops->accept(sock, newsock, sock->file->f_flags, false);
    	if (err < 0)
    		goto out_fd;
    
    	if (upeer_sockaddr) { /* the caller wants the peer's address */
    		if (newsock->ops->getname(newsock, (struct sockaddr *)&address,
    					  &len, 2) < 0) {
    			err = -ECONNABORTED;
    			goto out_fd;
    		} /* copy the socket address from kernel space to user space */
    		err = move_addr_to_user(&address,
    					len, upeer_sockaddr, upeer_addrlen);
    		if (err < 0)
    			goto out_fd;
    	}
    
    	/* File flags are not inherited via accept() unlike other OSes.
    	 * Install newfile into the current process's file descriptor
    	 * table (files_struct) at index newfd.
    	 */
    
    	fd_install(newfd, newfile);
    	err = newfd;
    
    out_put:
    	fput_light(sock->file, fput_needed);
    out:
    	return err;
    out_fd:
    	fput(newfile);
    	put_unused_fd(newfd);
    	goto out_put;
    }
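
    For AF_INET sockets, the sock->ops->accept call above lands in inet_accept(), which in turn calls sock_graft() to attach the newly accepted sock to newsock: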
    
    static inline void sock_graft(struct sock *sk, struct socket *parent)
    {
        write_lock_bh(&sk->sk_callback_lock);
        sk->sk_wq = parent->wq;    /* point at the parent socket's wait queue */
        parent->sk = sk;
        sk_set_socket(sk, parent);
        sk->sk_uid = SOCK_INODE(parent)->i_uid;
        security_sock_graft(sk, parent);
        write_unlock_bh(&sk->sk_callback_lock);
    }

    The TCP-layer operations for a SOCK_STREAM socket are given by tcp_prot, whose connection-accept function is inet_csk_accept().

    inet_csk_accept() removes a request block in the ESTABLISHED state from the backlog (accept) queue and returns the connected sock it corresponds to.

    Its behavior breaks down as follows (a user-space sketch of the timeout cases appears right after this list):

    1. If the accept queue already holds a connected socket, accept() returns a new fd for it right away.

    2. If the queue is empty and the socket is non-blocking, accept() returns -EAGAIN immediately.

    3. If the queue is empty and the socket is blocking, the process sleeps: with no timeout set it blocks indefinitely; if the user set a timeout, it gives up once the timeout expires.

    4. In the blocking case, the current process is hung on the wait queue of the listenfd's socket, yields the CPU, and waits to be woken.
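
    The timeout used below is sock_rcvtimeo(), i.e. the value set with SO_RCVTIMEO. A minimal sketch of cases 2 and 3 above, assuming listenfd is an already-listening TCP socket as set up in the first example; the 3-second timeout is an arbitrary choice:

    #include <errno.h>
    #include <fcntl.h>
    #include <stdio.h>
    #include <sys/socket.h>
    #include <sys/time.h>

    void demo_accept_timeouts(int listenfd)
    {
        /* Blocking accept bounded by SO_RCVTIMEO: sleeps at most 3s,
         * then fails with EAGAIN if no connection arrived. */
        struct timeval tv = { .tv_sec = 3, .tv_usec = 0 };
        setsockopt(listenfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
        if (accept(listenfd, NULL, NULL) < 0 && errno == EAGAIN)
            printf("timed out, accept queue still empty\n");

        /* Non-blocking accept: fails with EAGAIN immediately. */
        fcntl(listenfd, F_SETFL, fcntl(listenfd, F_GETFL) | O_NONBLOCK);
        if (accept(listenfd, NULL, NULL) < 0 && errno == EAGAIN)
            printf("queue empty, would block\n");
    }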

    /*
     * This will accept the next outstanding connection.
     */
    struct sock *inet_csk_accept(struct sock *sk, int flags, int *err, bool kern)
    {
        struct inet_connection_sock *icsk = inet_csk(sk);
        struct request_sock_queue *queue = &icsk->icsk_accept_queue;
        struct request_sock *req;
        struct sock *newsk;
        int error;
    
        lock_sock(sk);
    
        /* We need to make sure that this socket is listening,
         * and that it has something pending.
         */
        error = -EINVAL;
        if (sk->sk_state != TCP_LISTEN)    /* the socket must be in the listening state */
            goto out_err;
    
        /* Find already established connection */
        if (reqsk_queue_empty(queue)) { // no request in the ESTABLISHED state was found
         /* wait timeout: 0 if the socket is non-blocking */
            long timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
    
            /* If this is a non blocking socket don't sleep */
            error = -EAGAIN;
            if (!timeo)    /* non-blocking: bail out right away */
                goto out_err;
            /* Block until a completed connection arrives; if the user set a timeout, give up once it expires */
            error = inet_csk_wait_for_connect(sk, timeo);
            if (error)
                goto out_err;
        }
        // Reaching here means the accept queue holds a socket; take it off the queue
        req = reqsk_queue_remove(queue, sk);
        newsk = req->sk;
    
        if (sk->sk_protocol == IPPROTO_TCP &&
            tcp_rsk(req)->tfo_listener) {
            spin_lock_bh(&queue->fastopenq.lock);
            if (tcp_rsk(req)->tfo_listener) {
                /* We are still waiting for the final ACK from 3WHS
                 * so can't free req now. Instead, we set req->sk to
                 * NULL to signify that the child socket is taken
                 * so reqsk_fastopen_remove() will free the req
                 * when 3WHS finishes (or is aborted).
                 */
                req->sk = NULL;
                req = NULL;
            }
            spin_unlock_bh(&queue->fastopenq.lock);
        }
    out:
        release_sock(sk);
        if (req)
            reqsk_put(req);
        return newsk;
    out_err:
        newsk = NULL;
        req = NULL;
        *err = error;
        goto out;
    }
    /*
     * Wait for an incoming connection, avoid race conditions. This must be called
     * with the socket locked.
     */
    static int inet_csk_wait_for_connect(struct sock *sk, long timeo)
    {
        struct inet_connection_sock *icsk = inet_csk(sk);
        DEFINE_WAIT(wait);
        int err;
    
        /*
         * True wake-one mechanism for incoming connections: only
         * one process gets woken up, not the 'whole herd'.
         * Since we do not 'race & poll' for established sockets
         * anymore, the common case will execute the loop only once.
         *
         * Subtle issue: "add_wait_queue_exclusive()" will be added
         * after any current non-exclusive waiters, and we know that
         * it will always _stay_ after any new non-exclusive waiters
         * because all non-exclusive waiters are added at the
         * beginning of the wait-queue. As such, it's ok to "drop"
         * our exclusiveness temporarily when we get woken up without
         * having to remove and re-insert us on the wait queue.
         */
        for (;;) {
            // prepare_to_wait_exclusive() is the key step: it hangs 'wait' on this sk's wait queue.
            prepare_to_wait_exclusive(sk_sleep(sk), &wait,
                          TASK_INTERRUPTIBLE);
            release_sock(sk);
            // icsk_accept_queue is the full (accept) queue
            if (reqsk_queue_empty(&icsk->icsk_accept_queue))
                timeo = schedule_timeout(timeo); // when blocking, we run again only once woken (or the timeout fires)
            lock_sock(sk);
            err = 0;
            
            // If we came back from schedule_timeout without timing out, the accept queue must now be non-empty.
            if (!reqsk_queue_empty(&icsk->icsk_accept_queue))
                break; // the break every successful accept path goes through
            err = -EINVAL;
            if (sk->sk_state != TCP_LISTEN)
                break;
            err = sock_intr_errno(timeo);
            
            // Exit the loop on a pending signal or an exhausted timeout; otherwise go back to sleep.
            if (signal_pending(current))
                break;
            err = -EAGAIN;
            if (!timeo)
                break;
        }
        finish_wait(sk_sleep(sk), &wait);
        return err;
    }

    prepare_to_wait_exclusive() is the important function here: it adds the current context to the wait queue of the socket behind listenfd. With multiple processes, that wait queue will hold entries for several processes' contexts.

    Now consider multi-process accept. Leaving SO_REUSEPORT aside, multiple processes accepting on the same fd only happens when a parent and its children call accept() concurrently. As noted above, prepare_to_wait_exclusive() adds the calling process's context to the listenfd wait queue, so the contexts of both parent and children end up on the socket's wait queue. The core question is how they get woken: the so-called thundering herd would be waking every process on that wait queue at once.
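    A self-contained way to observe the answer empirically (illustrative only; reuses the arbitrary port 8080 from the earlier sketch): the parent forks four children that all block in accept() on the same listenfd. If the kernel woke the whole queue, every child would run for each connection; in practice exactly one does.

    #include <stdio.h>
    #include <string.h>
    #include <unistd.h>
    #include <sys/socket.h>
    #include <netinet/in.h>

    int main(void)
    {
        int listenfd = socket(AF_INET, SOCK_STREAM, 0);
        int on = 1;
        setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

        struct sockaddr_in addr;
        memset(&addr, 0, sizeof(addr));
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = htonl(INADDR_ANY);
        addr.sin_port = htons(8080);    /* arbitrary test port */
        bind(listenfd, (struct sockaddr *)&addr, sizeof(addr));
        listen(listenfd, 128);

        for (int i = 0; i < 4; i++) {
            if (fork() == 0) {          /* child: block in accept() forever */
                for (;;) {
                    int connfd = accept(listenfd, NULL, NULL);
                    if (connfd < 0)
                        continue;
                    /* exactly one child should report each connection */
                    printf("pid %d accepted a connection\n", getpid());
                    close(connfd);
                }
            }
        }
        pause();                        /* parent just waits */
        return 0;
    }

    Connecting repeatedly (e.g. with nc 127.0.0.1 8080) should print a single pid per connection rather than waking all four children. On the kernel side, the wakeup originates in the TCP receive path: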

    int tcp_v4_do_rcv(struct sock *sk, struct sk_buff *skb)
    {
        struct sock *rsk;
    
        ......
        if (sk->sk_state == TCP_LISTEN) {
            struct sock *nsk = tcp_v4_hnd_req(sk, skb);
            if (!nsk)
                goto discard;
    
            if (nsk != sk) {
                sock_rps_save_rxhash(nsk, skb);
                // when the client's final ACK of the three-way handshake arrives, flow passes through tcp_child_process()
                if (tcp_child_process(sk, nsk, skb)) {
                    rsk = nsk;
                    goto reset;
                }
                return 0;
            }
        }
        ......
    }
    
    
    int tcp_child_process(struct sock *parent, struct sock *child,
                  struct sk_buff *skb)
    {
        int ret = 0;
        int state = child->sk_state;
    
        if (!sock_owned_by_user(child)) {
            ret = tcp_rcv_state_process(child, skb, tcp_hdr(skb),
                            skb->len);
            /* Wakeup parent, send SIGIO */
            if (state == TCP_SYN_RECV && child->sk_state != state)
                parent->sk_data_ready(parent, 0); // wake the process blocked in accept(); this invokes sock_def_readable()
        } else {
            /* Alas, it is possible again, because we do lookup
             * in main socket hash table and lock on listening
             * socket does not protect us more.
             */
            __sk_add_backlog(child, skb);
        }
    
        bh_unlock_sock(child);
        sock_put(child);
        return ret;
    }
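
    For a TCP socket, sk_data_ready is initialized to the default sock_def_readable() (set up in sock_init_data()), which performs the actual wakeup: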
    
    
    static void sock_def_readable(struct sock *sk, int len)
    {
        struct socket_wq *wq;
    
        rcu_read_lock();
        wq = rcu_dereference(sk->sk_wq);
        // accept() enqueued the waiter with prepare_to_wait_exclusive(), so the wakeup goes through wake_up_interruptible_sync_poll
        if (wq_has_sleeper(wq))
            wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI |
                            POLLRDNORM | POLLRDBAND);
        sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
        rcu_read_unlock();
    }

     When multiple processes block in accept(), the kernel wakes only one waiting process per connection, and wakeups happen in FIFO order.

    The wakeup logic is as follows:

    static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
                int nr_exclusive, int wake_flags, void *key)
    {
        wait_queue_t *curr, *next;
    
        list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
            unsigned flags = curr->flags;
    
            // With prepare_to_wait_exclusive(), flags contain WQ_FLAG_EXCLUSIVE and nr_exclusive is 1, so we break after waking a single task.
            if (curr->func(curr, mode, wake_flags, key) &&
                    (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
                break;
        }
    }
  • Original article: https://www.cnblogs.com/codestack/p/11099712.html