  • Analysis of DPDK NIC packet reception (ixgbe)

     From the moment a network packet is received by the NIC until it is handled by the application, it passes through two main stages:

    Stage 1: the NIC's DMA hardware writes the received packet into the receive queue (enqueue).
    Stage 2: the application reads the packet out of the receive queue (dequeue).
    Since I am currently using VPP/DPDK to optimize a WAF engine, this post looks at how an ixgbe NIC works under the DPDK framework. The sections below cover the receive queue structures, their initialization (enabling), and the receive flow.

    1. Configuration and initialization of receive/transmit, which mainly means setting up the RX/TX queues.

    The core of this work is configuring the NIC's RX/TX queues and setting the addresses DMA copies packets to; to consume a packet, you simply fetch the data at the recorded address from the corresponding queue. The main configuration entry point is rte_eth_dev_configure(); once the queues are configured, it performs the final device-specific configuration by calling (*dev->dev_ops->dev_configure)(dev). For ixgbe this resolves to ixgbe_dev_configure(), which mainly calls ixgbe_check_mq_mode() to validate the multi-queue mode and then enables the bulk-allocation and vector receive modes.
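    The (*dev->dev_ops->dev_configure)(dev) call is an ops-table dispatch: the generic ethdev layer calls through a per-driver function pointer table. Below is a minimal self-contained sketch of that pattern; the eth_dev/eth_dev_ops types and the fake_ixgbe_* names are simplified stand-ins for illustration, not the real DPDK definitions.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-ins for rte_eth_dev / eth_dev_ops: each PMD fills an
 * ops table with its callbacks, and the generic layer dispatches through
 * the pointers without knowing which driver is behind them. */
struct eth_dev;
struct eth_dev_ops {
    int (*dev_configure)(struct eth_dev *dev);
};
struct eth_dev {
    const struct eth_dev_ops *dev_ops;
    int configured;
};

/* Plays the role of ixgbe_dev_configure(): driver-specific validation
 * (e.g. the multi-queue mode check) would happen here. */
static int fake_ixgbe_dev_configure(struct eth_dev *dev)
{
    dev->configured = 1;
    return 0;
}

static const struct eth_dev_ops fake_ixgbe_ops = {
    .dev_configure = fake_ixgbe_dev_configure,
};

/* Generic layer, like rte_eth_dev_configure(): after validating user
 * arguments it dispatches to the driver through the ops table. */
static int eth_dev_configure(struct eth_dev *dev)
{
    return (*dev->dev_ops->dev_configure)(dev);
}
```

    rte_eth_dev_start() and rte_eth_rx_burst() reach the driver the same way, through the dev_start and rx_pkt_burst pointers respectively.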

    2. Getting and sending packets, i.e. taking packets from a queue or placing packets into one.

    The receive queue is built through rte_eth_rx_queue_setup(), which records the relevant parameters and finally calls the driver's queue setup hook to finish initialization:

    ret = (*dev->dev_ops->rx_queue_setup)(dev, rx_queue_id, nb_rx_desc,
    socket_id, rx_conf, mp);

    For ixgbe devices, rx_queue_setup is ixgbe_dev_rx_queue_setup().

    Now let's look at the main data structures:

    /* Receive Descriptor - Advanced
        pkt_addr: physical address of the packet data; the NIC's DMA
    engine writes the packet into the memory behind this address.
        hdr_addr: packet header information; its last bit is the DD
    (Descriptor Done) bit, and since this is a union, the last bit of
    status_error maps onto the same DD bit.
        Whenever a new packet arrives, the NIC checks whether the DD bit
    of the current rx_ring buffer is 0. If it is 0, the buffer is free:
    DMA copies the packet into it and then sets DD to 1. If it is 1, the
    NIC treats the rx_ring as full, drops the packet, and records an
    imiss. (0->1) */
    union ixgbe_adv_rx_desc {
        struct {
            __le64 pkt_addr; /* Packet buffer address */
            __le64 hdr_addr; /* Header buffer address */
        } read;
        struct {
            struct {
                union {
                    __le32 data;
                    struct {
                        __le16 pkt_info; /* RSS, Pkt type */
                        __le16 hdr_info; /* Splithdr, hdrlen */
                    } hs_rss;
                } lo_dword;
                union {
                    __le32 rss; /* RSS Hash */
                    struct {
                        __le16 ip_id; /* IP id */
                        __le16 csum; /* Packet Checksum */
                    } csum_ip;
                } hi_dword;
            } lower;
            struct {
                __le32 status_error; /* ext status/error */
                __le16 length; /* Packet length */
                __le16 vlan; /* VLAN tag */
            } upper;
        } wb;  /* writeback */
    };
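    The DD handshake described in the comment above can be modeled with a toy ring. This is a sketch under simplified assumptions: toy_rx_desc, nic_deliver and sw_poll are invented names, and only the status word is modeled (the real layout is the union above).

```c
#include <assert.h>
#include <stdint.h>

#define RING_SIZE 4
#define STAT_DD   0x1u   /* stands in for IXGBE_RXDADV_STAT_DD */

/* Toy descriptor: only the status word matters for the handshake. */
struct toy_rx_desc { uint32_t status_error; };

static struct toy_rx_desc toy_ring[RING_SIZE];
static unsigned toy_imiss;   /* packets dropped because the ring was full */

/* NIC side on packet arrival: if DD of the target slot is still set
 * (software has not consumed it), the ring is full and the packet is
 * dropped, counting an imiss; otherwise the packet is DMA'd in and DD
 * is set to 1. */
static int nic_deliver(unsigned slot)
{
    if (toy_ring[slot].status_error & STAT_DD) {
        toy_imiss++;
        return -1;                       /* dropped */
    }
    /* ...DMA of the packet data would happen here... */
    toy_ring[slot].status_error |= STAT_DD;
    return 0;
}

/* Driver side when polling: a packet is present only if DD is set;
 * consuming it clears DD so the NIC can reuse the slot. */
static int sw_poll(unsigned slot)
{
    if (!(toy_ring[slot].status_error & STAT_DD))
        return -1;                       /* nothing there yet */
    toy_ring[slot].status_error &= ~STAT_DD;
    return 0;
}
```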
    /**
     * Structure associated with each descriptor of the RX ring of a RX queue.
     * sw_ring is a ring built over a dynamically allocated array of
     * ixgbe_rx_entry elements; its size is configurable, typically up to 4096.
     * mbuf: pointer to the packet mbuf; an mbuf manages one packet,
     * holding both its metadata and the packet data.
     */
    struct ixgbe_rx_entry {
        struct rte_mbuf *mbuf; /**< mbuf associated with RX descriptor. */
    };
    /**
     * Structure associated with each RX queue.
     */
    struct ixgbe_rx_queue {
        struct rte_mempool  *mb_pool; /**< mbuf pool to populate RX ring. */
        /* rx_ring holds the physical addresses of the packet buffers; these
        are the DMA addresses the NIC hardware uses to copy packets into place. */
        volatile union ixgbe_adv_rx_desc *rx_ring; /**< RX ring virtual address. */
        uint64_t            rx_ring_phys_addr; /**< RX ring DMA address. */
        volatile uint32_t   *rdt_reg_addr; /**< RDT register address. */
        volatile uint32_t   *rdh_reg_addr; /**< RDH register address. */
        /* sw_ring holds the virtual addresses of the packet buffers, used by
        software to read packets; a buffer's physical address can be derived
        from its virtual address. */
        struct ixgbe_rx_entry *sw_ring; /**< address of RX software ring. */
        struct ixgbe_scattered_rx_entry *sw_sc_ring; /**< address of scattered Rx software ring. */
        struct rte_mbuf *pkt_first_seg; /**< First segment of current packet. */
        struct rte_mbuf *pkt_last_seg; /**< Last segment of current packet. */
        uint64_t            mbuf_initializer; /**< value to init mbufs */
        uint16_t            nb_rx_desc; /**< number of RX descriptors. */
        uint16_t            rx_tail;  /**< current value of RDT register. */
        uint16_t            nb_rx_hold; /**< number of held free RX desc. */
        uint16_t rx_nb_avail; /**< nr of staged pkts ready to ret to app */
        uint16_t rx_next_avail; /**< idx of next staged pkt to ret to app */
        uint16_t rx_free_trigger; /**< triggers rx buffer allocation */
        uint8_t            rx_using_sse;
        /**< indicates that vector RX is in use */
    #ifdef RTE_LIBRTE_SECURITY
        uint8_t            using_ipsec;
        /**< indicates that IPsec RX feature is in use */
    #endif
    #ifdef RTE_IXGBE_INC_VECTOR
        uint16_t            rxrearm_nb;     /**< number of remaining to be re-armed */
        uint16_t            rxrearm_start;  /**< the idx we start the re-arming from */
    #endif
        uint16_t            rx_free_thresh; /**< max free RX desc to hold. */
        uint16_t            queue_id; /**< RX queue index. */
        uint16_t            reg_idx;  /**< RX queue register index. */
        uint16_t            pkt_type_mask;  /**< Packet type mask for different NICs. */
        uint16_t            port_id;  /**< Device port identifier. */
        uint8_t             crc_len;  /**< 0 if CRC stripped, 4 otherwise. */
        uint8_t             drop_en;  /**< If not 0, set SRRCTL.Drop_En. */
        uint8_t             rx_deferred_start; /**< not in global dev start. */
        /** flags to set in mbuf when a vlan is detected. */
        uint64_t            vlan_flags;
        uint64_t        offloads; /**< Rx offloads with DEV_RX_OFFLOAD_* */
        /** need to alloc dummy mbuf, for wraparound when scanning hw ring */
        struct rte_mbuf fake_mbuf;
        /** hold packets to return to application */
        struct rte_mbuf *rx_stage[RTE_PMD_IXGBE_RX_MAX_BURST*2];
    };
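    As a rough illustration of the rx_tail / nb_rx_hold / rx_free_thresh bookkeeping in this structure, here is a self-contained toy model; struct toy_rxq and its fields are simplified stand-ins for the corresponding ixgbe_rx_queue members, and rdt_shadow stands in for the value written to the hardware RDT register.

```c
#include <assert.h>
#include <stdint.h>

/* Toy model of the tail bookkeeping in ixgbe_rx_queue: rx_tail advances as
 * descriptors are consumed, and RDT is only written back to hardware once
 * more than rx_free_thresh descriptors have been refilled, using the value
 * "last processed - 1" so RDT never catches up to RDH. */
struct toy_rxq {
    uint16_t nb_rx_desc;      /* ring size */
    uint16_t rx_tail;         /* next descriptor software will look at */
    uint16_t nb_rx_hold;      /* refilled but not yet published to the NIC */
    uint16_t rx_free_thresh;  /* publish threshold */
    uint16_t rdt_shadow;      /* last value "written" to the RDT register */
};

/* Consume nb descriptors, then publish to "hardware" if past threshold. */
static void toy_consume(struct toy_rxq *q, uint16_t nb)
{
    uint16_t rx_id = q->rx_tail;

    for (uint16_t i = 0; i < nb; i++) {
        rx_id++;
        if (rx_id == q->nb_rx_desc)   /* wrap like the real PMD */
            rx_id = 0;
    }
    q->rx_tail = rx_id;
    q->nb_rx_hold = (uint16_t)(q->nb_rx_hold + nb);
    if (q->nb_rx_hold > q->rx_free_thresh) {
        /* last processed minus 1, so RDT != RDH even when fully refilled */
        q->rdt_shadow = (uint16_t)(rx_id == 0 ? q->nb_rx_desc - 1
                                              : rx_id - 1);
        q->nb_rx_hold = 0;
    }
}
```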

    The receive queue is started mainly by calling rte_eth_dev_start().

    DPDK is zero-copy: so how do the objects in the allocated mempool get tied to the queue and to the driver?

    Device startup begins in rte_eth_dev_start(), which calls

    diag = (*dev->dev_ops->dev_start)(dev);

    to reach the real start function, ixgbe_dev_start(). Inside it, the queue initialization path is:

    static int
    ixgbe_alloc_rx_queue_mbufs(struct ixgbe_rx_queue *rxq)
    {
        struct ixgbe_rx_entry *rxe = rxq->sw_ring;
        uint64_t dma_addr;
        unsigned int i;
    
        /* Initialize software ring entries.
        nb_rx_desc mbuf pointers are taken from the queue's mempool to
        fill rxq->sw_ring; each one points at a packet buffer inside the
        mempool. Each freshly allocated mbuf is initialized and, most
        importantly, its dma_addr is computed and written into the queue
        ring (the rxd), telling the hardware where to place packet data.
        Finally the mbuf is "put" into the queue's sw_ring, so packets
        the driver receives land directly in sw_ring. */
        for (i = 0; i < rxq->nb_rx_desc; i++) {
            volatile union ixgbe_adv_rx_desc *rxd;
            struct rte_mbuf *mbuf = rte_mbuf_raw_alloc(rxq->mb_pool);
    
            if (mbuf == NULL) {
                PMD_INIT_LOG(ERR, "RX mbuf alloc failed queue_id=%u",
                         (unsigned) rxq->queue_id);
                return -ENOMEM;
            }
    
            mbuf->data_off = RTE_PKTMBUF_HEADROOM;
            mbuf->port = rxq->port_id;
    
            dma_addr =
                rte_cpu_to_le_64(rte_mbuf_data_iova_default(mbuf));
            rxd = &rxq->rx_ring[i];
            rxd->read.hdr_addr = 0;
            rxd->read.pkt_addr = dma_addr;
            rxe[i].mbuf = mbuf;
        }
    
        return 0;
    }
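    This fill loop is what wires the mempool to the queue and answers the zero-copy question: slot i of rx_ring carries the DMA address of the very buffer whose virtual pointer sits in slot i of sw_ring, so the NIC and the application touch the same memory. A toy sketch of that invariant (toy_* names are invented; the real code uses rte_mbuf_data_iova_default() for the address translation):

```c
#include <assert.h>
#include <stdint.h>
#include <stddef.h>

#define NB_DESC 4

struct toy_mbuf { char payload[64]; };

struct toy_rx_desc  { uint64_t pkt_addr; };       /* what the NIC DMA reads */
struct toy_rx_entry { struct toy_mbuf *mbuf; };   /* what software reads */

/* Fake virtual-to-DMA translation; the real code uses
 * rte_mbuf_data_iova_default() here. */
static uint64_t toy_data_iova(const struct toy_mbuf *m)
{
    return (uint64_t)(uintptr_t)m->payload;
}

/* Populate both rings at the same index, mirroring the loop in
 * ixgbe_alloc_rx_queue_mbufs(): slot i of rx_ring carries the DMA address
 * of the buffer whose virtual pointer sits in slot i of sw_ring. */
static void toy_fill(struct toy_rx_desc *rx_ring,
                     struct toy_rx_entry *sw_ring,
                     struct toy_mbuf *pool, size_t n)
{
    for (size_t i = 0; i < n; i++) {
        sw_ring[i].mbuf = &pool[i];
        rx_ring[i].pkt_addr = toy_data_iova(&pool[i]);
    }
}
```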

    Retrieving packets

    When the NIC receives a frame, it first lands in the NIC's local buffer (the Rx FIFO); DMA then writes the packet data over the PCI bus into host memory, completing the enqueue. Retrieving packets means the upper layer taking them back out of the queue.

    At the application level, reception starts from rte_eth_rx_burst():

    int16_t nb_rx = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id], rx_pkts, nb_pkts);

    dev->rx_pkt_burst was registered during driver initialization; for ixgbe devices it is ixgbe_recv_pkts():

    uint16_t
    ixgbe_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
            uint16_t nb_pkts)
    {
        struct ixgbe_rx_queue *rxq;
        volatile union ixgbe_adv_rx_desc *rx_ring;
        volatile union ixgbe_adv_rx_desc *rxdp;
        struct ixgbe_rx_entry *sw_ring;
        struct ixgbe_rx_entry *rxe;
        struct rte_mbuf *rxm;
        struct rte_mbuf *nmb;
        union ixgbe_adv_rx_desc rxd;
        uint64_t dma_addr;
        uint32_t staterr;
        uint32_t pkt_info;
        uint16_t pkt_len;
        uint16_t rx_id;
        uint16_t nb_rx;
        uint16_t nb_hold;
        uint64_t pkt_flags;
        uint64_t vlan_flags;
    
        nb_rx = 0;
        nb_hold = 0;
        rxq = rx_queue;
        rx_id = rxq->rx_tail; /* start consuming at the queue's tail */
        rx_ring = rxq->rx_ring;
        sw_ring = rxq->sw_ring;
        vlan_flags = rxq->vlan_flags;
        while (nb_rx < nb_pkts) { /* fetch up to nb_pkts packets */
            /*
             * The order of operations here is important as the DD status
             * bit must not be read after any other descriptor fields.
             * rx_ring and rxdp are pointing to volatile data so the order
             * of accesses cannot be reordered by the compiler. If they were
             * not volatile, they could be reordered which could lead to
             * using invalid descriptor fields when read from rxd.
             */
            rxdp = &rx_ring[rx_id];
            staterr = rxdp->wb.upper.status_error;
            /* DD set means a packet has been written to this slot;
             * if clear, no packet has arrived yet, so stop */
            if (!(staterr & rte_cpu_to_le_32(IXGBE_RXDADV_STAT_DD)))
                break;
            rxd = *rxdp;
    
            /*
             * End of packet.
             *
             * If the IXGBE_RXDADV_STAT_EOP flag is not set, the RX packet
             * is likely to be invalid and to be dropped by the various
             * validation checks performed by the network stack.
             *
             * Allocate a new mbuf to replenish the RX ring descriptor.
             * If the allocation fails:
             *    - arrange for that RX descriptor to be the first one
             *      being parsed the next time the receive function is
             *      invoked [on the same queue].
             *
             *    - Stop parsing the RX ring and return immediately.
             *
             * This policy does not drop the packet received in the RX
             * descriptor for which the allocation of a new mbuf failed.
             * Thus, it allows that packet to be retrieved later if
             * mbufs have been freed in the meantime.
             * As a side effect, holding RX descriptors instead of
             * systematically giving them back to the NIC may lead to
             * RX ring exhaustion situations.
             * However, the NIC can gracefully prevent such situations
             * to happen by sending specific "back-pressure" flow control
             * frames to its peer(s).
             */
            PMD_RX_LOG(DEBUG, "port_id=%u queue_id=%u rx_id=%u "
                   "ext_err_stat=0x%08x pkt_len=%u",
                   (unsigned) rxq->port_id, (unsigned) rxq->queue_id,
                   (unsigned) rx_id, (unsigned) staterr,
                   (unsigned) rte_le_to_cpu_16(rxd.wb.upper.length));
            /* allocate a new mbuf (nmb) to swap into the ring */
            nmb = rte_mbuf_raw_alloc(rxq->mb_pool);
            if (nmb == NULL) {
                PMD_RX_LOG(DEBUG, "RX mbuf alloc failed port_id=%u "
                       "queue_id=%u", (unsigned) rxq->port_id,
                       (unsigned) rxq->queue_id);
                rte_eth_devices[rxq->port_id].data->rx_mbuf_alloc_failed++;
                break;
            }
    
            nb_hold++;
           
            rxe = &sw_ring[rx_id];
            rx_id++;
            if (rx_id == rxq->nb_rx_desc)
                rx_id = 0;
    
            /* Prefetch next mbuf while processing current one. */
            rte_ixgbe_prefetch(sw_ring[rx_id].mbuf);
    
            /*
             * When next RX descriptor is on a cache-line boundary,
             * prefetch the next 4 RX descriptors and the next 8 pointers
             * to mbufs.
             */
            if ((rx_id & 0x3) == 0) {
                rte_ixgbe_prefetch(&rx_ring[rx_id]);
                rte_ixgbe_prefetch(&sw_ring[rx_id]);
            }
            /* take the filled mbuf out of sw_ring (into rxm) */
            rxm = rxe->mbuf;
            /* and put the fresh mbuf (nmb) into sw_ring */
            rxe->mbuf = nmb;
            /* write the new mbuf's packet DMA address into rx_ring and
             * zero hdr_addr, which also clears the DD bit */
            dma_addr =
                rte_cpu_to_le_64(rte_mbuf_data_iova_default(nmb));
            rxdp->read.hdr_addr = 0;
            rxdp->read.pkt_addr = dma_addr;
    
            /*
             * Initialize the returned mbuf.
             * 1) setup generic mbuf fields:
             *    - number of segments,
             *    - next segment,
             *    - packet length,
             *    - RX port identifier.
             * 2) integrate hardware offload data, if any:
             *    - RSS flag & hash,
             *    - IP checksum flag,
             *    - VLAN TCI, if any,
             *    - error flags.
             */
            pkt_len = (uint16_t) (rte_le_to_cpu_16(rxd.wb.upper.length) -
                          rxq->crc_len);
            /* initialize the returned mbuf's packet metadata */
            rxm->data_off = RTE_PKTMBUF_HEADROOM;
            rte_packet_prefetch((char *)rxm->buf_addr + rxm->data_off);
            rxm->nb_segs = 1;
            rxm->next = NULL;
            rxm->pkt_len = pkt_len;
            rxm->data_len = pkt_len;
            rxm->port = rxq->port_id;
    
            pkt_info = rte_le_to_cpu_32(rxd.wb.lower.lo_dword.data);
            /* Only valid if PKT_RX_VLAN set in pkt_flags */
            rxm->vlan_tci = rte_le_to_cpu_16(rxd.wb.upper.vlan);
    
            pkt_flags = rx_desc_status_to_pkt_flags(staterr, vlan_flags);
            pkt_flags = pkt_flags | rx_desc_error_to_pkt_flags(staterr);
            pkt_flags = pkt_flags |
                ixgbe_rxd_pkt_info_to_pkt_flags((uint16_t)pkt_info);
            rxm->ol_flags = pkt_flags;
            rxm->packet_type =
                ixgbe_rxd_pkt_info_to_pkt_type(pkt_info,
                                   rxq->pkt_type_mask);
    
            if (likely(pkt_flags & PKT_RX_RSS_HASH))
                rxm->hash.rss = rte_le_to_cpu_32(
                            rxd.wb.lower.hi_dword.rss);
            else if (pkt_flags & PKT_RX_FDIR) {
                rxm->hash.fdir.hash = rte_le_to_cpu_16(
                        rxd.wb.lower.hi_dword.csum_ip.csum) &
                        IXGBE_ATR_HASH_MASK;
                rxm->hash.fdir.id = rte_le_to_cpu_16(
                        rxd.wb.lower.hi_dword.csum_ip.ip_id);
            }
            /*
             * Store the mbuf address into the next entry of the array
             * of returned packets.
             */
            rx_pkts[nb_rx++] = rxm; /* filled mbuf handed back via rx_pkts */
        }
        rxq->rx_tail = rx_id;
    
        /*
         * If the number of free RX descriptors is greater than the RX free
         * threshold of the queue, advance the Receive Descriptor Tail (RDT)
         * register.
         * Update the RDT with the value of the last processed RX descriptor
         * minus 1, to guarantee that the RDT register is never equal to the
         * RDH register, which creates a "full" ring situtation from the
         * hardware point of view...
         */
        nb_hold = (uint16_t) (nb_hold + rxq->nb_rx_hold);
        if (nb_hold > rxq->rx_free_thresh) {
            PMD_RX_LOG(DEBUG, "port_id=%u queue_id=%u rx_tail=%u "
                   "nb_hold=%u nb_rx=%u",
                   (unsigned) rxq->port_id, (unsigned) rxq->queue_id,
                   (unsigned) rx_id, (unsigned) nb_hold,
                   (unsigned) nb_rx);
            rx_id = (uint16_t) ((rx_id == 0) ?
                         (rxq->nb_rx_desc - 1) : (rx_id - 1));
            IXGBE_PCI_REG_WRITE(rxq->rdt_reg_addr, rx_id);
            nb_hold = 0;
        }
        rxq->nb_rx_hold = nb_hold;
        return nb_rx;
    }
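    The core of the loop above is a pointer swap, not a data copy: the filled mbuf leaves the ring and a fresh one takes its place in both sw_ring and the descriptor. A minimal model of that exchange (the toy_* types and toy_recv_one are invented for illustration and the DMA address is faked from the pointer value):

```c
#include <assert.h>
#include <stdint.h>
#include <stddef.h>

struct toy_mbuf { int id; };
struct toy_desc { uint64_t pkt_addr; uint32_t dd; };

/* Toy version of the swap at the heart of ixgbe_recv_pkts(): the mbuf the
 * NIC filled is handed to the application, and a fresh mbuf from the pool
 * takes its place in both the software ring (pointer) and the descriptor
 * (DMA address), with DD cleared so the NIC can reuse the slot. The
 * payload itself is never copied. */
static struct toy_mbuf *toy_recv_one(struct toy_desc *rxdp,
                                     struct toy_mbuf **sw_slot,
                                     struct toy_mbuf *nmb)
{
    struct toy_mbuf *rxm;

    if (!rxdp->dd)                  /* DD clear: nothing delivered yet */
        return NULL;
    rxm = *sw_slot;                 /* the packet the NIC wrote */
    *sw_slot = nmb;                 /* replenish the software ring */
    rxdp->pkt_addr = (uint64_t)(uintptr_t)nmb;  /* fake DMA address */
    rxdp->dd = 0;                   /* re-arm the descriptor */
    return rxm;
}
```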
  • Original article: https://www.cnblogs.com/codestack/p/13821293.html