zoukankan      html  css  js  c++  java
  • tcp 输入 prequeue以及backlog队列

    /*ipv4_specific是TCP传输层到网络层数据发送以及TCP建立过程的真正OPS,
    在tcp_prot->init中被赋值给inet_connection_sock->icsk_af_ops
    这里面有每种协议传输层的接收函数,后面的inetsw_array那几行是套接口层的相关函数
     在函数中执行handler,见函数ip_local_deliver_finish
    family协议族通过sock_register注册  传输层接口tcp_prot udp_prot netlink_prot等通过proto_register注册   
    IP层接口通过inet_add_protocol(&icmp_protocol等注册 ,这些组成过程参考inet_init函数
    IP层处理完后(包括ip_local_deliver_finish和icmp_unreach),走到这里,
    这是IP层和传输层的邻借口,然后在由这里走到tcp_prot udp_prot raw_prot
    这些是传输层的接收处理过程,传输层和套接口层的处理过程需
    要使用udp_prot tcp_prot raw_prot过渡到socket层,处理过程参考inetsw_array
    */static const struct net_protocol tcp_protocol = {
        .early_demux    =    tcp_v4_early_demux,
        .handler    =    tcp_v4_rcv,/*当接收到报文后,ip层处理完后
        在ip_local_deliver_finish 函数中ret = ipprot->handler(skb);走到这里          
        从这里面跳转到tcp_prot*/
        .err_handler    =    tcp_v4_err,/*icmp_unreach当收到ICMP差错报文后,
        如果引起差错的是TCP包就走到该函数*/
        .no_policy    =    1,
        .netns_ok    =    1,
        .icmp_strict_tag_validation = 1,
    };


    在服务器接收了syn之后,会调用tcp_conn_request来处理连接请求,其中调用inet_reqsk_alloc来创建请求控制块,
    可见请求控制块的ireq_state被初始化为TCP_NEW_SYN_RECV


    对于协议栈的接收路径,syn包的处理路径如下
    tcp_v4_rcv
    ->__inet_lookup_skb() //listen hash中找到对应的TCP_LISTEN的sk
    ->tcp_v4_do_rcv()
    ->tcp_v4_cookie_check() //syncookie检查,因为没有syn包没有ack选项,因此忽略, 如果syncookie验证通过则创建新的sock
    ->tcp_rcv_state_process()
    ->tcp_v4_conn_request()
    对于syncookie,服务端不保存任何状态
    对于fastopen,新建sock进入TCP_SYN_RCV状态, 并插入等待accpet队列,并把数据部分放倒接收队列中, 并设置重传定时器
    对于一般的syn包,request_sock设置为TCP_NEW_SYN_RECV,插入ehash表, 设置req_timer定时器,重传synack

    int tcp_v4_rcv(struct sk_buff *skb)
    {
        struct net *net = dev_net(skb->dev);
        const struct iphdr *iph;
        const struct tcphdr *th;
        bool refcounted;
        struct sock *sk;
        int ret;
     /* 非本机 */
        if (skb->pkt_type != PACKET_HOST)
            goto discard_it;
    
        /* Count it even if it's bad */
        __TCP_INC_STATS(net, TCP_MIB_INSEGS);
      /* 检查头部数据,若不满足,则拷贝分片 */
        if (!pskb_may_pull(skb, sizeof(struct tcphdr)))
            goto discard_it;
     /* 取tcp头 */
        th = (const struct tcphdr *)skb->data;
    
        if (unlikely(th->doff < sizeof(struct tcphdr) / 4))
            goto bad_packet;
           /* 检查头部数据,若不满足,则拷贝分片 */
        if (!pskb_may_pull(skb, th->doff * 4))
            goto discard_it;
    
        /* An explanation is required here, I think.
         * Packet length and doff are validated by header prediction,
         * provided case of th->doff==0 is eliminated.
         * So, we defer the checks. */
    
        if (skb_checksum_init(skb, IPPROTO_TCP, inet_compute_pseudo))
            goto csum_error;
     /* 取tcp头 */
        th = (const struct tcphdr *)skb->data;
     /* 取ip头 */
        iph = ip_hdr(skb);
        /* This is tricky : We move IPCB at its correct location into TCP_SKB_CB()
         * barrier() makes sure compiler wont play fool^Waliasing games.
         */
         /* 移动ipcb */
        memmove(&TCP_SKB_CB(skb)->header.h4, IPCB(skb),
            sizeof(struct inet_skb_parm));
        barrier();
    
        /* 获取开始序号*/
        TCP_SKB_CB(skb)->seq = ntohl(th->seq);
         /* 获取结束序号,syn与fin各占1  */
        TCP_SKB_CB(skb)->end_seq = (TCP_SKB_CB(skb)->seq + th->syn + th->fin +
                        skb->len - th->doff * 4);
          /* 获取确认序号 */
        TCP_SKB_CB(skb)->ack_seq = ntohl(th->ack_seq);
            /* 获取标记字节,tcp首部第14个字节 */
        TCP_SKB_CB(skb)->tcp_flags = tcp_flag_byte(th);
        TCP_SKB_CB(skb)->tcp_tw_isn = 0;
         /* 获取ip头的服务字段 */
        TCP_SKB_CB(skb)->ip_dsfield = ipv4_get_dsfield(iph);
        TCP_SKB_CB(skb)->sacked     = 0;
    
    lookup:
         /* 查找控制块 查找传输控制块  先查找ehash 然后查找listen hash*/
        sk = __inet_lookup_skb(&tcp_hashinfo, skb, __tcp_hdrlen(th), th->source,
                       th->dest, &refcounted);
        if (!sk)
            goto no_tcp_socket;
    
    process:
           /* TIME_WAIT转过去处理 */
        if (sk->sk_state == TCP_TIME_WAIT)
            goto do_time_wait;
        /*
    三次握手的第二阶段,服务器发送synack后,
    会进入TCP_NEW_SYN_RECV状态,并插入ehash中。
    收到握手最后一个ack后,会找到TCP_NEW_SYN_RECV状态的req
    ,然后创建一个新的sock进入TCP_SYN_RECV状态
    ,最终进入TCP_ESTABLISHED状态. 并放入accept队列通知select/epoll
        */
     /* TCP_NEW_SYN_RECV状态处理 */
        if (sk->sk_state == TCP_NEW_SYN_RECV) {
            struct request_sock *req = inet_reqsk(sk);
            struct sock *nsk;
      /* 获取控制块 */
            sk = req->rsk_listener;
            if (unlikely(tcp_v4_inbound_md5_hash(sk, skb))) {
                reqsk_put(req);
                goto discard_it;
            }
            if (unlikely(sk->sk_state != TCP_LISTEN)) {
                /* 从连接队列移除控制块 */
                inet_csk_reqsk_queue_drop_and_put(sk, req);
                goto lookup;/* 根据skb参数重新查找控制块 */
            }
            /* We own a reference on the listener, increase it again
             * as we might lose it too soon.
             */
            sock_hold(sk);
            refcounted = true;
             /* 处理第三次握手ack,成功返回新控制块 */
            nsk = tcp_check_req(sk, skb, req, false);//创建新的sock进入TCP_SYN_RECV state 并插入accept队列
            if (!nsk) {
                reqsk_put(req);
                goto discard_and_relse;
            }
             /* 未新建控制块,进一步处理 */
            if (nsk == sk) {
                reqsk_put(req);
                /*tcp_child_process中调用tcp_rcv_state_process来
                处理TCP_SYN_RECV状态的child sock
                。并进入TCP_ESTABLISHED状态*/
            } else if (tcp_child_process(sk, nsk, skb)) {
                //nsk进入ehash
            /* 有新建控制块,进行初始化等 */
                tcp_v4_send_reset(nsk, skb); /* 失败发送rst */
                goto discard_and_relse;
            } else {
                sock_put(sk);
                return 0;
            }
        }
        /*如果不是TCP_NEW_SYN_RECV 状态 */
        if (unlikely(iph->ttl < inet_sk(sk)->min_ttl)) {
            __NET_INC_STATS(net, LINUX_MIB_TCPMINTTLDROP);
            goto discard_and_relse;
        }
    
        if (!xfrm4_policy_check(sk, XFRM_POLICY_IN, skb))
            goto discard_and_relse;
    
        if (tcp_v4_inbound_md5_hash(sk, skb))
            goto discard_and_relse;
    
        nf_reset(skb);
    
        if (sk_filter(sk, skb))
            goto discard_and_relse;
    
        skb->dev = NULL;
    
        if (sk->sk_state == TCP_LISTEN) {//listen state
            ret = tcp_v4_do_rcv(sk, skb);//syn 报文
            goto put_and_return;
        }
    
        sk_incoming_cpu_update(sk);
    
        bh_lock_sock_nested(sk);
        tcp_segs_in(tcp_sk(sk), skb);//TCP_SYN_RECV状态被fast open sokets使用
        ret = 0;
        ///宏很简单就是判断(sk)->sk_lock.owned.也就是当进程上下文在使用这个sock时为1. 
        if (!sock_owned_by_user(sk)) { /* 未被用户锁定 */
            /*不启用sysctl_tcp_low_latency  能够提高tcp ip 协议栈的吞吐量以及反应速度,就调用tcp_prequeue将数据包加入prequeue队列*/
            if (!tcp_prequeue(sk, skb)) /* 未能加入到prequeue 也就是先将buffer放到prequeue队列中。如果成功则返回1.*/
                /*/客户端/connect()即使阻塞也不占有锁  同时处于synack,不会排入prepare队列*/
                ret = tcp_v4_do_rcv(sk, skb); /* 进入tcpv4处理 */
        } else if (unlikely(sk_add_backlog(sk, skb, /* 已经被用户锁定,加入到backlog */
                           sk->sk_rcvbuf + sk->sk_sndbuf))) {
            bh_unlock_sock(sk);
            __NET_INC_STATS(net, LINUX_MIB_TCPBACKLOGDROP);
            goto discard_and_relse;
        }
        bh_unlock_sock(sk);
    
    put_and_return:
        if (refcounted)
            sock_put(sk);
    
        return ret;
    
    no_tcp_socket:
        if (!xfrm4_policy_check(NULL, XFRM_POLICY_IN, skb))
            goto discard_it;
    
        if (tcp_checksum_complete(skb)) {
    csum_error:
            __TCP_INC_STATS(net, TCP_MIB_CSUMERRORS);
    bad_packet:
            __TCP_INC_STATS(net, TCP_MIB_INERRS);
        } else {
            tcp_v4_send_reset(NULL, skb);
        }
    
    discard_it:
        /* Discard frame. */
        kfree_skb(skb);
        return 0;
    
    discard_and_relse:
        sk_drops_add(sk, skb);
        if (refcounted)
            sock_put(sk);
        goto discard_it;
    
    do_time_wait:
        if (!xfrm4_policy_check(NULL, XFRM_POLICY_IN, skb)) {
            inet_twsk_put(inet_twsk(sk));
            goto discard_it;
        }
    
        if (tcp_checksum_complete(skb)) {
            inet_twsk_put(inet_twsk(sk));
            goto csum_error;
        }
        /* TIME_WAIT入包处理 */
        switch (tcp_timewait_state_process(inet_twsk(sk), skb, th)) {
        case TCP_TW_SYN: { /* 收到syn */
            struct sock *sk2 = inet_lookup_listener(dev_net(skb->dev),
                                &tcp_hashinfo, skb,
                                __tcp_hdrlen(th),
                                iph->saddr, th->source,
                                iph->daddr, th->dest,
                                inet_iif(skb));
            if (sk2) {
                inet_twsk_deschedule_put(inet_twsk(sk));
                sk = sk2;
                refcounted = false;
                goto process;
            }
            /* Fall through to ACK */
        }
        case TCP_TW_ACK: /* 发送ack */
            tcp_v4_timewait_ack(sk, skb);
            break;
        case TCP_TW_RST:/* 发送rst */
            tcp_v4_send_reset(sk, skb);
            inet_twsk_deschedule_put(inet_twsk(sk));
            goto discard_it;
        case TCP_TW_SUCCESS:;  /* 成功*/
        }
        goto discard_it;
    }

    TCP接收方存在3种队列:

    1 Backlog Queue (sk->backlog)

    2 Prequeue Queue (tp->ucopy.prequeue)

    3 Receive Queue (sk->receive_queue)

    然后来看3个队列的区别。
    首先sk_backlog队列是当当前的sock在进程上下文中被使用时,如果这个时候有数据到来,则将数据拷贝到sk_backlog.

    prequeue则是数据buffer第一站,一般都是这里,如果prequeue已满,则会拷贝数据到receive_queue队列种。
    最后一个receive_queue也就是进程上下文第一个取buffer的队列

    这里为什么要有prequeue呢,直接放到receive_queue不就好了.因为receive_queue的处理比较繁琐
    (看tcp_rcv_established的实现就知道了,分为slow path和fast path),而软中断每次只能处理一个数据包
    (在一个cpu上),因此为了软中断能尽快完成,我们就可以先将数据放到prequeue中(tcp_prequeue),然后软
    中断就直接返回. 而处理prequeue就放到进程上下文(tcp_recvmsg调用中)去处理了

    /* Packet is added to VJ-style prequeue for processing in process
     * context, if a reader task is waiting. Apparently, this exciting
     * idea (VJ's mail "Re: query about TCP header on tcp-ip" of 07 Sep 93)
     * failed somewhere. Latency? Burstiness? Well, at least now we will
     * see, why it failed. 8)8)                  --ANK
     *
     下面总结一下prequeue机制下应用进程收包的流程。
    (1)应用进程在通过系统调用使用tcp_recvmsg函数接收数据时安装一个prequeue队列的接收器,释放sock,然后守株待兔
    (2)内核收包软中断在进入tcp_v4_rcv函数时如果sock没有被进程锁定,则会将skb放入prequeue中,并唤醒进程
    (3)进程被唤醒后锁定sock,调用tcp_prequeue_process函数将prequeue中所有的skb送入tcp_v4_do_rcv函数进行处理
    (4)在进程锁定sock的时候,内核软中断会将skb放入backlog队列中而不是prequeue队列,在进程是否sock的时候backlog队列中的skb也会被送入tcp_v4_do_rcv函数
    (5)送入tcp_v4_do_rcv函数的数据会被送入到快速路径或慢速路径进行处理。而无论是进入快速路径还是慢速路径,skb中的数据最终都会被copy到应用进程的缓存中
     prequeue机制与普通机制的主要区别在于,在进程没有收取到足够的数
     据而睡眠等待时,prequeue机制会将skb放入prequeue队列中再唤醒进程,
     再由进程对skb进行TCP协议处理,再copy数据;而普通模式下skb会
     在软中断上下文处理,在放入sk->sk_receive_queue队列中后再唤醒进程,
     进程被唤醒后只是copy数据。对比普通模式,prequeue机制下使得skb的TCP协
     议处理延迟,延迟的时间为从skb被放入prequeue队列并唤醒进程开始,
     到进程被调度到时调用tcp_prequeue_process函数处理skb时截止。对于收数据
     的进程而言在一次数据接收过程中其实并没有延迟,因为普通模式
     下进程也会经历睡眠-唤醒的过程。但由于TCP协议处理被延迟,导致
     ACK的发送延迟,从而使数据发送端的数据发送延迟,最终会使得
     整个通信过程延迟增大。
     */
     /*
    看到 Prequeue 是个可选项,默认是开启的但能通过 net.ipv4.tcp_low_latency来关闭。有这个选项存在就说明 Prequeue 存在的理由不像 Receive Queue 和 Backlog 
    一样那么明确可靠。所以我们需要看看 Prequeue 存在的原因。
    
    有个关于 Prequeue 作用的讨论在这里: Linux Kernel - TCP prequeue performance,可以参考一下。
    
    如果关闭 Prequeue,我们知道如果 Socket 没有被 User 占用,收到的 sk_buff 会直接调用 tcp_v4_do_rcv 进行处理,放入 Receive Queue,
    这一切都会在 softIRQ 的 context 中执行,最关键的是在放入 Receive Queue 后会回复 ack,而实际此时用户进程并没有实际收到数据,离用户进程起来处理数据还有一段时间。
    这就导致对端收到 ack 后认为对方能很快处理数据从而会发的更快,直到对方 Receive Queue 满了之后突然不再回复 ack,开始丢包。而一般情况下 TCP 连接对性能影响最大的就是丢包,
    重传,所以需要尽可能避免上述情况的发生。这种情形下,ack 相当于是只送达了对方机器就被回复了,而没有送到目标进程。
    
    有了 Prequeue 之后,ack 会有两种回复方式,一种是用户进程被唤醒将 Prequeue 数据读入 Receive Queue 后回复 ack,这种时候数据是确认送达用户进程了。
    另一种是用户进程迟迟无法被唤醒,延迟 ack 的定时器被触发而回复 ack,这样也能减慢 ack 回复速度让对端知道这边处理性能有点跟不上,要慢点发数据。
    两种方式都能减少或避免之前说的问题,这也是 Prequeue 存在的意义。
    
    但是对于体量小延迟又要求高的数据包,Prequeue 的存在又会增加延迟。原因是如果关闭了 Prequeue 机制,每来一条数据都要经过 tcp_v4_do_rcv 的处理,
    上面我们只看了一下 Fast Path,但能走 Fast Path 的要求还是比较苛刻的,不能有乱序到达,数据只能是单向,要么单向收要么单向发等等条件,
    只要有一条不满足就要走 Slow Path。Slow Path 内各种检查会更多,更麻烦一些。除了检查还一个耗时的是计算 checksum。如果没有 Prequeue 则这些逻辑全部要在 
    softIRQ context 内完成。在 User 进程被唤醒前可能只能放很少的数据到 Receive Queue 内。而有了 Prequeue 后,softIRQ 内只需要将数据包放入队列,
    不做任何检查和处理,接着就能处理下一个数据包,等到用户进程被唤醒后能从 Prequeue 批量处理数据。
    
    不过 Prequeue 是 Linux 特有的机制,近些年因为 NIC 会自动计算 checksum,不需要在收到数据过程中再计算了,所以 Prequeue 存在的意义基本只是延迟 
    ack 回复到用户进程内这一个。开启它实际对延迟增加并不明显: the myth of /proc/sys/net/ipv4/tcp_low_latency 、
    What is the linux kernel parameter tcp_low_latency?在 IPV6 内更是去掉了这个机制。
    */
    /* Packet is added to VJ-style prequeue for processing in process
     * context, if a reader task is waiting. Apparently, this exciting
     * idea (VJ's mail "Re: query about TCP header on tcp-ip" of 07 Sep 93)
     * failed somewhere. Latency? Burstiness? Well, at least now we will
     * see, why it failed. 8)8)                  --ANK
     *
     */
    bool tcp_prequeue(struct sock *sk, struct sk_buff *skb)
    {
        struct tcp_sock *tp = tcp_sk(sk);
        /*/如果启用tcp_low_latency或者ucopy.task为空则返回0.ucopy.task为空一
        般是表示进程空间有进程在等待sock的数据的到来,
        因此我们需要直接复制数据到receive队列。并唤醒
        数据包在软中断(函数tcp_v4_do_rcv)中处理。
    */
    /* 内核要求低延迟或不是处于进程上下文,则不能使用prequeue */
        if (sysctl_tcp_low_latency || !tp->ucopy.task)
            return false;
    /*现在是处于进程上下文**/
        if (skb->len <= tcp_hdrlen(skb) &&//skb中无数据
            skb_queue_len(&tp->ucopy.prequeue) == 0)//prequeue中没有skb
            return false;
    
        /* Before escaping RCU protected region, we need to take care of skb
         * dst. Prequeue is only enabled for established sockets.
         * For such sockets, we might need the skb dst only to set sk->sk_rx_dst
         * Instead of doing full sk_rx_dst validity here, let's perform
         * an optimistic check.
         */
        if (likely(sk->sk_rx_dst))
            skb_dst_drop(skb);
        else
            skb_dst_force_safe(skb);
    
        __skb_queue_tail(&tp->ucopy.prequeue, skb);
        tp->ucopy.memory += skb->truesize;
      /*
    如果prequeue队列中积累过多的数据,则需要将队列中所有的skb全部送入TCP协议处理函数
    */
    //数据包过多或者缓存不够,直接在softirq上下文中处理
        if (skb_queue_len(&tp->ucopy.prequeue) >= 32 ||
            tp->ucopy.memory + atomic_read(&sk->sk_rmem_alloc) > sk->sk_rcvbuf) {
            struct sk_buff *skb1;
    
            BUG_ON(sock_owned_by_user(sk));
            __NET_ADD_STATS(sock_net(sk), LINUX_MIB_TCPPREQUEUEDROPPED,
                    skb_queue_len(&tp->ucopy.prequeue));
            ///遍历prequeue队列。 
    
            while ((skb1 = __skb_dequeue(&tp->ucopy.prequeue)) != NULL)
                sk_backlog_rcv(sk, skb1);///这个函数最终也会调用tcp_v4_do_rcv(也就是加入到receive队列中).  
    
            tp->ucopy.memory = 0;
        } else if (skb_queue_len(&tp->ucopy.prequeue) == 1) {/*这里表示这个数据包是prequeue的第一个包。然后唤醒等待队列。  /*
    如果prequeue中从无到有增加了一个skb,则需要唤醒等待数据的进程进行处理,并设置延迟ACK定时器
    */    
    //sk_wait_data中有个时间窗口,可能不会唤醒用户进程。 如果没有唤醒则会导致应用程序等待超时,还有cache的刷新
            wake_up_interruptible_sync_poll(sk_sleep(sk),
                           POLLIN | POLLRDNORM | POLLRDBAND);
            if (!inet_csk_ack_scheduled(sk))//之前没有ack被推迟,重置dack定时器, 因为不知道应用程序会什么时候处理prequeue
                inet_csk_reset_xmit_timer(sk, ICSK_TIME_DACK,//如果应用程序一直不处理,则在tcp_delack_timer_handler中处理prequeue队列
                              (3 * tcp_rto_min(sk)) / 4,
                              TCP_RTO_MAX);
        }
        return true;
    }
    void tcp_delack_timer_handler(struct sock *sk)
    {
        struct tcp_sock *tp = tcp_sk(sk);
        struct inet_connection_sock *icsk = inet_csk(sk);
        sk_mem_reclaim_partial(sk);
        if (sk->sk_state == TCP_CLOSE || !(icsk->icsk_ack.pending & ICSK_ACK_TIMER))
            goto out;
        if (time_after(icsk->icsk_ack.timeout, jiffies)) {
            sk_reset_timer(sk, &icsk->icsk_delack_timer, icsk->icsk_ack.timeout);
            goto out;
        }
        icsk->icsk_ack.pending &= ~ICSK_ACK_TIMER;
        if (!skb_queue_empty(&tp->ucopy.prequeue)) {//处理prequeue
            struct sk_buff *skb;
            __NET_INC_STATS(sock_net(sk), LINUX_MIB_TCPSCHEDULERFAILED);
            while ((skb = __skb_dequeue(&tp->ucopy.prequeue)) != NULL)
                sk_backlog_rcv(sk, skb);
            tp->ucopy.memory = 0;
        }
        if (inet_csk_ack_scheduled(sk)) {//已经有ack被推迟过了
            if (!icsk->icsk_ack.pingpong) {    //不能延时,但却有ack被推迟了
                /* Delayed ACK missed: inflate ATO. */
                icsk->icsk_ack.ato = min(icsk->icsk_ack.ato << 1, icsk->icsk_rto); //double ato
            } else {
                /* Delayed ACK missed: leave pingpong mode and
                 * deflate ATO.
                 */
                 //这一次延时ack完成了,重置
                icsk->icsk_ack.pingpong = 0;
                icsk->icsk_ack.ato      = TCP_ATO_MIN;
            }
            tcp_send_ack(sk);    //发送ack
            __NET_INC_STATS(sock_net(sk), LINUX_MIB_DELAYEDACKS);
        }
    out:
        if (tcp_under_memory_pressure(sk))
            sk_mem_reclaim(sk);
    }
    static void tcp_delack_timer(unsigned long data)
    {
        struct sock *sk = (struct sock *)data;
        bh_lock_sock(sk);
        if (!sock_owned_by_user(sk)) {
            tcp_delack_timer_handler(sk);
        } else {
            inet_csk(sk)->icsk_ack.blocked = 1;
            __NET_INC_STATS(sock_net(sk), LINUX_MIB_DELAYEDACKLOCKED);
            /* deleguate our work to tcp_release_cb() */
            if (!test_and_set_bit(TCP_DELACK_TIMER_DEFERRED, &tcp_sk(sk)->tsq_flags))    //设置标记,等应用程序release_sock的时候调用tcp_delack_timer_handler
                sock_hold(sk);
        }
        bh_unlock_sock(sk);
        sock_put(sk);
    }
    void tcp_release_cb(struct sock *sk)
    {
        ...
        sock_release_ownership(sk);
        ...
        if (flags & (1UL << TCP_DELACK_TIMER_DEFERRED)) {
            tcp_delack_timer_handler(sk);
            __sock_put(sk);
        }
    }

    对tcp-ack的影响:某种情形之下只有用户进程处理了这个skb的时候才发送ack。如果skb直接排入receive_queue的话,那么很可能直接就会发送skb,但是如果排入了prequeue,那么只有到了进程的上下文处理prequeue的时候才会处理ack应答,理论上如果这个进程很久才能被进程调度器调度到,那么很久才会发送ack,linux的解决办法是设置一个timer,规定一个阀值到期,到期后timer将prequeue中的skb放到receive_queue中。从对ack的影响来看看似延迟的ack是一件坏事,但是有时它会是一件好事,因为它能平缓对端的发送速率

    而应用程序没有锁的时候,则只能尝试把数据把放到另一个prequeue队列中。
    tcp_recvmsg()中并不会一直占有锁,在处理backlog的时候会主动释放锁,在阻塞等待的时候也会释放锁,这就给了数据包加入prequeue的时机。

    在delack定时器中,因为要尽可能多的ack数据,也会处理prequeue队列。因为delack也需要获得锁,因此在release_sock中处理backlog队列,如果有delack被推迟执行,则很可能也会处理prequeue队列。因此协议栈先尝试把包加入backlog中, 在尝试加入prequeue中,如果都不能则在软中断上下文中处理数据包;

    • 为什么不在协议栈中直接处理?
        1. cache的角度
          prequeue和backlog都是为了在应用程序上下文去处理数据包。
          因为softirq上下文和应用程序上下文之间切换,会造成cache刷新。
          比如ksoftirqd和应用程序不在一个cpu上; 或是协议栈处理完后通知应用程序,应用程序被唤醒,同样会造成造成cache刷新

        2. ack的角度
          如果直接在协议栈快速处理包,则很可能导致快速ack,使对方快速达到很大的发送速率,但是本地用户进程可能并不活跃,就会导致接收缓存满了,对方瞬间停止发送。 造成很明显的抖动。
          因此在应用程序处理数据包和ack的发送,可以反映用户进程的实时情况。
          但同时对突发的小包和需要低延迟的消息,放入prequeue中,也会造成非常不好的影响

        3. 锁的角度
          softirq中不能睡眠,也不应该跟应用程序抢锁, 如果tcp_recvmsg读取一大块内存,tcp_v4_rcv就需要很久才能抢到锁。
          因此使用prequeue就可以避免这样的情况。

    backlog队列

    sock_owned_by_user(sk)会被bh_lock_sock_nested(sk)保护,因此如果sock_owned_by_user(sk)并把skb加入backlog之前,应用程序不会release_sock。

    /* The per-socket spinlock must be held here. */
    static inline __must_check int sk_add_backlog(struct sock *sk, struct sk_buff *skb,
                              unsigned int limit)
    {
        if (sk_rcvqueues_full(sk, limit))    //backlog和sk_receive_queue中缓存使用是否超过限制
            return -ENOBUFS;
        /*
         * If the skb was allocated from pfmemalloc reserves, only
         * allow SOCK_MEMALLOC sockets to use it as this socket is
         * helping free memory
         */
        if (skb_pfmemalloc(skb) && !sock_flag(sk, SOCK_MEMALLOC))
            return -ENOMEM;
        __sk_add_backlog(sk, skb);    //添加到backlog
        sk->sk_backlog.len += skb->truesize;
        return 0;
    }
    /* OOB backlog add */
    static inline void __sk_add_backlog(struct sock *sk, struct sk_buff *skb)
    {
        /* dont let skb dst not refcounted, we are going to leave rcu lock */
        skb_dst_force_safe(skb);
        if (!sk->sk_backlog.tail)
            sk->sk_backlog.head = skb;
        else
            sk->sk_backlog.tail->next = skb;
        sk->sk_backlog.tail = skb;
        skb->next = NULL;
    }
    void release_sock(struct sock *sk)
    {
        spin_lock_bh(&sk->sk_lock.slock);
        if (sk->sk_backlog.tail) //这里会遍历backlog队列中的每一个报文
            __release_sock(sk);//sk_backlog_rcv处理backlog队列也就是tcp_v4_do_rcv处理
    
    
        /* Warning : release_cb() might need to release sk ownership,
         * ie call sock_release_ownership(sk) before us.
         */
        if (sk->sk_prot->release_cb)
            sk->sk_prot->release_cb(sk); //如果delack推迟执行,也会处理prequeue队列
    //这里是网络中断执行时,告诉内核,现在socket并不在进程上下文中
    
        sock_release_ownership(sk);//!sock_owned_by_user(sk)//这里是网络中断执行时,告诉内核,现在socket并不在进程上下文中
    
        if (waitqueue_active(&sk->sk_lock.wq))    //如果其他进程在等待这个ownersh
            wake_up(&sk->sk_lock.wq);
        spin_unlock_bh(&sk->sk_lock.slock);
    }

    tcp_recvmsg

        }
        return copied;
    }
    EXPORT_SYMBOL(tcp_read_sock);
    
    /*
     *    This routine copies from a sock struct into the user buffer.
     *
     *    Technical note: in 2.3 we work on _locked_ socket, so that
     *    tricks with *seq access order and skb->users are not required.
     *    Probably, code can be easily improved even more.
     */
    
    int tcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int nonblock,
            int flags, int *addr_len)
    {
        struct tcp_sock *tp = tcp_sk(sk);
        int copied = 0;
        u32 peek_seq;
        u32 *seq;
        unsigned long used;
        int err;
        int target;        /* Read at least this many bytes */
        long timeo;
        struct task_struct *user_recv = NULL;
        struct sk_buff *skb, *last;
        u32 urg_hole = 0;
    
        if (unlikely(flags & MSG_ERRQUEUE))
            return inet_recv_error(sk, msg, len, addr_len);
    
        if (sk_can_busy_loop(sk) && skb_queue_empty(&sk->sk_receive_queue) &&
            (sk->sk_state == TCP_ESTABLISHED))
            sk_busy_loop(sk, nonblock);
    //锁住socket,防止多进程并发访问TCP连接,告知软中断目前socket在进程上下文中
        lock_sock(sk);//设置sock_owned_by_user
    
        err = -ENOTCONN;
        if (sk->sk_state == TCP_LISTEN)
            goto out;
    //如果socket是阻塞套接字,则取出SO_RCVTIMEO作为读超时时间;若为非阻塞,则timeo为0。
        timeo = sock_rcvtimeo(sk, nonblock);
    
        /* Urgent data needs to be handled specially. */
        if (flags & MSG_OOB)
            goto recv_urg;
    
        if (unlikely(tp->repair)) {
            err = -EPERM;
            if (!(flags & MSG_PEEK))
                goto out;
    
            if (tp->repair_queue == TCP_SEND_QUEUE)
                goto recv_sndq;
    
            err = -EINVAL;
            if (tp->repair_queue == TCP_NO_QUEUE)
                goto out;
    
            /* 'common' recv queue MSG_PEEK-ing */
        }
    //获取下一个要拷贝的字节序号
        seq = &tp->copied_seq;
        if (flags & MSG_PEEK) {//当flags参数有MSG_PEEK标志位时,意味着这次拷贝的内容,当再次读取socket时(比如另一个进程)还能再次读到
            peek_seq = tp->copied_seq;
            seq = &peek_seq;
        }
    //获取SO_RCVLOWAT最低接收阀值,当然,target实际上是用户态内存大小len和SO_RCVLOWAT的最小值
    //注意:flags参数中若携带MSG_WAITALL标志位,则意味着必须等到读取到len长度的消息才能返回,此时target只能是len
    
        target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
    
        do {
            u32 offset;
    
            /* Are we at urgent data? Stop if we have read anything or have SIGURG pending. */
            if (tp->urg_data && tp->urg_seq == *seq) {
                if (copied)
                    break;
                if (signal_pending(current)) {
                    copied = timeo ? sock_intr_errno(timeo) : -EAGAIN;
                    break;
                }
            }
    
            /* Next get a buffer. */
    
            last = skb_peek_tail(&sk->sk_receive_queue);
            skb_queue_walk(&sk->sk_receive_queue, skb) {
                last = skb;
                /* Now that we have two receive queues this
                 * shouldn't happen.
                 */
                if (WARN(before(*seq, TCP_SKB_CB(skb)->seq),
                     "recvmsg bug: copied %X seq %X rcvnxt %X fl %X
    ",
                     *seq, TCP_SKB_CB(skb)->seq, tp->rcv_nxt,
                     flags))
                    break;
                //offset是待拷贝序号在当前这个报文中的偏移量
                offset = *seq - TCP_SKB_CB(skb)->seq;
                if (unlikely(TCP_SKB_CB(skb)->tcp_flags & TCPHDR_SYN)) {
                    pr_err_once("%s: found a SYN, please report !
    ", __func__);
                    offset--;//有些时候,三次握手的SYN包也会携带消息内容的,此时seq是多出1的(SYN占1个序号),所以offset减1
                }
                if (offset < skb->len)//若偏移量还有这个报文之内,则认为它需要处理
                    goto found_ok_skb;
                if (TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN)
                    goto found_fin_ok;
                WARN(!(flags & MSG_PEEK),
                     "recvmsg bug 2: copied %X seq %X rcvnxt %X fl %X
    ",
                     *seq, TCP_SKB_CB(skb)->seq, tp->rcv_nxt, flags);
            }
    
            /* Well, if we have backlog, try to process it now yet. */
    //如果receive队列为空,则检查已经拷贝的字节数,是否达到了SO_RCVLOWAT或者长度len。满足了,且backlog队列也为空,则可以返回用户态了
            if (copied >= target && !sk->sk_backlog.tail)//得到需要的数据长度,且backlog中没有数据
                break;
    
            if (copied) {//在tcp_recvmsg里,copied就是已经拷贝的字节数
                if (sk->sk_err ||
                    sk->sk_state == TCP_CLOSE ||
                    (sk->sk_shutdown & RCV_SHUTDOWN) ||
                    !timeo ||
                    signal_pending(current))
                    break;
            } else {
                if (sock_flag(sk, SOCK_DONE))
                    break;
    
                if (sk->sk_err) {
                    copied = sock_error(sk);
                    break;
                }
    
                if (sk->sk_shutdown & RCV_SHUTDOWN)
                    break;//一个字节都没拷贝到,但如果shutdown关闭了socket,一样直接返回
    
                if (sk->sk_state == TCP_CLOSE) {
                    if (!sock_flag(sk, SOCK_DONE)) {
                        /* This occurs when user tries to read
                         * from never connected socket.
                         */
                        copied = -ENOTCONN;
                        break;
                    }
                    break;
                }
    
                if (!timeo) {
                    copied = -EAGAIN;
                    break; //非阻塞套接字读取不到数据时也会返回,错误码正是EAGAIN
                }
    
                if (signal_pending(current)) {
                    copied = sock_intr_errno(timeo);
                    break;
                }
            }
    
            tcp_cleanup_rbuf(sk, copied);
            //tcp_low_latency默认是关闭的 //开启prequeue,没有设置ucopy进程,或者为当前进程
            if (!sysctl_tcp_low_latency && tp->ucopy.task == user_recv) {//阻塞调用, 尝试处理prequeue队列
                /* Install new reader */
                if (!user_recv && !(flags & (MSG_TRUNC | MSG_PEEK))) {
                    user_recv = current;
                    tp->ucopy.task = user_recv;//设置当前进程正在准备接受ucopy中的数据
                    tp->ucopy.msg = msg;
                }
    
                tp->ucopy.len = len;//设置msg中应用缓存的长度
    
                WARN_ON(tp->copied_seq != tp->rcv_nxt &&
                    !(flags & (MSG_PEEK | MSG_TRUNC)));
    
                /* Ugly... If prequeue is not empty, we have to
                 * process it before releasing socket, otherwise
                 * order will be broken at second iteration.
                 * More elegant solution is required!!!
                 *
                 * Look: we have the following (pseudo)queues:
                 *
                 * 1. packets in flight
                 * 2. backlog
                 * 3. prequeue
                 * 4. receive_queue
                 *
                 * Each queue can be processed only if the next ones
                 * are empty. At this point we have empty receive_queue.
                 * But prequeue _can_ be not empty after 2nd iteration,
                 * when we jumped to start of loop because backlog
                 * processing added something to receive_queue.
                 * We cannot release_sock(), because backlog contains
                 * packets arrived _after_ prequeued ones.
                 *
                 * Shortly, algorithm is clear --- to process all
                 * the queues in order. We could make it more directly,
                 * requeueing packets from backlog to prequeue, if
                 * is not empty. It is more elegant, but eats cycles,
                 * unfortunately.
                 *///prequeue队列就是为了提高系统整体效率的,即prequeue队列有可能不为空,这是因为进程休眠等待时可能有新报文到达prequeue队列
    
                if (!skb_queue_empty(&tp->ucopy.prequeue))
                    goto do_prequeue;
    
                /* __ Set realtime policy in scheduler __ */
            }
            //处理完prequeue队列,开始处理backlog队列
        //达到要接收的字节数
            if (copied >= target) {//如果已经拷贝了的字节数超过了最低阀值 
                /* Do not sleep, just process backlog. *///处理完backlog后重新加锁
                release_sock(sk);
                lock_sock(sk);
            } else {//没有读取到足够长度的消息,因此会进程休眠,如果没有被唤醒,最长睡眠timeo时间
    //没收到目标大小的数据,等待新数据到达sk_receive_queue。 等待前会调用release_sock处理backlog队列
                sk_wait_data(sk, &timeo, last);
            }
    
            if (user_recv) {//处理完backlog, 这时候如果是顺序数据包,ucopy中会被添加数据
                int chunk;
    
                /* __ Restore normal policy in scheduler __ */
    // 此时ucopy中的是backlog处理后的数据
                chunk = len - tp->ucopy.len;
                if (chunk != 0) {
                    NET_ADD_STATS(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMBACKLOG, chunk);
                    len -= chunk;
                    copied += chunk;
                }
    
                if (tp->rcv_nxt == tp->copied_seq &&
                    !skb_queue_empty(&tp->ucopy.prequeue)) {//因为之前处理backlog的时候,释放过锁,这时候prequeue中可能有新的包
    do_prequeue:
                    tcp_prequeue_process(sk);//接上面代码段,开始处理prequeue队列里的报文 tcp_v4_do_rcv prequeue
    
                    chunk = len - tp->ucopy.len;
                    if (chunk != 0) {
                        NET_ADD_STATS(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMPREQUEUE, chunk);
                        len -= chunk;
                        copied += chunk;
                    }
                }
            }
            if ((flags & MSG_PEEK) &&
                (peek_seq - copied - urg_hole != tp->copied_seq)) {
                net_dbg_ratelimited("TCP(%s:%d): Application bug, race in MSG_PEEK
    ",
                            current->comm,
                            task_pid_nr(current));
                peek_seq = tp->copied_seq;
            }
            continue;
    
        found_ok_skb:
            /* Ok so how much can we use? */
        //receive队列的这个报文从其可以使用的偏移量offset,到总长度len之间,可以拷贝的长度为used
            used = skb->len - offset;
            if (len < used)//len是用户态空闲内存,len更小时,当然只能拷贝len长度消息,总不能导致内存溢出吧
                used = len;
    
            /* Do we have urgent data here? */
            if (tp->urg_data) {
                u32 urg_offset = tp->urg_seq - *seq;
                if (urg_offset < used) {
                    if (!urg_offset) {
                        if (!sock_flag(sk, SOCK_URGINLINE)) {
                            ++*seq;
                            urg_hole++;
                            offset++;
                            used--;
                            if (!used)
                                goto skip_copy;
                        }
                    } else
                        used = urg_offset;
                }
            }
    
            if (!(flags & MSG_TRUNC)) {//MSG_TRUNC标志位表示不要管len这个用户态内存有多大,只管拷贝数据吧
    
                err = skb_copy_datagram_msg(skb, offset, msg, used);
                if (err) {
                    /* Exception. Bailout! */
                    if (!copied)
                        copied = -EFAULT;
                    break;
                }
            }
    //因为是指针,所以同时更新copied_seq--下一个待接收的序号
            *seq += used;
            copied += used;
            len -= used;
    
            tcp_rcv_space_adjust(sk);
    
    skip_copy:
            if (tp->urg_data && after(tp->copied_seq, tp->urg_seq)) {
                tp->urg_data = 0;
                tcp_fast_path_check(sk);
            }
            if (used + offset < skb->len)
                continue;
    
            if (TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN)
                goto found_fin_ok;
            if (!(flags & MSG_PEEK))
                sk_eat_skb(sk, skb);
            continue;
    
        found_fin_ok:
            /* Process the FIN. */
            ++*seq;
            if (!(flags & MSG_PEEK))
                sk_eat_skb(sk, skb);
            break;
        } while (len > 0);
    //已经装载了接收器 /数据copy够了,再给一个prequeue处理机会
        if (user_recv) {//prequeue队列不为空则处理之
            if (!skb_queue_empty(&tp->ucopy.prequeue)) {
                int chunk;
    
                tp->ucopy.len = copied > 0 ? len : 0;//还有剩余空间则copy到ucopy,没有则放到sk_receive_queue中
    
                tcp_prequeue_process(sk);
    
                if (copied > 0 && (chunk = len - tp->ucopy.len) != 0) {
                    NET_ADD_STATS(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMPREQUEUE, chunk);
                    len -= chunk;
                    copied += chunk;
                }
            }
    //准备返回用户态,socket上不再装载接收任务  
            tp->ucopy.task = NULL;//结束prequeue处理
            tp->ucopy.len = 0;
        }
    
        /* According to UNIX98, msg_name/msg_namelen are ignored
         * on connected socket. I was just happy when found this 8) --ANK
         */
    
        /* Clean up data we have read: This will do ACK frames. */
        tcp_cleanup_rbuf(sk, copied);
    //释放socket时,还会检查、处理backlog队列中的报文
        release_sock(sk);//处理backlog,释放锁
        return copied;
    
    out:
        release_sock(sk);
        return err;
    
    recv_urg:
        err = tcp_recv_urg(sk, msg, len, flags);
        goto out;
    
    recv_sndq:
        err = tcp_peek_sndq(sk, msg, len);
        goto out;
  • 相关阅读:
    Eclipse的常见使用错误及编译错误
    Android学习笔记之Bundle
    Android牟利之道(二)广告平台的介绍
    Perl dbmopen()函数
    Perl子例程(函数)
    Perl内置操作符
    Perl正则表达式
    Linux之间配置SSH互信(SSH免密码登录)
    思科路由器NAT配置详解(转)
    Windows下查看端口被程序占用的方法
  • 原文地址:https://www.cnblogs.com/codestack/p/11133714.html
Copyright © 2011-2022 走看看