zoukankan      html  css  js  c++  java
  • 一个发包乱序问题记录

    在用户线程绑定某个核的情况下,从某个线程发送的udp报文,偶尔出现了乱序。我们来分析下发包流程:

     0xffffffff81593b30 : dev_hard_start_xmit+0x0/0x1a0 [kernel]------------进入driver层
     0xffffffff81596b08 : __dev_queue_xmit+0x448/0x550 [kernel]--------------其实少了一个qdisc层的堆栈
     0xffffffff81596c20 : dev_queue_xmit+0x10/0x20 [kernel]-----------------dev层
     0xffffffff815a284d : neigh_resolve_output+0x11d/0x220 [kernel]
     0xffffffff815dd65c : ip_finish_output+0x2ac/0x7a0 [kernel]
     0xffffffff815dde53 : ip_output+0x73/0xe0 [kernel]
     0xffffffff815dba87 : ip_local_out_sk+0x37/0x40 [kernel]
     0xffffffff815dbdf3 : ip_queue_xmit+0x143/0x3a0 [kernel]----------------ip层
     0xffffffff815f60fc : tcp_transmit_skb+0x52c/0xa20 [kernel]
     0xffffffff815f687c : tcp_write_xmit+0x28c/0xcf0 [kernel]
     0xffffffff815f755e : __tcp_push_pending_frames+0x2e/0xc0 [kernel]
     0xffffffff815e54ac : tcp_push+0xec/0x120 [kernel]
     0xffffffff815e8e70 : tcp_sendmsg+0xd0/0xc30 [kernel]---------------------tcp层
     0xffffffff816153d9 : inet_sendmsg+0x69/0xb0 [kernel]---------------------inet层
     0xffffffff815765ad : sock_aio_write+0x15d/0x180 [kernel]
     0xffffffff81208093 : do_sync_write+0x93/0xe0 [kernel]---------------------vfs层
     0xffffffff81208c75 : vfs_write+0x1c5/0x1f0 [kernel]
     0xffffffff8120998f : sys_write+0x7f/0xe0 [kernel]
     0xffffffff816c5715 : system_call_fastpath+0x1c/0x21 [kernel]

    没有故意去抓udp的堆栈,除了tcp层那部分不太一样,其他都应该一样,不影响我们分析。

    可以看到,这个是sys态直接发送的案例,后面其实就是

    dev_hard_start_xmit--》xmit_one--》
    netdev_start_xmit--》
    __netdev_start_xmit--》
    ops->ndo_start_xmit,这个对于bond,这个是
    bond_start_xmit,最终还是会走到实体设备,如i40e的驱动,是i40e_lan_xmit_frame ,而ixegb的驱动则是:ixgbe_xmit_frame

    也就是,整个发送过程都体现出来了。而且这个流程是在sys态完成的,不是在软中断中,软中断的话,需要从net_tx_action 中看起。

    但是,如果对整个流程非常了解的人,可以看到堆栈中缺少一部分,那就是

    __qdisc_run--》qdisc_restart--》
    sch_direct_xmit--》
    dev_hard_start_xmit这部分并没有在此体现。

    从qdisc角度来看,虽然它的作用在于流量控制,但是也可以看做是dev层到驱动层的一个缓存层,从前面堆栈看,如果一个socket 固定从某个

    cpu上发送,不会出现乱序,到了网卡驱动层,更不会乱序,那么到了qdisc层会怎么样?

    下面详细分析下qdisc的可能乱序行为:

    int __dev_queue_xmit(struct sk_buff *skb, void *accel_priv)
    {
    。。。
        txq = netdev_pick_tx(dev, skb, accel_priv);
        q = rcu_dereference_bh(txq->qdisc);
    。。。
        if (q->enqueue) {------------enqueue不为空,流控
            rc = __dev_xmit_skb(skb, q, dev, txq);
            goto out;
        }
    。。。
    }

    拿到skb之后,怎么选择哪个queue发送呢?

    //队列选择函数,如果不是多队列,ops->ndo_select_queue非空的话,则调用ops->ndo_select_queue,否则__netdev_pick_tx
    struct netdev_queue *netdev_pick_tx(struct net_device *dev,
                        struct sk_buff *skb,
                        void *accel_priv)
    {
        int queue_index = 0;
    
    #ifdef CONFIG_XPS
        u32 sender_cpu = skb->sender_cpu - 1;
    
        if (sender_cpu >= (u32)NR_CPUS)
            skb->sender_cpu = raw_smp_processor_id() + 1;-------------开启了xps的话,会设置sender_cpu的值为当前cpu号+1
    #endif
    
        if (dev->real_num_tx_queues != 1) {---------------------------多队列的设备
            const struct net_device_ops *ops = dev->netdev_ops;
            if (ops->ndo_select_queue)-----------i40e这个地方是NULL
                queue_index = ops->ndo_select_queue(dev, skb, accel_priv,
                                    __netdev_pick_tx);
            else
                queue_index = __netdev_pick_tx(dev, skb);-------------所以i40e走这个流程
    
            if (!accel_priv)
                queue_index = netdev_cap_txqueue(dev, queue_index);
        }
    
        skb_set_queue_mapping(skb, queue_index);------------------保存对应的queue_index到skb->queue_mapping,发送的时候,会取这个来获取对应的queue
        return netdev_get_tx_queue(dev, queue_index);
    }

    既然i40e走的是默认的选queue_index的流程,那么就需要看一下这个函数:

    static u16 __netdev_pick_tx(struct net_device *dev, struct sk_buff *skb)
    {
        struct sock *sk = skb->sk;
        int queue_index = sk_tx_queue_get(sk);------之前保存的queue_index
    
        if (queue_index < 0 || skb->ooo_okay ||
            queue_index >= dev->real_num_tx_queues) {
            int new_index = get_xps_queue(dev, skb);------开启了xps,优先选择
            if (new_index < 0)--------没选到
                new_index = skb_tx_hash(dev, skb);--------则直接hash
    
            if (queue_index != new_index && sk &&
                rcu_access_pointer(sk->sk_dst_cache))
                sk_tx_queue_set(sk, new_index);
    
            queue_index = new_index;
        }
    
        return queue_index;
    }

     在开启了xps的情况下,get_xps_queue 会根据xps_map来选择队列,如果xps只绑定了一个cpu,则用那个对应的queue-index,否则根据skb来hash选择:

    //开启xps,根据sender_cpu来选择map
    static inline int get_xps_queue(struct net_device *dev, struct sk_buff *skb)
    {
    #ifdef CONFIG_XPS
        struct xps_dev_maps *dev_maps;
        struct xps_map *map;
        int queue_index = -1;
    
        rcu_read_lock();
        dev_maps = rcu_dereference(dev->xps_maps);
        if (dev_maps) {
            map = rcu_dereference(
                dev_maps->cpu_map[skb->sender_cpu - 1]);//由于之前设置了sendercpu,所以这里取该cpu,找到对应的map
            if (map) {
                if (map->len == 1)
                    queue_index = map->queues[0];//如果cpu只关联当前net_device的一个队列,当然直接选择
                else
                    queue_index = map->queues[reciprocal_scale(skb_get_hash(skb),
                                           map->len)];//当前cpu关联了多个队列,则做下hash选择
                if (unlikely(queue_index >= dev->real_num_tx_queues))
                    queue_index = -1;
            }
        }
        rcu_read_unlock();
    
        return queue_index;
    #else
        return -1;
    #endif
    }

     我们目前设置的xps_map如下:

    cat /sys/class/net/eth0/queues/tx-0/xps_cpus
    00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000001

    也就是一个队列,只对应一个cpu。这个展示的结果,是从队列的角度来展示对应的cpu,但是内核实现的时候,
    实际上代码内使用了反向映射,通过xps_dev_maps存放到cpu到tx队列集合的映射:参照:dev_maps->cpu_map 函数的 实现过程,

    按照我们目前这样的这样的话,一个队列对应的是一个核,一个核针对某个网卡,也只是某一个队列,按道理也不会乱序啊。因为txq因为queue-index唯一确定,

    而qdisc 又是由queue->qdisc唯一确定的。

    选择好了queue,那么来看对应的qdisc实现:

    static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
                     struct net_device *dev,
                     struct netdev_queue *txq)
    {
        spinlock_t *root_lock = qdisc_lock(q);
        bool contended;
        int rc;
    
        qdisc_pkt_len_init(skb);
        qdisc_calculate_pkt_len(skb, q);
        /*
         * Heuristic to force contended enqueues to serialize on a
         * separate lock before trying to get qdisc main lock.
         * This permits __QDISC_STATE_RUNNING owner to get the lock more often
         * and dequeue packets faster.
         */
        contended = qdisc_is_running(q);
        if (unlikely(contended))
            spin_lock(&q->busylock);
    
        spin_lock(root_lock);
        ........//只保留一种条件,其他忽略
            rc = q->enqueue(skb, q) & NET_XMIT_MASK;//入队qdisc,对于fq,其实就是将当前的skb按顺序加到flow的尾部
            if (qdisc_run_begin(q)) {
                if (unlikely(contended)) {
                    spin_unlock(&q->busylock);
                    contended = false;
                }
                __qdisc_run(q);//取包调dequeue,也是按顺序的,
            }
        }
        spin_unlock(root_lock);
        if (unlikely(contended))
            spin_unlock(&q->busylock);
        return rc;
    }

    好的,正式进入了qdisc层,来看一下q->enqueue的实现:

    当然如果把qdisc看做一个对象的话,它的背后还有一堆class和filter,由于我们环境使用的是:

    [root@localhost ~]# tc qdisc show dev eth0
    qdisc mq 0: root
    qdisc fq 0: parent :1 limit 10000p flow_limit 100p buckets 1024 quantum 3028 initial_quantum 15140

    针对的是fq的实现:

    static struct Qdisc_ops fq_qdisc_ops __read_mostly = {
        .id        =    "fq",
        .priv_size    =    sizeof(struct fq_sched_data),
    
        .enqueue    =    fq_enqueue,

    看看实现:

    static int fq_enqueue(struct sk_buff *skb, struct Qdisc *sch)
    {
        struct fq_sched_data *q = qdisc_priv(sch);
        struct fq_flow *f;
    
        if (unlikely(sch->q.qlen >= sch->limit))//fq管理的flow个数超过阈值
            return qdisc_drop(skb, sch);------------------------------丢包,但不会乱序
    
        f = fq_classify(skb, q);-------------根据skb的sk来获取对应的flow,如果没有,则申请一个。
        if (unlikely(f->qlen >= q->flow_plimit && f != &q->internal)) {
            q->stat_flows_plimit++;
            return qdisc_drop(skb, sch);
        }
    
        f->qlen++;
        if (skb_is_retransmit(skb))
            q->stat_tcp_retrans++;
        qdisc_qstats_backlog_inc(sch, skb);
        if (fq_flow_is_detached(f)) {
            fq_flow_add_tail(&q->new_flows, f);----------------------------------加到skb加到对应的flow中去,关键函数
            if (time_after(jiffies, f->age + q->flow_refill_delay))
                f->credit = max_t(u32, f->credit, q->quantum);
            q->inactive_flows--;
        }
    
        /* Note: this overwrites f->age */
        flow_queue_add(f, skb);
    
        if (unlikely(f == &q->internal)) {
            q->stat_internal_packets++;
        }
        sch->q.qlen++;
    
        return NET_XMIT_SUCCESS;
    }

    我们看一下加skb到flow中去的过程:

    static void flow_queue_add(struct fq_flow *flow, struct sk_buff *skb)
    {
        struct sk_buff *prev, *head = flow->head;
    
        skb->next = NULL;
        if (!head) {
            flow->head = skb;
            flow->tail = skb;
            return;
        }
        if (likely(!skb_is_retransmit(skb))) {
            flow->tail->next = skb;--------------------skb加到链表尾,所以绝对不会乱序
            flow->tail = skb;
            return;
        }

    不管怎么样,skb是加入到了对应的flow中了,就等着dequeue的时候发送了,间接地保证了时序。

    同理也可以分析 :

    static struct sk_buff *fq_dequeue(struct Qdisc *sch)
    这个也是按顺序取包发送,不会乱序。
    从这个流程看,应该不会乱序,那么最终乱序的原因是?仔细查看我们的发包流程,我们一直强调是在sys态,还有一个中断时发包的流程没有分析。
    我们来看i40e的napi发包模式:
    i40e_napi_poll函数调用 i40e_clean_tx_irq,清理发送队列的数据。
    static bool i40e_clean_tx_irq(struct i40e_vsi *vsi,
                      struct i40e_ring *tx_ring, int napi_budget)//发送完之后资源回收
    {
    。。。。
            if (__netif_subqueue_stopped(tx_ring->netdev,
                             tx_ring->queue_index) &&
               !test_bit(__I40E_DOWN, &vsi->state)) {
                netif_wake_subqueue(tx_ring->netdev,
                            tx_ring->queue_index);
                ++tx_ring->tx_stats.restart_queue;
            }
    。。。。。
    }

    具体查看 netif_wake_subqueue 的实现,

    netif_wake_subqueue --》__netif_schedule --》__netif_reschedule 
    static inline void __netif_reschedule(struct Qdisc *q)
    {
        struct softnet_data *sd;
        unsigned long flags;
    
        local_irq_save(flags);
        sd = this_cpu_ptr(&softnet_data);
        q->next_sched = NULL;
        *sd->output_queue_tailp = q;
        sd->output_queue_tailp = &q->next_sched;//output_queue_tailp只会在中断中操作,net_tx_action会获取sd->output_queue,然后调用qdisc_run发包。
        raise_softirq_irqoff(NET_TX_SOFTIRQ);
        local_irq_restore(flags);
    }

    触发软中断,而根据中断绑核的设置,该中断号绑定的cpu并不一定和xps的queue映射的cpu是同一个,这样的话,就存在两个cpu发送一个流的情况。一个是在sys态,调用qdisc_run发包,

    一个是在软中断处理时,调用qdisc_run发包,这两个cpu可能不是同一个。

    由于发送的时候,还是要调用qdisc的spin_lock,所以虽然cpu不是同一个,但还是依靠自旋锁控制了并发dequeue一个skb的情况。那就是不可能两个cpu都dequeue到同一个skb的情况。

    中断里面处理流程:

            root_lock = qdisc_lock(q);
                if (spin_trylock(root_lock)) {-------------获取qdisc的自旋锁
                    smp_mb__before_clear_bit();
                    clear_bit(__QDISC_STATE_SCHED,
                          &q->state);//清理__QDISC_STATE_SCHED状态,
                    qdisc_run(q);-------------------------这个里面还是会判断qdisc的state,如果是running状态,则直接返回,不会调用__qdisc_run
                    spin_unlock(root_lock);

    殊途同归,进入__qdisc_run:

    void __qdisc_run(struct Qdisc *q)//进入该函数,此时应持有qdisc_lock
    {
        int quota = weight_p;
    
        while (qdisc_restart(q)) {
            /*
             * Ordered by possible occurrence: Postpone processing if
             * 1. we've exceeded packet quota
             * 2. another process needs the CPU;
             */
            if (--quota <= 0 || need_resched()) {//配额用完了,__netif_schedule会触发软中断
                __netif_schedule(q);
                break;
            }
        }
    
        qdisc_run_end(q);
    }


    static inline int qdisc_restart(struct Qdisc *q)
    {
        struct netdev_queue *txq;
        struct net_device *dev;
        spinlock_t *root_lock;
        struct sk_buff *skb;
        bool validate;

        /* Dequeue packet */
        skb = dequeue_skb(q, &validate);
        if (unlikely(!skb))
            return 0;

        root_lock = qdisc_lock(q);
        dev = qdisc_dev(q);
        txq = skb_get_tx_queue(dev, skb);

        return sch_direct_xmit(skb, q, dev, txq, root_lock, validate);
    }

    所以按道理也能保证发送时序,但是由于我劫持了响应的网卡发包驱动,自己再做了一次缓存,而这些缓存的管理,是percpu的,所以qdisc持有锁发送的时候,最终到了两个不同的percpu的缓存,然后就存在发包乱序的可能了,毕竟两个cpu再也看不到对方的存在了,也不会按顺序从qdisc中取skb了,他们只管自己percpu缓存中的skb,所以会分别尝试获取:

    HARD_TX_LOCK(dev, txq, smp_processor_id());这把锁,然后发包。乱序就很正常了。
     
    结论:
    1.如果过多的cpu使用相同的tx队列,那么加重tx对应的qdisc锁的争抢,也会增加对txq->_xmit_lock的争抢,
    为了将qdisc的锁争抢降低到最低,最好就是: 如果每个cpu只关联了一个tx,甚至能消除竞争
    也可以减小因为发送完成中断造成的cache miss。
    因此xps_cpus的配置最好结合/proc/irq//smp_affinity, 映射最好在同一个cpu或者同一个numa node的cpu上。
     
    2.如果在驱动层再做了缓存,要保证各个cpu的时序非常困难,所以只能保证开启了xps的情况下,将xps映射到的cpu和网卡对应中断完全对应,比1的要求还要严格,
    因为多个cpu访问的percpu缓存再也不相关了,同一个流如果有skb在不同的percpu缓存,时序无法保证。
     

    当然还有一种情况是cpu的热插拔,这个不在本文讨论范围之内。

     附:

    一个net_device在初始化的时候:

    void dev_init_scheduler(struct net_device *dev)
    {
        dev->qdisc = &noop_qdisc;
    。。。。
    }

    但是当网卡up的时候,调用dev_activate,会重新设置,如果是多队列网卡,则设置为 mq_qdisc_ops:

    void dev_activate(struct net_device *dev)
    {
        int need_watchdog;
    
        /* No queueing discipline is attached to device;
         * create default one for devices, which need queueing
         * and noqueue_qdisc for virtual interfaces
         */
    
        if (dev->qdisc == &noop_qdisc)
            attach_default_qdiscs(dev);
    。。。。。
    }
    
    static void attach_default_qdiscs(struct net_device *dev)
    {
        struct netdev_queue *txq;
        struct Qdisc *qdisc;
    
        txq = netdev_get_tx_queue(dev, 0);
    
        if (!netif_is_multiqueue(dev) ||
            dev->priv_flags & IFF_NO_QUEUE) {
            netdev_for_each_tx_queue(dev, attach_one_default_qdisc, NULL);
            dev->qdisc = txq->qdisc_sleeping;
            atomic_inc(&dev->qdisc->refcnt);
        } else {
            qdisc = qdisc_create_dflt(txq, &mq_qdisc_ops, TC_H_ROOT);-------------默认使用多队列策略设置root的策略,即mq_qdisc_ops
            if (qdisc) {
                dev->qdisc = qdisc;
                qdisc->ops->attach(qdisc);
            }
        }
    }
     
    参考资料:
    https://lwn.net/Articles/412062/
     
    水平有限,如果有错误,请帮忙提醒我。如果您觉得本文对您有帮助,可以点击下面的 推荐 支持一下我。版权所有,需要转发请带上本文源地址,博客一直在更新,欢迎 关注 。
  • 相关阅读:
    Unity 预处理命令
    Unity 2DSprite
    Unity 生命周期
    Unity 调用android插件
    Unity 关于属性的get/set
    代码的总体控制开关
    程序员怎么问问题?
    VCGLIB 的使用
    cuda实践(1)
    python之json文件解析
  • 原文地址:https://www.cnblogs.com/10087622blog/p/10081733.html
Copyright © 2011-2022 走看看