  • DPDK l2fwd application flow analysis

    int
    MAIN(int argc, char **argv)
    {
        struct lcore_queue_conf *qconf;
        struct rte_eth_dev_info dev_info;
        int ret;
        uint8_t nb_ports;
        uint8_t nb_ports_available;
        uint8_t portid, last_port;
        unsigned lcore_id, rx_lcore_id;
        unsigned nb_ports_in_mask = 0;
    
        /* init EAL */
        ret = rte_eal_init(argc, argv);
        if (ret < 0)
            rte_exit(EXIT_FAILURE, "Invalid EAL arguments\n");
        argc -= ret;
        argv += ret;
    
        /* parse application arguments (after the EAL ones) */
        ret = l2fwd_parse_args(argc, argv);
        if (ret < 0)
            rte_exit(EXIT_FAILURE, "Invalid L2FWD arguments\n");
    
        /* create the mbuf pool */
        l2fwd_pktmbuf_pool =
            rte_mempool_create("mbuf_pool", NB_MBUF,
                       MBUF_SIZE, 32,
                       sizeof(struct rte_pktmbuf_pool_private),
                       rte_pktmbuf_pool_init, NULL,
                       rte_pktmbuf_init, NULL,
                       rte_socket_id(), 0);
        if (l2fwd_pktmbuf_pool == NULL)
            rte_exit(EXIT_FAILURE, "Cannot init mbuf pool\n");
    
        /* init driver(s) */
        if (rte_pmd_init_all() < 0)
            rte_exit(EXIT_FAILURE, "Cannot init pmd\n");
    
        if (rte_eal_pci_probe() < 0)
            rte_exit(EXIT_FAILURE, "Cannot probe PCI\n");
    
        nb_ports = rte_eth_dev_count();
        if (nb_ports == 0)
            rte_exit(EXIT_FAILURE, "No Ethernet ports - bye\n");
    
        if (nb_ports > RTE_MAX_ETHPORTS)
            nb_ports = RTE_MAX_ETHPORTS;
    
        /* reset l2fwd_dst_ports */
        for (portid = 0; portid < RTE_MAX_ETHPORTS; portid++)
            l2fwd_dst_ports[portid] = 0;
        last_port = 0;
    
        /* port0 forwards to port1 and port1 forwards to port0: the two ports form a pair and exchange packets with each other */
        /*
         * Each logical core is assigned a dedicated TX queue on each port.
         */
        for (portid = 0; portid < nb_ports; portid++) {
            /* skip ports that are not enabled */
            if ((l2fwd_enabled_port_mask & (1 << portid)) == 0)
                continue;
    
            if (nb_ports_in_mask % 2) {
                l2fwd_dst_ports[portid] = last_port;
                l2fwd_dst_ports[last_port] = portid;
            }
            else
                last_port = portid;
    
            nb_ports_in_mask++;
    
            rte_eth_dev_info_get(portid, &dev_info);
        }
        if (nb_ports_in_mask % 2) {
            printf("Notice: odd number of ports in portmask.\n");
            l2fwd_dst_ports[last_port] = last_port;
        }
    
        rx_lcore_id = 0;
        qconf = NULL;
    
        /* Each lcore polls up to l2fwd_rx_queue_per_lcore ports; each port (really each queue, but here every port has a single queue) is polled by exactly one lcore */
        /* Initialize the port/queue configuration of each logical core */
        for (portid = 0; portid < nb_ports; portid++) {
            /* skip ports that are not enabled */
            if ((l2fwd_enabled_port_mask & (1 << portid)) == 0)
                continue;
    
            /* get the lcore_id for this port */
            while (rte_lcore_is_enabled(rx_lcore_id) == 0 ||
                   lcore_queue_conf[rx_lcore_id].n_rx_port ==
                   l2fwd_rx_queue_per_lcore) {
                rx_lcore_id++;
                if (rx_lcore_id >= RTE_MAX_LCORE)
                    rte_exit(EXIT_FAILURE, "Not enough cores\n");
            }
    
            if (qconf != &lcore_queue_conf[rx_lcore_id])
                /* Assigned a new logical core in the loop above. */
                qconf = &lcore_queue_conf[rx_lcore_id];
    
            qconf->rx_port_list[qconf->n_rx_port] = portid;
            qconf->n_rx_port++;
            printf("Lcore %u: RX port %u\n", rx_lcore_id, (unsigned) portid);
        }
    
        nb_ports_available = nb_ports;
    
        /* Initialize each port's RX/TX queues */
        /* Initialise each port */
        for (portid = 0; portid < nb_ports; portid++) {
            /* skip ports that are not enabled */
            if ((l2fwd_enabled_port_mask & (1 << portid)) == 0) {
                printf("Skipping disabled port %u\n", (unsigned) portid);
                nb_ports_available--;
                continue;
            }
            /* init port */
            printf("Initializing port %u... ", (unsigned) portid);
            fflush(stdout);
            ret = rte_eth_dev_configure(portid, 1, 1, &port_conf);
            if (ret < 0)
                rte_exit(EXIT_FAILURE, "Cannot configure device: err=%d, port=%u\n",
                      ret, (unsigned) portid);
    
            rte_eth_macaddr_get(portid,&l2fwd_ports_eth_addr[portid]);
    
            /* init one RX queue */
            fflush(stdout);
            ret = rte_eth_rx_queue_setup(portid, 0, nb_rxd,
                             rte_eth_dev_socket_id(portid), &rx_conf,
                             l2fwd_pktmbuf_pool);
            if (ret < 0)
                rte_exit(EXIT_FAILURE, "rte_eth_rx_queue_setup:err=%d, port=%u\n",
                      ret, (unsigned) portid);
    
            /* init one TX queue on each port */
            fflush(stdout);
            ret = rte_eth_tx_queue_setup(portid, 0, nb_txd,
                    rte_eth_dev_socket_id(portid), &tx_conf);
            if (ret < 0)
                rte_exit(EXIT_FAILURE, "rte_eth_tx_queue_setup:err=%d, port=%u\n",
                    ret, (unsigned) portid);
    
            /* Start device */
            ret = rte_eth_dev_start(portid);
            if (ret < 0)
                rte_exit(EXIT_FAILURE, "rte_eth_dev_start:err=%d, port=%u\n",
                      ret, (unsigned) portid);
    
            printf("done: \n");
    
            rte_eth_promiscuous_enable(portid);
    
            printf("Port %u, MAC address: %02X:%02X:%02X:%02X:%02X:%02X\n\n",
                    (unsigned) portid,
                    l2fwd_ports_eth_addr[portid].addr_bytes[0],
                    l2fwd_ports_eth_addr[portid].addr_bytes[1],
                    l2fwd_ports_eth_addr[portid].addr_bytes[2],
                    l2fwd_ports_eth_addr[portid].addr_bytes[3],
                    l2fwd_ports_eth_addr[portid].addr_bytes[4],
                    l2fwd_ports_eth_addr[portid].addr_bytes[5]);
    
            /* initialize port stats */
            memset(&port_statistics, 0, sizeof(port_statistics));
        }
    
        if (!nb_ports_available) {
        rte_exit(EXIT_FAILURE,
            "All available ports are disabled. Please set portmask.\n");
        }
    
        check_all_ports_link_status(nb_ports, l2fwd_enabled_port_mask);
    
        /* Launch the l2fwd worker threads */
        /* launch per-lcore init on every lcore */
        rte_eal_mp_remote_launch(l2fwd_launch_one_lcore, NULL, CALL_MASTER);
        RTE_LCORE_FOREACH_SLAVE(lcore_id) {
            if (rte_eal_wait_lcore(lcore_id) < 0)
                return -1;
        }
    
        return 0;
    }
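
    To make the pairing rule above concrete: enabled ports are visited in portmask order, every second enabled port is paired with the previous one, and an odd port count leaves the last port forwarding to itself. A tiny standalone sketch of just this computation (the portmask value 0x0f is a made-up example, not from l2fwd):

    #include <stdio.h>
    #include <stdint.h>

    #define MAX_PORTS 8

    int main(void)
    {
        uint32_t portmask = 0x0f;          /* assumed example: ports 0-3 enabled */
        uint8_t dst[MAX_PORTS] = {0};
        uint8_t last_port = 0;
        unsigned nb_in_mask = 0;

        for (uint8_t p = 0; p < MAX_PORTS; p++) {
            if ((portmask & (1u << p)) == 0)
                continue;
            if (nb_in_mask % 2) {          /* second port of a pair */
                dst[p] = last_port;
                dst[last_port] = p;
            } else
                last_port = p;
            nb_in_mask++;
        }
        if (nb_in_mask % 2)                /* odd count: last port pairs with itself */
            dst[last_port] = last_port;

        for (uint8_t p = 0; p < MAX_PORTS; p++)
            if (portmask & (1u << p))
                printf("port %u -> port %u\n", (unsigned) p, (unsigned) dst[p]);
        /* prints: 0 -> 1, 1 -> 0, 2 -> 3, 3 -> 2 */
        return 0;
    }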


    The following analyzes port initialization in detail. For each port, rte_eth_dev_configure is called first to configure the number of RX/TX queues and to allocate the queue control blocks:

    int
    rte_eth_dev_configure(uint8_t port_id, uint16_t nb_rx_q, uint16_t nb_tx_q,
                  const struct rte_eth_conf *dev_conf)
    {
        struct rte_eth_dev *dev;
        struct rte_eth_dev_info dev_info;
        int diag;
    
        /* Can only be initialized by the primary process */
        /* This function is only safe when called from the primary process
         * in a multi-process setup*/
        PROC_PRIMARY_OR_ERR_RET(-E_RTE_SECONDARY);
    
        if (port_id >= nb_ports || port_id >= RTE_MAX_ETHPORTS) {
            PMD_DEBUG_TRACE("Invalid port_id=%d\n", port_id);
            return (-EINVAL);
        }
        dev = &rte_eth_devices[port_id];
    
        /* During PMD driver initialization, the e1000 ops are registered as eth_em_ops */
        FUNC_PTR_OR_ERR_RET(*dev->dev_ops->dev_infos_get, -ENOTSUP);
        FUNC_PTR_OR_ERR_RET(*dev->dev_ops->dev_configure, -ENOTSUP);
    
        /* rte_eth_dev_start sets this flag to 1 */
        if (dev->data->dev_started) {
            PMD_DEBUG_TRACE(
                "port %d must be stopped to allow configuration\n", port_id);
            return (-EBUSY);
        }
    
        /* eth_em_infos_get returns the TX/RX queue counts; in this example max_rx_queues = 1 and max_tx_queues = 1 */
        /*
         * Check that the numbers of RX and TX queues are not greater
         * than the maximum number of RX and TX queues supported by the
         * configured device.
         */
        (*dev->dev_ops->dev_infos_get)(dev, &dev_info);
        if (nb_rx_q > dev_info.max_rx_queues) {
            PMD_DEBUG_TRACE("ethdev port_id=%d nb_rx_queues=%d > %d\n",
                    port_id, nb_rx_q, dev_info.max_rx_queues);
            return (-EINVAL);
        }
        if (nb_rx_q == 0) {
            PMD_DEBUG_TRACE("ethdev port_id=%d nb_rx_q == 0\n", port_id);
            return (-EINVAL);
        }
    
        if (nb_tx_q > dev_info.max_tx_queues) {
            PMD_DEBUG_TRACE("ethdev port_id=%d nb_tx_queues=%d > %d\n",
                    port_id, nb_tx_q, dev_info.max_tx_queues);
            return (-EINVAL);
        }
        if (nb_tx_q == 0) {
            PMD_DEBUG_TRACE("ethdev port_id=%d nb_tx_q == 0\n", port_id);
            return (-EINVAL);
        }
    
        /* dev_conf carries the TX/RX mode configuration */
        /* Copy the dev_conf parameter into the dev structure */
        memcpy(&dev->data->dev_conf, dev_conf, sizeof(dev->data->dev_conf));
    
        /* Whether to receive jumbo frames; usually not needed */
        /*
         * If jumbo frames are enabled, check that the maximum RX packet
         * length is supported by the configured device.
         */
        if (dev_conf->rxmode.jumbo_frame == 1) {
            if (dev_conf->rxmode.max_rx_pkt_len >
                dev_info.max_rx_pktlen) {
                PMD_DEBUG_TRACE("ethdev port_id=%d max_rx_pkt_len %u"
                    " > max valid value %u\n",
                    port_id,
                    (unsigned)dev_conf->rxmode.max_rx_pkt_len,
                    (unsigned)dev_info.max_rx_pktlen);
                return (-EINVAL);
            }
            else if (dev_conf->rxmode.max_rx_pkt_len < ETHER_MIN_LEN) {
                PMD_DEBUG_TRACE("ethdev port_id=%d max_rx_pkt_len %u"
                    " < min valid value %u\n",
                    port_id,
                    (unsigned)dev_conf->rxmode.max_rx_pkt_len,
                    (unsigned)ETHER_MIN_LEN);
                return (-EINVAL);
            }
        } else
            /* Use default value */
            dev->data->dev_conf.rxmode.max_rx_pkt_len = ETHER_MAX_LEN;
    
        /* Multi-queue mode check; what exactly do the various DCB/RSS modes mean here? */
        /* multipe queue mode checking */
        diag = rte_eth_dev_check_mq_mode(port_id, nb_rx_q, nb_tx_q, dev_conf);
        if (diag != 0) {
            PMD_DEBUG_TRACE("port%d rte_eth_dev_check_mq_mode = %d\n",
                    port_id, diag);
            return diag;
        }
    
        /*
         * Setup new number of RX/TX queues and reconfigure device.
         */
        /* Memory allocation for the RX queue control blocks */
        diag = rte_eth_dev_rx_queue_config(dev, nb_rx_q);
        if (diag != 0) {
            PMD_DEBUG_TRACE("port%d rte_eth_dev_rx_queue_config = %d\n",
                    port_id, diag);
            return diag;
        }
    
        /* Memory allocation for the TX queue control blocks */
        diag = rte_eth_dev_tx_queue_config(dev, nb_tx_q);
        if (diag != 0) {
            PMD_DEBUG_TRACE("port%d rte_eth_dev_tx_queue_config = %d\n",
                    port_id, diag);
            rte_eth_dev_rx_queue_config(dev, 0);
            return diag;
        }
    
        /* eth_em_configure: sets intr->flags |= E1000_FLAG_NEED_LINK_UPDATE */
        diag = (*dev->dev_ops->dev_configure)(dev);
        if (diag != 0) {
            PMD_DEBUG_TRACE("port%d dev_configure = %d\n",
                    port_id, diag);
            rte_eth_dev_rx_queue_config(dev, 0);
            rte_eth_dev_tx_queue_config(dev, 0);
            return diag;
        }
    
        return 0;
    }
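
    The two queue-config helpers mostly just (re)allocate the array of per-queue pointers that the PMD's queue_setup callbacks later fill in. Roughly, for the RX side (an abridged sketch of the DPDK 1.x helper; the reconfiguration branch and error handling are simplified):

    static int
    rte_eth_dev_rx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
    {
        /* First-time configuration: allocate the array of RX queue pointers;
         * eth_em_rx_queue_setup() later stores one control block per slot. */
        if (dev->data->rx_queues == NULL) {
            dev->data->rx_queues = rte_zmalloc("ethdev->rx_queues",
                    sizeof(dev->data->rx_queues[0]) * nb_queues,
                    CACHE_LINE_SIZE);
            if (dev->data->rx_queues == NULL)
                return -(ENOMEM);
        } else {
            /* Reconfiguration: the real code releases the existing queues
             * via the PMD's rx_queue_release op and rte_realloc's the array. */
        }
        dev->data->nb_rx_queues = nb_queues;
        return (0);
    }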


    RX queue setup


    int
    rte_eth_rx_queue_setup(uint8_t port_id, uint16_t rx_queue_id,
                   uint16_t nb_rx_desc, unsigned int socket_id,
                   const struct rte_eth_rxconf *rx_conf,
                   struct rte_mempool *mp)
    {
        struct rte_eth_dev *dev;
        struct rte_pktmbuf_pool_private *mbp_priv;
        struct rte_eth_dev_info dev_info;
    
        /* This function is only safe when called from the primary process
         * in a multi-process setup*/
        PROC_PRIMARY_OR_ERR_RET(-E_RTE_SECONDARY);
    
        if (port_id >= nb_ports) {
            PMD_DEBUG_TRACE("Invalid port_id=%d\n", port_id);
            return (-EINVAL);
        }
        dev = &rte_eth_devices[port_id];
        if (rx_queue_id >= dev->data->nb_rx_queues) {
            PMD_DEBUG_TRACE("Invalid RX queue_id=%d\n", rx_queue_id);
            return (-EINVAL);
        }
    
        if (dev->data->dev_started) {
            PMD_DEBUG_TRACE(
                "port %d must be stopped to allow configuration\n", port_id);
            return -EBUSY;
        }
    
        FUNC_PTR_OR_ERR_RET(*dev->dev_ops->dev_infos_get, -ENOTSUP);
        FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rx_queue_setup, -ENOTSUP);
    
        /*
         * Check the size of the mbuf data buffer.
         * This value must be provided in the private data of the memory pool.
         * First check that the memory pool has a valid private data.
         */
        (*dev->dev_ops->dev_infos_get)(dev, &dev_info);
        if (mp->private_data_size < sizeof(struct rte_pktmbuf_pool_private)) {
            PMD_DEBUG_TRACE("%s private_data_size %d < %d\n",
                    mp->name, (int) mp->private_data_size,
                    (int) sizeof(struct rte_pktmbuf_pool_private));
            return (-ENOSPC);
        }
    
        /* mbuf data area size (2048) > 256 */
        mbp_priv = rte_mempool_get_priv(mp);
        if ((uint32_t) (mbp_priv->mbuf_data_room_size - RTE_PKTMBUF_HEADROOM) <
            dev_info.min_rx_bufsize) {
            PMD_DEBUG_TRACE("%s mbuf_data_room_size %d < %d "
                    "(RTE_PKTMBUF_HEADROOM=%d + min_rx_bufsize(dev)"
                    "=%d)\n",
                    mp->name,
                    (int)mbp_priv->mbuf_data_room_size,
                    (int)(RTE_PKTMBUF_HEADROOM +
                          dev_info.min_rx_bufsize),
                    (int)RTE_PKTMBUF_HEADROOM,
                    (int)dev_info.min_rx_bufsize);
            return (-EINVAL);
        }
    
        /* eth_em_rx_queue_setup: initialize the RX descriptors */
        return (*dev->dev_ops->rx_queue_setup)(dev, rx_queue_id, nb_rx_desc,
                               socket_id, rx_conf, mp);
    }
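
    To make the data-room check concrete for the pool created in MAIN (assumed l2fwd-era defaults: MBUF_SIZE = 2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM, RTE_PKTMBUF_HEADROOM = 128, em min_rx_bufsize = 256):

    /* Worked numbers for the check above (values are assumptions, see lead-in). */
    uint32_t data_room = 2048 + 128;      /* mbuf_data_room_size set at pool init */
    uint32_t usable    = data_room - 128; /* 2048 usable bytes per mbuf */
    /* usable (2048) >= min_rx_bufsize (256), so the check passes. */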





    TX queue setup


    int
    rte_eth_tx_queue_setup(uint8_t port_id, uint16_t tx_queue_id,
                   uint16_t nb_tx_desc, unsigned int socket_id,
                   const struct rte_eth_txconf *tx_conf)
    {
        struct rte_eth_dev *dev;
    
        /* This function is only safe when called from the primary process
         * in a multi-process setup*/
        PROC_PRIMARY_OR_ERR_RET(-E_RTE_SECONDARY);
    
        if (port_id >= RTE_MAX_ETHPORTS || port_id >= nb_ports) {
            PMD_DEBUG_TRACE("Invalid port_id=%d\n", port_id);
            return (-EINVAL);
        }
        dev = &rte_eth_devices[port_id];
        if (tx_queue_id >= dev->data->nb_tx_queues) {
            PMD_DEBUG_TRACE("Invalid TX queue_id=%d\n", tx_queue_id);
            return (-EINVAL);
        }
    
        /* Must be initialized before the device is started */
        if (dev->data->dev_started) {
            PMD_DEBUG_TRACE(
                "port %d must be stopped to allow configuration\n", port_id);
            return -EBUSY;
        }
    
        /* Invoke the PMD driver's tx_queue_setup */
        FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_setup, -ENOTSUP);
        return (*dev->dev_ops->tx_queue_setup)(dev, tx_queue_id, nb_tx_desc,
                               socket_id, tx_conf);
    }


    int
    eth_em_tx_queue_setup(struct rte_eth_dev *dev,
                 uint16_t queue_idx,
                 uint16_t nb_desc,
                 unsigned int socket_id,
                 const struct rte_eth_txconf *tx_conf)
    {
        const struct rte_memzone *tz;
        struct em_tx_queue *txq;
        struct e1000_hw     *hw;
        uint32_t tsize;
        uint16_t tx_rs_thresh, tx_free_thresh;
    
        hw = E1000_DEV_PRIVATE_TO_HW(dev->data->dev_private);
    
        /* TX descriptors must be cache-line aligned */
        /*
         * Validate number of transmit descriptors.
         * It must not exceed hardware maximum, and must be multiple
         * of EM_ALIGN.
         */
        if (((nb_desc * sizeof(*txq->tx_ring)) % EM_ALIGN) != 0 ||
                (nb_desc > EM_MAX_RING_DESC) ||
                (nb_desc < EM_MIN_RING_DESC)) {
            return -(EINVAL);
        }
    
        /* Threshold configuration */
        tx_free_thresh = tx_conf->tx_free_thresh;
        if (tx_free_thresh == 0)
            tx_free_thresh = (uint16_t)RTE_MIN(nb_desc / 4,
                        DEFAULT_TX_FREE_THRESH);
    
        tx_rs_thresh = tx_conf->tx_rs_thresh;
        if (tx_rs_thresh == 0)
            tx_rs_thresh = (uint16_t)RTE_MIN(tx_free_thresh,
                        DEFAULT_TX_RS_THRESH);
    
        if (tx_free_thresh >= (nb_desc - 3)) {
            RTE_LOG(ERR, PMD, "tx_free_thresh must be less than the "
                "number of TX descriptors minus 3. (tx_free_thresh=%u "
                "port=%d queue=%d)\n", (unsigned int)tx_free_thresh,
                    (int)dev->data->port_id, (int)queue_idx);
            return -(EINVAL);
        }
        if (tx_rs_thresh > tx_free_thresh) {
            RTE_LOG(ERR, PMD, "tx_rs_thresh must be less than or equal to "
                "tx_free_thresh. (tx_free_thresh=%u tx_rs_thresh=%u "
                "port=%d queue=%d)\n", (unsigned int)tx_free_thresh,
                (unsigned int)tx_rs_thresh, (int)dev->data->port_id,
                                (int)queue_idx);
            return -(EINVAL);
        }
    
        /*
         * If rs_bit_thresh is greater than 1, then TX WTHRESH should be
         * set to 0. If WTHRESH is greater than zero, the RS bit is ignored
         * by the NIC and all descriptors are written back after the NIC
         * accumulates WTHRESH descriptors.
         */
        if (tx_conf->tx_thresh.wthresh != 0 && tx_rs_thresh != 1) {
            RTE_LOG(ERR, PMD, "TX WTHRESH must be set to 0 if "
                "tx_rs_thresh is greater than 1. (tx_rs_thresh=%u "
                "port=%d queue=%d)\n", (unsigned int)tx_rs_thresh,
                    (int)dev->data->port_id, (int)queue_idx);
            return -(EINVAL);
        }
    
        /* If txq is not NULL, free the mbufs held in the old queue and the txq itself */
        /* Free memory prior to re-allocation if needed... */
        if (dev->data->tx_queues[queue_idx] != NULL) {
            em_tx_queue_release(dev->data->tx_queues[queue_idx]);
            dev->data->tx_queues[queue_idx] = NULL;
        }
    
        /* Reserve a memzone named rte_em_pmd_tx_ring_p_q (p = port, q = queue) to hold EM_MAX_RING_DESC TX descriptors */
        /*
         * Allocate TX ring hardware descriptors. A memzone large enough to
         * handle the maximum ring size is allocated in order to allow for
         * resizing in later calls to the queue setup function.
         */
        tsize = sizeof (txq->tx_ring[0]) * EM_MAX_RING_DESC;
        if ((tz = ring_dma_zone_reserve(dev, "tx_ring", queue_idx, tsize,
                socket_id)) == NULL)
            return (-ENOMEM);
    
        /* Memory allocation for the txq */
        /* Allocate the tx queue data structure. */
        if ((txq = rte_zmalloc("ethdev TX queue", sizeof(*txq),
                CACHE_LINE_SIZE)) == NULL)
            return (-ENOMEM);
    
        /* Memory allocation for the txq sw_ring */
        /* Allocate software ring */
        if ((txq->sw_ring = rte_zmalloc("txq->sw_ring",
                sizeof(txq->sw_ring[0]) * nb_desc,
                CACHE_LINE_SIZE)) == NULL) {
            em_tx_queue_release(txq);
            return (-ENOMEM);
        }
    
        txq->nb_tx_desc = nb_desc;
        txq->tx_free_thresh = tx_free_thresh;
        txq->tx_rs_thresh = tx_rs_thresh;
        txq->pthresh = tx_conf->tx_thresh.pthresh;
        txq->hthresh = tx_conf->tx_thresh.hthresh;
        txq->wthresh = tx_conf->tx_thresh.wthresh;
        txq->queue_id = queue_idx;
        txq->port_id = dev->data->port_id;
    
        txq->tdt_reg_addr = E1000_PCI_REG_ADDR(hw, E1000_TDT(queue_idx));
    
        /* Physical address of the tx_ring */
    #ifndef RTE_LIBRTE_XEN_DOM0
        txq->tx_ring_phys_addr = (uint64_t) tz->phys_addr;
    #else   
        txq->tx_ring_phys_addr = rte_mem_phy2mch(tz->memseg_id, tz->phys_addr);
    #endif
        /* Virtual address of the tx_ring */
        txq->tx_ring = (struct e1000_data_desc *) tz->addr;
    
        PMD_INIT_LOG(DEBUG, "sw_ring=%p hw_ring=%p dma_addr=0x%"PRIx64"\n",
            txq->sw_ring, txq->tx_ring, txq->tx_ring_phys_addr);
    
        /* Ring initialization: each entry's next points to the following entry, and the last wraps back to the first */
        em_reset_tx_queue(txq);
    
        dev->data->tx_queues[queue_idx] = txq;
        return (0);
    }
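
    To see what the threshold logic works out to in this example: l2fwd uses nb_txd = 512 descriptors and passes zeros in tx_conf, and the em driver defaults of this era are DEFAULT_TX_FREE_THRESH = 32 and DEFAULT_TX_RS_THRESH = 32 (both values are assumptions from memory):

    /* Worked example with the assumed defaults above. */
    uint16_t nb_desc        = 512;
    uint16_t tx_free_thresh = RTE_MIN(nb_desc / 4, 32);    /* -> 32 */
    uint16_t tx_rs_thresh   = RTE_MIN(tx_free_thresh, 32); /* -> 32 */
    /* Both sanity checks pass: 32 < 512 - 3, and 32 <= 32. Since
     * tx_rs_thresh > 1, tx_conf->tx_thresh.wthresh must be 0. */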


    The final step of port initialization is to enable RX and TX on the port. This mainly means advertising the tx ring and rx ring addresses to the E1000 hardware; we will not trace further into the details.

    void
    eth_em_tx_init(struct rte_eth_dev *dev)
    {
        struct e1000_hw     *hw;
        struct em_tx_queue *txq;
        uint32_t tctl;
        uint32_t txdctl;
        uint16_t i;
    
        hw = E1000_DEV_PRIVATE_TO_HW(dev->data->dev_private);
    
        /* Advertise the physical address of each queue's tx ring to the E1000 */
        /* Setup the Base and Length of the Tx Descriptor Rings. */
        for (i = 0; i < dev->data->nb_tx_queues; i++) {
            uint64_t bus_addr;
    
            txq = dev->data->tx_queues[i];
            bus_addr = txq->tx_ring_phys_addr;
            E1000_WRITE_REG(hw, E1000_TDLEN(i),
                    txq->nb_tx_desc *
                    sizeof(*txq->tx_ring));
            E1000_WRITE_REG(hw, E1000_TDBAH(i),
                    (uint32_t)(bus_addr >> 32));
            E1000_WRITE_REG(hw, E1000_TDBAL(i), (uint32_t)bus_addr);
    
            /* Setup the HW Tx Head and Tail descriptor pointers. */
            E1000_WRITE_REG(hw, E1000_TDT(i), 0);
            E1000_WRITE_REG(hw, E1000_TDH(i), 0);
    
            /* Setup Transmit threshold registers. */
            txdctl = E1000_READ_REG(hw, E1000_TXDCTL(i));
            /*
             * bit 22 is reserved, on some models should always be 0,
             * on others  - always 1.
             */
            txdctl &= E1000_TXDCTL_COUNT_DESC;
            txdctl |= txq->pthresh & 0x3F;
            txdctl |= (txq->hthresh & 0x3F) << 8;
            txdctl |= (txq->wthresh & 0x3F) << 16;
            txdctl |= E1000_TXDCTL_GRAN;
            E1000_WRITE_REG(hw, E1000_TXDCTL(i), txdctl);
        }
    
        /* Program the Transmit Control Register. */
        tctl = E1000_READ_REG(hw, E1000_TCTL);
        tctl &= ~E1000_TCTL_CT;
        tctl |= (E1000_TCTL_PSP | E1000_TCTL_RTLC | E1000_TCTL_EN |
             (E1000_COLLISION_THRESHOLD << E1000_CT_SHIFT));
    
        /* This write will effectively turn on the transmit unit. */
        E1000_WRITE_REG(hw, E1000_TCTL, tctl);
    }


    int
    eth_em_rx_init(struct rte_eth_dev *dev)
    {
        struct e1000_hw *hw;
        struct em_rx_queue *rxq;
        uint32_t rctl;
        uint32_t rfctl;
        uint32_t rxcsum;
        uint32_t rctl_bsize;
        uint16_t i;
        int ret;
    
        hw = E1000_DEV_PRIVATE_TO_HW(dev->data->dev_private);
    
        /*
         * Make sure receives are disabled while setting
         * up the descriptor ring.
         */
        rctl = E1000_READ_REG(hw, E1000_RCTL);
        E1000_WRITE_REG(hw, E1000_RCTL, rctl & ~E1000_RCTL_EN);
    
        rfctl = E1000_READ_REG(hw, E1000_RFCTL);
    
        /* Disable extended descriptor type. */
        rfctl &= ~E1000_RFCTL_EXTEN;
        /* Disable accelerated acknowledge */
        if (hw->mac.type == e1000_82574)
            rfctl |= E1000_RFCTL_ACK_DIS;
    
        E1000_WRITE_REG(hw, E1000_RFCTL, rfctl);
    
        /*
         * XXX TEMPORARY WORKAROUND: on some systems with 82573
         * long latencies are observed, like Lenovo X60. This
         * change eliminates the problem, but since having positive
         * values in RDTR is a known source of problems on other
         * platforms another solution is being sought.
         */
        if (hw->mac.type == e1000_82573)
            E1000_WRITE_REG(hw, E1000_RDTR, 0x20);
    
        dev->rx_pkt_burst = (eth_rx_burst_t)eth_em_recv_pkts;
    
        /* Compute the packet buffer size */
        /* Determine RX bufsize. */
        rctl_bsize = EM_MAX_BUF_SIZE;
        for (i = 0; i < dev->data->nb_rx_queues; i++) {
            struct rte_pktmbuf_pool_private *mbp_priv;
            uint32_t buf_size;
    
            rxq = dev->data->rx_queues[i];
            mbp_priv = rte_mempool_get_priv(rxq->mb_pool);
            buf_size = mbp_priv->mbuf_data_room_size - RTE_PKTMBUF_HEADROOM;
            rctl_bsize = RTE_MIN(rctl_bsize, buf_size);
        }
    
        rctl |= em_rctl_bsize(hw->mac.type, &rctl_bsize);
    
        /* Configure and enable each RX queue. */
        for (i = 0; i < dev->data->nb_rx_queues; i++) {
            uint64_t bus_addr;
            uint32_t rxdctl;
    
            rxq = dev->data->rx_queues[i];
    
            /* Allocate mbufs from the mbuf pool, fill them into rxq->sw_ring, and record each packet buffer's physical address in rxq->rx_ring */
            /* Allocate buffers for descriptor rings and setup queue */
            ret = em_alloc_rx_queue_mbufs(rxq);
            if (ret)
                return ret;
    
            /* Advertise the physical address of the rx ring to the E1000 */
    
            /*
             * Reset crc_len in case it was changed after queue setup by a
             *  call to configure
             */
            rxq->crc_len =
                (uint8_t)(dev->data->dev_conf.rxmode.hw_strip_crc ?
                    0 : ETHER_CRC_LEN);
    
            bus_addr = rxq->rx_ring_phys_addr;
            E1000_WRITE_REG(hw, E1000_RDLEN(i),
                    rxq->nb_rx_desc *
                    sizeof(*rxq->rx_ring));
            E1000_WRITE_REG(hw, E1000_RDBAH(i),
                    (uint32_t)(bus_addr >> 32));
            E1000_WRITE_REG(hw, E1000_RDBAL(i), (uint32_t)bus_addr);
    
            E1000_WRITE_REG(hw, E1000_RDH(i), 0);
            E1000_WRITE_REG(hw, E1000_RDT(i), rxq->nb_rx_desc - 1);
    
            rxdctl = E1000_READ_REG(hw, E1000_RXDCTL(0));
            rxdctl &= 0xFE000000;
            rxdctl |= rxq->pthresh & 0x3F;
            rxdctl |= (rxq->hthresh & 0x3F) << 8;
            rxdctl |= (rxq->wthresh & 0x3F) << 16;
            rxdctl |= E1000_RXDCTL_GRAN;
            E1000_WRITE_REG(hw, E1000_RXDCTL(i), rxdctl);
    
            /* RX function used for receiving large (scattered) packets */
            /*
             * Due to EM devices not having any sort of hardware
             * limit for packet length, jumbo frame of any size
             * can be accepted, thus we have to enable scattered
             * rx if jumbo frames are enabled (or if buffer size
             * is too small to accommodate non-jumbo packets)
             * to avoid splitting packets that don't fit into
             * one buffer.
             */
            if (dev->data->dev_conf.rxmode.jumbo_frame ||
                    rctl_bsize < ETHER_MAX_LEN) {
                dev->rx_pkt_burst =
                    (eth_rx_burst_t)eth_em_recv_scattered_pkts;
                dev->data->scattered_rx = 1;
            }
        }
    
        /* The remainder is omitted */
        ...
    
        return 0;
    }
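
    em_alloc_rx_queue_mbufs, referenced above, pre-populates the software and hardware rings (a sketch from the em PMD of this era; details may differ slightly by version):

    static int
    em_alloc_rx_queue_mbufs(struct em_rx_queue *rxq)
    {
        struct em_rx_entry *rxe = rxq->sw_ring;
        uint64_t dma_addr;
        unsigned i;

        /* Initialize the software ring: one mbuf per descriptor. */
        for (i = 0; i < rxq->nb_rx_desc; i++) {
            volatile struct e1000_rx_desc *rxd;
            struct rte_mbuf *mbuf = rte_rxmbuf_alloc(rxq->mb_pool);

            if (mbuf == NULL) {
                PMD_INIT_LOG(ERR, "RX mbuf alloc failed "
                    "queue_id=%hu\n", rxq->queue_id);
                return (-ENOMEM);
            }

            dma_addr = rte_cpu_to_le_64(RTE_MBUF_DATA_DMA_ADDR_DEFAULT(mbuf));

            /* Point the hardware descriptor at the fresh buffer. */
            rxd = &rxq->rx_ring[i];
            rxd->buffer_addr = dma_addr;
            rxe[i].mbuf = mbuf;
        }

        return 0;
    }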


    Port initialization is now complete and the port is started. Back in the main function, the packet-processing loop is launched on every lcore:

    /* launch per-lcore init on every lcore */
    rte_eal_mp_remote_launch(l2fwd_launch_one_lcore, NULL, CALL_MASTER);
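
    l2fwd_launch_one_lcore is only a thin per-lcore entry point (a sketch matching the l2fwd sample of this era):

    static int
    l2fwd_launch_one_lcore(__attribute__((unused)) void *dummy)
    {
        /* Every lcore, master included (CALL_MASTER), spins in the main loop. */
        l2fwd_main_loop();
        return 0;
    }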

    The lcore main-loop processing is as follows:


    /* main processing loop */
    static void
    l2fwd_main_loop(void)
    {
        struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
        struct rte_mbuf *m;
        unsigned lcore_id;
        uint64_t prev_tsc, diff_tsc, cur_tsc, timer_tsc;
        unsigned i, j, portid, nb_rx;
        struct lcore_queue_conf *qconf;
        const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) / US_PER_S * BURST_TX_DRAIN_US;
    
        prev_tsc = 0;
        timer_tsc = 0;
    
        lcore_id = rte_lcore_id();
        qconf = &lcore_queue_conf[lcore_id];
    
        if (qconf->n_rx_port == 0) {
            RTE_LOG(INFO, L2FWD, "lcore %u has nothing to do\n", lcore_id);
            return;
        }
    
        RTE_LOG(INFO, L2FWD, "entering main loop on lcore %u\n", lcore_id);
    
        /* Which ports (queues) the current lcore must handle */
        for (i = 0; i < qconf->n_rx_port; i++) {
    
            portid = qconf->rx_port_list[i];
            RTE_LOG(INFO, L2FWD, " -- lcoreid=%u portid=%u\n", lcore_id,
                portid);
        }
    
        while (1) {
    
            cur_tsc = rte_rdtsc();
    
            /*
             * TX burst queue drain
             */
            diff_tsc = cur_tsc - prev_tsc;
    
            /* Every drain interval, flush all pending TX packets and print statistics */
            if (unlikely(diff_tsc > drain_tsc)) {
    
                for (portid = 0; portid < RTE_MAX_ETHPORTS; portid++) {
                    /* No packets pending on this port */
                    if (qconf->tx_mbufs[portid].len == 0)
                        continue;
    
                    /* Call the device TX function and count the transmitted packets */
                    l2fwd_send_burst(&lcore_queue_conf[lcore_id],
                             qconf->tx_mbufs[portid].len,
                             (uint8_t) portid);
    
                    /* All packets pending on this port should now be sent, so reset len to 0 */
                    qconf->tx_mbufs[portid].len = 0;
                }
    
                /* if timer is enabled */
                if (timer_period > 0) {
    
                    /* advance the timer */
                    timer_tsc += diff_tsc;
    
                    /* if timer has reached its timeout */
                    if (unlikely(timer_tsc >= (uint64_t) timer_period)) {
    
                        /* do this only on master core */
                        if (lcore_id == rte_get_master_lcore()) {
                            print_stats();
                            /* reset the timer */
                            timer_tsc = 0;
                        }
                    }
                }
    
                prev_tsc = cur_tsc;
            }
    
            /* Queues the current lcore must poll */
            /*
             * Read packet from RX queues
             */
            for (i = 0; i < qconf->n_rx_port; i++) {
    
                portid = qconf->rx_port_list[i];
    
                /* Each port has only queue 0 */
                nb_rx = rte_eth_rx_burst((uint8_t) portid, 0,
                             pkts_burst, MAX_PKT_BURST);
    
                /* Update RX statistics */
                port_statistics[portid].rx += nb_rx;
    
                /* Rewrite the destination MAC of every received packet and append it to the TX queue */
                for (j = 0; j < nb_rx; j++) {
                    m = pkts_burst[j];
    
                    /* Prefetch the packet data into cache; the RX path appears to have prefetched it already */
                    rte_prefetch0(rte_pktmbuf_mtod(m, void *));
    
                    /* forward */
                    l2fwd_simple_forward(m, portid);
                }
            }
        }
    }
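
    l2fwd_simple_forward, called above but not listed, rewrites the Ethernet header and queues the mbuf on the paired port (a sketch matching the l2fwd sample of this era; l2fwd_send_packet is sketched further below):

    static void
    l2fwd_simple_forward(struct rte_mbuf *m, unsigned portid)
    {
        struct ether_hdr *eth;
        void *tmp;
        unsigned dst_port;

        dst_port = l2fwd_dst_ports[portid];
        eth = rte_pktmbuf_mtod(m, struct ether_hdr *);

        /* Destination MAC becomes 02:00:00:00:00:<dst_port>. */
        tmp = &eth->d_addr.addr_bytes[0];
        *((uint64_t *)tmp) = 0x000000000002 + ((uint64_t)dst_port << 40);

        /* Source MAC becomes the TX port's own address. */
        ether_addr_copy(&l2fwd_ports_eth_addr[dst_port], &eth->s_addr);

        /* Queue on the destination port; flushed in bursts. */
        l2fwd_send_packet(m, (uint8_t) dst_port);
    }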



    First, let's look at how packets are received; rte_eth_rx_burst dispatches to the device's rx_pkt_burst:


    static inline uint16_t
    rte_eth_rx_burst(uint8_t port_id, uint16_t queue_id,
             struct rte_mbuf **rx_pkts, uint16_t nb_pkts)
    {
        struct rte_eth_dev *dev;
    
        dev = &rte_eth_devices[port_id];
        return (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id], rx_pkts, nb_pkts);
    }


    The PMD RX function is as follows:

    uint16_t
    eth_em_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
            uint16_t nb_pkts)
    {
        /* volatile prevents compiler optimization: every access must re-read memory rather than reuse a register value */
        volatile struct e1000_rx_desc *rx_ring;
        volatile struct e1000_rx_desc *rxdp;
        struct em_rx_queue *rxq;
        struct em_rx_entry *sw_ring;
        struct em_rx_entry *rxe;
        struct rte_mbuf *rxm;
        struct rte_mbuf *nmb;
        struct e1000_rx_desc rxd;
        uint64_t dma_addr;
        uint16_t pkt_len;
        uint16_t rx_id;
        uint16_t nb_rx;
        uint16_t nb_hold;
        uint8_t status;
    
        rxq = rx_queue;
    
        nb_rx = 0;
        nb_hold = 0;
        rx_id = rxq->rx_tail;       /* current RX position */
        rx_ring = rxq->rx_ring;     /* RX descriptor ring */
        sw_ring = rxq->sw_ring;     /* mbuf ring */
    
        /* Receive up to 32 packets in one call */
        while (nb_rx < nb_pkts) {
            /*
             * The order of operations here is important as the DD status
             * bit must not be read after any other descriptor fields.
             * rx_ring and rxdp are pointing to volatile data so the order
             * of accesses cannot be reordered by the compiler. If they were
             * not volatile, they could be reordered which could lead to
             * using invalid descriptor fields when read from rxd.
             */
            
            /* Descriptor of the current packet */
            rxdp = &rx_ring[rx_id];
    
            /* Done flag; must be read first */
            status = rxdp->status;
            if (! (status & E1000_RXD_STAT_DD))
                break;
    
            /* Make a local copy */
            rxd = *rxdp;
    
            /*
             * End of packet.
             *
             * If the E1000_RXD_STAT_EOP flag is not set, the RX packet is
             * likely to be invalid and to be dropped by the various
             * validation checks performed by the network stack.
             *
             * Allocate a new mbuf to replenish the RX ring descriptor.
             * If the allocation fails:
             *    - arrange for that RX descriptor to be the first one
             *      being parsed the next time the receive function is
             *      invoked [on the same queue].
             *
             *    - Stop parsing the RX ring and return immediately.
             *
             * This policy does not drop the packet received in the RX
             * descriptor for which the allocation of a new mbuf failed.
             * Thus, it allows that packet to be later retrieved if
             * mbuf have been freed in the mean time.
             * As a side effect, holding RX descriptors instead of
             * systematically giving them back to the NIC may lead to
             * RX ring exhaustion situations.
             * However, the NIC can gracefully prevent such situations
             * to happen by sending specific "back-pressure" flow control
             * frames to its peer(s).
             */
            PMD_RX_LOG(DEBUG, "\nport_id=%u queue_id=%u rx_id=%u "
                "status=0x%x pkt_len=%u\n",
                (unsigned) rxq->port_id, (unsigned) rxq->queue_id,
                (unsigned) rx_id, (unsigned) status,
                (unsigned) rte_le_to_cpu_16(rxd.length));
    
            /* Allocate a new mbuf to give back to the driver */
            nmb = rte_rxmbuf_alloc(rxq->mb_pool);
            if (nmb == NULL) {
                PMD_RX_LOG(DEBUG, "RX mbuf alloc failed port_id=%u "
                    "queue_id=%u\n",
                    (unsigned) rxq->port_id,
                    (unsigned) rxq->queue_id);
                rte_eth_devices[rxq->port_id].data->rx_mbuf_alloc_failed++;
                break;
            }
    
            /* This descriptor is now held by upper-layer software */
            nb_hold++;
    
            /* The mbuf just received */
            rxe = &sw_ring[rx_id];
    
            /* Advance the RX position, wrapping at the end of the ring */
            rx_id++;
            if (rx_id == rxq->nb_rx_desc)
                rx_id = 0;
    
            /* Prefetch the mbuf into cache for the next iteration */
            /* Prefetch next mbuf while processing current one. */
            rte_em_prefetch(sw_ring[rx_id].mbuf);
    
            /* Prefetch the next descriptor and mbuf pointers for upcoming iterations */
            /* One cache line (64 bytes) holds 4 descriptors */
            /*
             * When next RX descriptor is on a cache-line boundary,
             * prefetch the next 4 RX descriptors and the next 8 pointers
             * to mbufs.
             */
            if ((rx_id & 0x3) == 0) {
                rte_em_prefetch(&rx_ring[rx_id]);
                rte_em_prefetch(&sw_ring[rx_id]);
            }
    
            /* Rearm RXD: attach new mbuf and reset status to zero. */
    
            /* Replace the mbuf pointer in the sw_ring entry */
            rxm = rxe->mbuf;
            rxe->mbuf = nmb;
            dma_addr =
                rte_cpu_to_le_64(RTE_MBUF_DATA_DMA_ADDR_DEFAULT(nmb));
            rxdp->buffer_addr = dma_addr;
    
            /* Reset the status of the current descriptor */
            rxdp->status = 0;
    
            /*
             * Initialize the returned mbuf.
             * 1) setup generic mbuf fields:
             *    - number of segments,
             *    - next segment,
             *    - packet length,
             *    - RX port identifier.
             * 2) integrate hardware offload data, if any:
             *    - RSS flag & hash,
             *    - IP checksum flag,
             *    - VLAN TCI, if any,
             *    - error flags.
             */
            pkt_len = (uint16_t) (rte_le_to_cpu_16(rxd.length) -
                    rxq->crc_len);
            rxm->pkt.data = (char*) rxm->buf_addr + RTE_PKTMBUF_HEADROOM;
            rte_packet_prefetch(rxm->pkt.data);
            rxm->pkt.nb_segs = 1;
            rxm->pkt.next = NULL;
            rxm->pkt.pkt_len = pkt_len;
            rxm->pkt.data_len = pkt_len;
            rxm->pkt.in_port = rxq->port_id;
    
            rxm->ol_flags = rx_desc_status_to_pkt_flags(status);
            rxm->ol_flags = (uint16_t)(rxm->ol_flags |
                    rx_desc_error_to_pkt_flags(rxd.errors));
    
            /* Only valid if PKT_RX_VLAN_PKT set in pkt_flags */
            rxm->pkt.vlan_macip.f.vlan_tci = rte_le_to_cpu_16(rxd.special);
    
            /* Return the received mbuf to the user */
            /*
             * Store the mbuf address into the next entry of the array
             * of returned packets.
             */
            rx_pkts[nb_rx++] = rxm;
        }
    
        /* Update the RX position */
        rxq->rx_tail = rx_id;
    
        /* Update the count of descriptors held by software */
        /*
         * If the number of free RX descriptors is greater than the RX free
         * threshold of the queue, advance the Receive Descriptor Tail (RDT)
         * register.
         * Update the RDT with the value of the last processed RX descriptor
         * minus 1, to guarantee that the RDT register is never equal to the
         * RDH register, which creates a "full" ring situation from the
         * hardware point of view...
         */
        nb_hold = (uint16_t) (nb_hold + rxq->nb_rx_hold);
        if (nb_hold > rxq->rx_free_thresh) {
            PMD_RX_LOG(DEBUG, "port_id=%u queue_id=%u rx_tail=%u "
                "nb_hold=%u nb_rx=%u\n",
                (unsigned) rxq->port_id, (unsigned) rxq->queue_id,
                (unsigned) rx_id, (unsigned) nb_hold,
                (unsigned) nb_rx);
            rx_id = (uint16_t) ((rx_id == 0) ?
                (rxq->nb_rx_desc - 1) : (rx_id - 1));
            E1000_PCI_REG_WRITE(rxq->rdt_reg_addr, rx_id);
            nb_hold = 0;
        }
        rxq->nb_rx_hold = nb_hold;
        return (nb_rx);
    }
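
    The mbuf refill in the loop above goes through rte_rxmbuf_alloc, a thin wrapper over raw mempool allocation (a sketch from the em PMD of this era; the exact sanity-check macro may differ by version):

    static inline struct rte_mbuf *
    rte_rxmbuf_alloc(struct rte_mempool *mp)
    {
        struct rte_mbuf *m;

        /* Raw allocation: packet fields are not reset here; the RX loop
         * fills them in once the descriptor completes. */
        m = __rte_mbuf_raw_alloc(mp);
        __rte_mbuf_sanity_check_raw(m, RTE_MBUF_PKT, 0);
        return (m);
    }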



    The TX function:

    static inline uint16_t
    rte_eth_tx_burst(uint8_t port_id, uint16_t queue_id,
             struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
    {
        struct rte_eth_dev *dev;
    
        dev = &rte_eth_devices[port_id];
        return (*dev->tx_pkt_burst)(dev->data->tx_queues[queue_id], tx_pkts, nb_pkts);
    }
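
    For context, l2fwd reaches this wrapper through two small helpers that batch mbufs per port before bursting them out (a sketch matching the l2fwd sample of this era; MAX_PKT_BURST is 32):

    /* Hand a table of mbufs to rte_eth_tx_burst; drop whatever the
     * TX ring could not accept and account it as dropped. */
    static int
    l2fwd_send_burst(struct lcore_queue_conf *qconf, unsigned n, uint8_t port)
    {
        struct rte_mbuf **m_table;
        unsigned ret;

        m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table;

        ret = rte_eth_tx_burst(port, 0, m_table, (uint16_t) n);
        port_statistics[port].tx += ret;
        if (unlikely(ret < n)) {
            port_statistics[port].dropped += (n - ret);
            do {
                rte_pktmbuf_free(m_table[ret]);
            } while (++ret < n);
        }
        return 0;
    }

    /* Queue an mbuf for TX on a port; flush once a full burst accumulates. */
    static int
    l2fwd_send_packet(struct rte_mbuf *m, uint8_t port)
    {
        unsigned lcore_id, len;
        struct lcore_queue_conf *qconf;

        lcore_id = rte_lcore_id();
        qconf = &lcore_queue_conf[lcore_id];
        len = qconf->tx_mbufs[port].len;
        qconf->tx_mbufs[port].m_table[len] = m;
        len++;

        /* Enough packets accumulated: send the burst now. */
        if (unlikely(len == MAX_PKT_BURST)) {
            l2fwd_send_burst(qconf, MAX_PKT_BURST, port);
            len = 0;
        }

        qconf->tx_mbufs[port].len = len;
        return 0;
    }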


    It dispatches to the PMD's TX function:


    uint16_t
    eth_em_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts,
            uint16_t nb_pkts)
    {
        struct em_tx_queue *txq;
        struct em_tx_entry *sw_ring;
        struct em_tx_entry *txe, *txn;
        volatile struct e1000_data_desc *txr;
        volatile struct e1000_data_desc *txd;
        struct rte_mbuf     *tx_pkt;
        struct rte_mbuf     *m_seg;
        uint64_t buf_dma_addr;
        uint32_t popts_spec;
        uint32_t cmd_type_len;
        uint16_t slen;
        uint16_t ol_flags;
        uint16_t tx_id;
        uint16_t tx_last;
        uint16_t nb_tx;
        uint16_t nb_used;
        uint16_t tx_ol_req;
        uint32_t ctx;
        uint32_t new_ctx;
        union rte_vlan_macip hdrlen;
    
        txq = tx_queue;
        sw_ring = txq->sw_ring;
        txr     = txq->tx_ring;
        /* TX position */
        tx_id   = txq->tx_tail;
        /* Old, already-transmitted mbufs are reclaimed first; then the new mbufs to send are written in */
        txe = &sw_ring[tx_id];
    
        /* If too few TX descriptors are free, run a cleanup */
        /* Determine if the descriptor ring needs to be cleaned. */
        if ((txq->nb_tx_desc - txq->nb_tx_free) > txq->tx_free_thresh) {
            em_xmit_cleanup(txq);
        }
    
        /* nb_pkts is the total number of packets to send (32) */
        /* TX loop */
        for (nb_tx = 0; nb_tx < nb_pkts; nb_tx++) {
            new_ctx = 0;
    
            /* Pointer to the mbuf to transmit */
            tx_pkt = *tx_pkts++;
    
            /* Prefetch into L1/L2 cache, in preparation for freeing the mbuf */
            RTE_MBUF_PREFETCH_TO_FREE(txe->mbuf);
    
            /*
             * Determine how many (if any) context descriptors
             * are needed for offload functionality.
             */
            ol_flags = tx_pkt->ol_flags;
    
            /* If hardware offload required */
            tx_ol_req = (uint16_t)(ol_flags & (PKT_TX_IP_CKSUM |
                                PKT_TX_L4_MASK));
            if (tx_ol_req) {
                hdrlen = tx_pkt->pkt.vlan_macip;
                /* Check whether a new context descriptor is needed */
                /* If new context to be built or reuse the exist ctx. */
                ctx = what_ctx_update(txq, tx_ol_req, hdrlen);
    
                /* Only allocate context descriptor if required*/
                new_ctx = (ctx == EM_CTX_NUM);
            }
    
            /* Descriptors needed = number of packet segments, plus one if a context descriptor is required */
            /*
             * Keep track of how many descriptors are used this loop
             * This will always be the number of segments + the number of
             * Context descriptors required to transmit the packet
             */
            nb_used = (uint16_t)(tx_pkt->pkt.nb_segs + new_ctx);
    
            /* Last position used; allocation starts at tx_id, hence the -1 */
            /* 
             * The number of descriptors that must be allocated for a
             * packet is the number of segments of that packet, plus 1
             * Context Descriptor for the hardware offload, if any.
             * Determine the last TX descriptor to allocate in the TX ring
             * for the packet, starting from the current position (tx_id)
             * in the ring.
             */
            tx_last = (uint16_t) (tx_id + nb_used - 1);
    
            /* Wrap around */
            /* Circular ring */
            if (tx_last >= txq->nb_tx_desc)
                tx_last = (uint16_t) (tx_last - txq->nb_tx_desc);
    
            PMD_TX_LOG(DEBUG, "port_id=%u queue_id=%u pktlen=%u"
                " tx_first=%u tx_last=%u\n",
                (unsigned) txq->port_id,
                (unsigned) txq->queue_id,
                (unsigned) tx_pkt->pkt.pkt_len,
                (unsigned) tx_id,
                (unsigned) tx_last);
    
            /*
             * Make sure there are enough TX descriptors available to
             * transmit the entire packet.
             * nb_used better be less than or equal to txq->tx_rs_thresh
             */
            while (unlikely (nb_used > txq->nb_tx_free)) {
                PMD_TX_FREE_LOG(DEBUG,
                        "Not enough free TX descriptors "
                        "nb_used=%4u nb_free=%4u "
                        "(port=%d queue=%d)",
                        nb_used, txq->nb_tx_free,
                        txq->port_id, txq->queue_id);
    
                if (em_xmit_cleanup(txq) != 0) {
                    /* Could not clean any descriptors */
                    if (nb_tx == 0)
                        return (0);
                    goto end_of_tx;
                }
            }
    
            /*
             * By now there are enough free TX descriptors to transmit
             * the packet.
             */
    
            /*
             * Set common flags of all TX Data Descriptors.
             *
             * The following bits must be set in all Data Descriptors:
             *    - E1000_TXD_DTYP_DATA
             *    - E1000_TXD_DTYP_DEXT
             *
             * The following bits must be set in the first Data Descriptor
             * and are ignored in the other ones:
             *    - E1000_TXD_POPTS_IXSM
             *    - E1000_TXD_POPTS_TXSM
             *
             * The following bits must be set in the last Data Descriptor
             * and are ignored in the other ones:
             *    - E1000_TXD_CMD_VLE
             *    - E1000_TXD_CMD_IFCS
             *
             * The following bits must only be set in the last Data
             * Descriptor:
             *   - E1000_TXD_CMD_EOP
             *
             * The following bits can be set in any Data Descriptor, but
             * are only set in the last Data Descriptor:
             *   - E1000_TXD_CMD_RS
             */
            cmd_type_len = E1000_TXD_CMD_DEXT | E1000_TXD_DTYP_D |
                E1000_TXD_CMD_IFCS;
            popts_spec = 0;
    
            /* Set VLAN Tag offload fields. */
            if (ol_flags & PKT_TX_VLAN_PKT) {
                cmd_type_len |= E1000_TXD_CMD_VLE;
                popts_spec = tx_pkt->pkt.vlan_macip.f.vlan_tci <<
                    E1000_TXD_VLAN_SHIFT;
            }
    
            if (tx_ol_req) {
                /*
                 * Setup the TX Context Descriptor if required
                 */
                if (new_ctx) {
                    volatile struct e1000_context_desc *ctx_txd;
    
                    /* If a context descriptor is needed, the TX descriptor at tx_id holds the context */
                    ctx_txd = (volatile struct e1000_context_desc *)
                        &txr[tx_id];
    
                    /* The next TX descriptor */
                    txn = &sw_ring[txe->next_id];
                    RTE_MBUF_PREFETCH_TO_FREE(txn->mbuf);
    
                    if (txe->mbuf != NULL) {
                        rte_pktmbuf_free_seg(txe->mbuf);
                        txe->mbuf = NULL;
                    }
    
                    /* Store the ctx values into the txq */
                    em_set_xmit_ctx(txq, ctx_txd, tx_ol_req,
                        hdrlen);
    
                    txe->last_id = tx_last;
    
                    /* Advance both tx_id and txe to the next entry */
                    tx_id = txe->next_id;
                    txe = txn;
                }
    
                /*
                 * Setup the TX Data Descriptor,
                 * This path will go through
                 * whatever new/reuse the context descriptor
                 */
                popts_spec |= tx_desc_cksum_flags_to_upper(ol_flags);
            }
    
            m_seg = tx_pkt;
            do {
                txd = &txr[tx_id];
                txn = &sw_ring[txe->next_id];
    
                /* Reclaim the transmitted mbuf; its buffer address is already in the TX descriptor, so the mbuf is no longer needed */
                if (txe->mbuf != NULL)
                    rte_pktmbuf_free_seg(txe->mbuf);
    
                /* Attach the current mbuf to txe */
                txe->mbuf = m_seg;
    
                /*
                 * Set up Transmit Data Descriptor.
                 */
                slen = m_seg->pkt.data_len;
                buf_dma_addr = RTE_MBUF_DATA_DMA_ADDR(m_seg);
    
                txd->buffer_addr = rte_cpu_to_le_64(buf_dma_addr);
                txd->lower.data = rte_cpu_to_le_32(cmd_type_len | slen);
                txd->upper.data = rte_cpu_to_le_32(popts_spec);
    
                txe->last_id = tx_last;
    
                /* Advance tx_id */
                tx_id = txe->next_id;
                txe = txn;
                m_seg = m_seg->pkt.next;
            } while (m_seg != NULL);
    
            /* Driver-specific flags such as VLAN and IP checksum are skipped here */
            /*
             * The last packet data descriptor needs End Of Packet (EOP)
             */
            cmd_type_len |= E1000_TXD_CMD_EOP;
            txq->nb_tx_used = (uint16_t)(txq->nb_tx_used + nb_used);
            txq->nb_tx_free = (uint16_t)(txq->nb_tx_free - nb_used);
    
            /* Set RS bit only on threshold packets' last descriptor */
            if (txq->nb_tx_used >= txq->tx_rs_thresh) {
                PMD_TX_FREE_LOG(DEBUG,
                        "Setting RS bit on TXD id="
                        "%4u (port=%d queue=%d)",
                        tx_last, txq->port_id, txq->queue_id);
    
                cmd_type_len |= E1000_TXD_CMD_RS;
    
                /* Update txq RS bit counters */
                txq->nb_tx_used = 0;
            }
            txd->lower.data |= rte_cpu_to_le_32(cmd_type_len);
        }
    end_of_tx:
        rte_wmb();
    
        /* Notify the hardware that packets are queued for transmission */
        /*
         * Set the Transmit Descriptor Tail (TDT)
         */
        PMD_TX_LOG(DEBUG, "port_id=%u queue_id=%u tx_tail=%u nb_tx=%u",
            (unsigned) txq->port_id, (unsigned) txq->queue_id,
            (unsigned) tx_id, (unsigned) nb_tx);
        E1000_PCI_REG_WRITE(txq->tdt_reg_addr, tx_id);
    
        /* Update the TX queue position */
        txq->tx_tail = tx_id;
    
        return (nb_tx);
    }
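
    Finally, em_xmit_cleanup, called twice above, reclaims descriptors the hardware has finished with by checking the DD bit written back at an RS boundary (a rough sketch from memory of the em PMD of this era; details abridged):

    static inline int
    em_xmit_cleanup(struct em_tx_queue *txq)
    {
        struct em_tx_entry *sw_ring = txq->sw_ring;
        volatile struct e1000_data_desc *txr = txq->tx_ring;
        uint16_t last_desc_cleaned = txq->last_desc_cleaned;
        uint16_t desc_to_clean_to;
        uint16_t nb_tx_to_clean;

        /* The next RS boundary past the last cleaned descriptor. */
        desc_to_clean_to = (uint16_t)(last_desc_cleaned + txq->tx_rs_thresh);
        if (desc_to_clean_to >= txq->nb_tx_desc)
            desc_to_clean_to = (uint16_t)(desc_to_clean_to - txq->nb_tx_desc);

        /* Clean up to the last descriptor of the packet ending there. */
        desc_to_clean_to = sw_ring[desc_to_clean_to].last_id;
        if (!(txr[desc_to_clean_to].upper.fields.status & E1000_TXD_STAT_DD))
            return -(1);    /* hardware not done yet, nothing reclaimed */

        if (last_desc_cleaned > desc_to_clean_to)
            nb_tx_to_clean = (uint16_t)((txq->nb_tx_desc -
                last_desc_cleaned) + desc_to_clean_to);
        else
            nb_tx_to_clean = (uint16_t)(desc_to_clean_to -
                last_desc_cleaned);

        /* Clearing the DD bit marks the slot reusable. */
        txr[desc_to_clean_to].upper.fields.status = 0;

        txq->last_desc_cleaned = desc_to_clean_to;
        txq->nb_tx_free = (uint16_t)(txq->nb_tx_free + nb_tx_to_clean);
        return (0);
    }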

