zoukankan      html  css  js  c++  java
  • 一次sendmsg的改造过程

    比较蛋疼的一个改造过程,简单记录一下。

    场景:用户态使用sendmsg发包,tcp报文,由于内核实现过程中存在一次kernel_read,也就是存在将pagecache中的内容拷贝一次的问题。

    为了减少这次拷贝,简单地将这个对pagecache的拷贝过程使用分散聚集io方式来进行map,map的数据来自于pagecahce中的文件。

    这样就不存在拷贝了。

    看过这部分代码的人肯定觉得既然不想拷贝,为啥不用sendfile来实现,关键是因为需要每个包有部分数据来自于用户态,比如rtp头,rtsp头等。

    而媒体数据在内核态组装,如果是udp好办,我们做了一个solaris版本的sendfile,可以带一个用户态数据区过来组装报文,然后直接发送。

    但是tcp的话,存在重发控制,简单通过qdisc流控又达不到df值的要求,所以就遗留了这么一个改动。

    在改动过程,一开始的方案是参照tcp_sendpage来改造,但由于用户传入的

    struct msghdr {
        void    *    msg_name;    /* Socket name            */
        int        msg_namelen;    /* Length of name        */
        struct iovec *    msg_iov;    /* Data blocks            */
        __kernel_size_t    msg_iovlen;    /* Number of blocks        */
        void     *    msg_control;    /* Per protocol magic (eg BSD file descriptor passing) */
        __kernel_size_t    msg_controllen;    /* Length of cmsg list */
        unsigned    msg_flags;
    };

    msg_iov 的个数比较多,也就是msg_iovlen比较多,(这个msg_iov里面不是直接要发送的业务数据,而是针对file的偏移和长度段),这样的话,调用tcp_sendpage就很频繁,

    由于3.10版本的内核的tcp_sendpage如下:

    int tcp_sendpage(struct sock *sk, struct page *page, int offset,
             size_t size, int flags)
    {
        ssize_t res;
    
        if (!(sk->sk_route_caps & NETIF_F_SG) ||//网卡是否支持分散聚集io
            !(sk->sk_route_caps & NETIF_F_ALL_CSUM))//网卡具备硬件执行校验和的标志
            return sock_no_sendpage(sk->sk_socket, page, offset, size,
                        flags);
    
        lock_sock(sk);
        res = do_tcp_sendpages(sk, &page, offset, size, flags);
        release_sock(sk);
        return res;
    }

    也就是说,每次操作一个很短的page都是在lock的情况下,这样锁操作多了,也间接影响了收包。

    所以进一步参照 do_tcp_sendpages 的实现,自己申请了skb,挂载已经读取的poge段,放在分散聚集io的

    skb_shinfo(skb)->frags 

    数组中,而挂载的过程参照splice的实现。实现了免拷贝,以及减少了锁的申请,所以性能提升了20%左右。

    另外一个有趣的发现是,由于接受方的0窗口,导致我们这边服务器发包的时候,没有窗口可以发送,间接影响了我们调用接口的时候的

    static inline int sk_stream_memory_free(struct sock *sk)
    {
        return sk->sk_wmem_queued < sk->sk_sndbuf;
    }

    出现了一些tryagain。

    出现tryagain的时候,我做了一个改动,就是增加sk ->sk_sndbuf的大小,当然这个值是小于系统配置的缓冲区大小,相当于代替用户调用了一次setsockopt,来扩展sendbuf。

    另外测试发现,哪怕我sk_sndbuf不动,但是我释放锁再申请锁,都能大概率减少tryagain,一开始猜测认为是因为释放锁导致了我们的数据被ack了,所以我们的sk_wmem_queued减少了,

    后来看到我们释放锁里面,其实做了很多工作:

    void release_sock(struct sock *sk)
    {
        /*
         * The sk_lock has mutex_unlock() semantics:
         */
        mutex_release(&sk->sk_lock.dep_map, 1, _RET_IP_);
    
        spin_lock_bh(&sk->sk_lock.slock);
        if (sk->sk_backlog.tail)
            __release_sock(sk);--------------重点看这个
    
        if (proto_has_rhel_ext(sk->sk_prot, RHEL_PROTO_HAS_RELEASE_CB) &&
            sk->sk_prot->release_cb)
            sk->sk_prot->release_cb(sk);
    
        sk->sk_lock.owned = 0;
        if (waitqueue_active(&sk->sk_lock.wq))
            wake_up(&sk->sk_lock.wq);
        spin_unlock_bh(&sk->sk_lock.slock);
    }

    可以看到,释放锁的时候,如果我们的sk_backlog中有报文的话,会调用__release_sock:

    static void __release_sock(struct sock *sk)
    {
        struct sk_buff *skb = sk->sk_backlog.head;
    
        do {
            sk->sk_backlog.head = sk->sk_backlog.tail = NULL;
            bh_unlock_sock(sk);
    
            do {
                struct sk_buff *next = skb->next;
    
                skb->next = NULL;
                sk_backlog_rcv(sk, skb);-----------处理skb,针对tcp就是 tcp_v4_do_rcv
    
                /*
                 * We are in process context here with softirqs
                 * disabled, use cond_resched_softirq() to preempt.
                 * This is safe to do because we've taken the backlog
                 * queue private:
                 */
                cond_resched_softirq();
    
                skb = next;
            } while (skb != NULL);
    
            bh_lock_sock(sk);
        } while ((skb = sk->sk_backlog.head) != NULL);
    
        /*
         * Doing the zeroing here guarantee we can not loop forever
         * while a wild producer attempts to flood us.
         */
        sk_extended(sk)->sk_backlog.len = 0;
    }

    可以看到,会有一个处理skb报文的过程,而这个过程,针对tcp来说,如下调用链:

    tcp_v4_do_rcv--> 
    tcp_rcv_established-->tcp_ack->tcp_clean_rtx_queue-->sk_wmem_free_skb
     

    在tcp_ack中,一般做三件事:更新重传队列,更新发送窗口,从sack的信息或者重复ack来决定是否进入拥塞模式。 

    tcp_clean_rtx_queue中将会调用 sk_wmem_free_skb ,一般能将一些待确认的skb给释放掉,这样我们前面的对 sk_wmem_queued 和 sk->sk_sndbuf 的比较就可能为真,而不需要重试了。


    ps:release_sock这个函数曾经还引起了一个比较有意思的讨论:
    http://bbs.chinaunix.net/thread-4114007-1-6.html
     
    顺带,看了一下tcp_ack在处理 tcp_clean_rtx_queue 的时候比较有意思的地方,
    针对我们skb带了分散聚集io的情况,我们一个skb大概承载了40k左右的数据,而回复ack的时候,显然不会一个skb所有的报文发送出去,再收到ack,所以这个ack肯定会在skb的数据之间,
    这种情况下:
    static int tcp_clean_rtx_queue(struct sock *sk, int prior_fackets,
                       u32 prior_snd_una, long sack_rtt_us)
    {
    。。。。。
    
        while ((skb = tcp_write_queue_head(sk)) && skb != tcp_send_head(sk)) {
            struct tcp_skb_cb *scb = TCP_SKB_CB(skb);
            u8 sacked = scb->sacked;
            u32 acked_pcount;
    
            /* Determine how many packets and what bytes were acked, tso and else */
            if (after(scb->end_seq, tp->snd_una)) {
                if (tcp_skb_pcount(skb) == 1 ||
                    !after(tp->snd_una, scb->seq))
                    break;
    
                acked_pcount = tcp_tso_acked(sk, skb);--------------部分数据被ack,会进入这个流程
                if (!acked_pcount)
                    break;
    
                fully_acked = false;

    我们来看ack的处理,首先,如果skb是当前的待发送的skb,则直接跳过,因为我既然这个skb没有发送,则不应该收到我这个skb对应的seq范围之内的任何ack,所以可以直接跳过。

    其次,当前待发送的skb之前的skb的部分数据被ack的时候,假设这个ack跨越了两个skb,那么之前那个skb肯定要释放,对于剩余的ack数据,则会对该skb进行trim操作:

    /* If we get here, the whole TSO packet has not been acked. */
    static u32 tcp_tso_acked(struct sock *sk, struct sk_buff *skb)
    {
        struct tcp_sock *tp = tcp_sk(sk);
        u32 packets_acked;
    
        BUG_ON(!after(TCP_SKB_CB(skb)->end_seq, tp->snd_una));
    
        packets_acked = tcp_skb_pcount(skb);
        if (tcp_trim_head(sk, skb, tp->snd_una - TCP_SKB_CB(skb)->seq))
            return 0;
        packets_acked -= tcp_skb_pcount(skb);
    
        if (packets_acked) {
            BUG_ON(tcp_skb_pcount(skb) == 0);
            BUG_ON(!before(TCP_SKB_CB(skb)->seq, TCP_SKB_CB(skb)->end_seq));
        }
    
        return packets_acked;
    }

    而tcp_trim_head其实是蛮重的,大家想想,当skb只有线性区的时候,很容易就将data指针移位,但是当我使用分散聚集io的时候,则需要根据ack值去找到在 shinfo->frags 的偏移,

    然后将ack之前的数据给释放掉:

    static void __pskb_trim_head(struct sk_buff *skb, int len)
    {
        struct skb_shared_info *shinfo;
        int i, k, eat;
    
        eat = min_t(int, len, skb_headlen(skb));
        if (eat) {
            __skb_pull(skb, eat);
            len -= eat;
            if (!len)
                return;
        }
        eat = len;
        k = 0;
        shinfo = skb_shinfo(skb);
        for (i = 0; i < shinfo->nr_frags; i++) {
            int size = skb_frag_size(&shinfo->frags[i]);
    
            if (size <= eat) {
                skb_frag_unref(skb, i);
                eat -= size;
            } else {
                shinfo->frags[k] = shinfo->frags[i];
                if (eat) {
                    shinfo->frags[k].page_offset += eat;
                    skb_frag_size_sub(&shinfo->frags[k], eat);
                    eat = 0;
                }
                k++;
            }
        }
        shinfo->nr_frags = k;
    
        skb_reset_tail_pointer(skb);
        skb->data_len -= len;
        skb->len = skb->data_len;
    }
     
    水平有限,如果有错误,请帮忙提醒我。如果您觉得本文对您有帮助,可以点击下面的 推荐 支持一下我。版权所有,需要转发请带上本文源地址,博客一直在更新,欢迎 关注 。
  • 相关阅读:
    wzplayer for android界面
    player stop处理
    wzplayer for android界面
    android屏幕监控上下左右滑动
    OpenGL + C++ + Java
    player stop处理
    EGLHelper
    Android NDK学习 <五> C++ 支持
    【认识之初】
    Java调用windows exe程序
  • 原文地址:https://www.cnblogs.com/10087622blog/p/10208772.html
Copyright © 2011-2022 走看看