zoukankan      html  css  js  c++  java
  • 聊一聊无锁队列rte_ring 转载

    之前用基于dpdk 实现小包快速转发的时候有用到无锁队列!今天就来看看吧!(后续完成了去dpdk化,直接在内核完成快速转发功能)

     dpdk的无锁队列ring是借鉴了linux内核kfifo无锁队列。ring的实质是FIFO的环形队列。

    • 先进先出(FIFO)
    • 最大大小固定,指针存储在表中
    • 无锁实现
    • 多消费者或单消费者出队操作
    • 多生产者或单生产者入队操作
    • 批量出队 - 如果成功,将指定数量的元素出队,否则什么也不做
    • 批量入队 - 如果成功,将指定数量的元素入队,否则什么也不做
    • 突发出队 - 如果指定的数目出队失败,则将最大可用数目对象出队
    • 突发入队 - 如果指定的数目入队失败,则将最大可入队数目对象入队

    相比于链表,这个数据结构的优点如下:

    • 更快;只需要一个sizeof(void *)的Compare-And-Swap指令,而不是多个双重比较和交换指令
    • 与完全无锁队列像是
    • 适应批量入队/出队操作。 因为指针是存储在表中的,应i多个对象的出队将不会产生于链表队列中一样多的cache miss。 此外,批量出队成本并不比单个对象出队高。

    缺点:

    • 大小固定
    • 大量ring相比于链表,消耗更多的内存,空ring至少包含n个指针。
    /* structure to hold a pair of head/tail values and other metadata */
    struct rte_ring_headtail {
        // 生产者头尾指针,生产完成后都指向队尾
         // 消费者头尾指针,生产完成后都指向队头
        volatile uint32_t head;  /**< Prod/consumer head.预生产到地方/预出队的地方 */
        volatile uint32_t tail;  /**< Prod/consumer tail. 实际生产了的数量 /实际出队的地方 */
        uint32_t single;         /**< True if single prod/cons */
    };
    struct rte_ring {
        /*
         * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
         * compatibility requirements, it could be changed to RTE_RING_NAMESIZE
         * next time the ABI changes
         */
        char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; /**< Name of the ring. */
        int flags;               /**< Flags supplied at creation. */
        const struct rte_memzone *memzone;
                /**< Memzone, if any, containing the rte_ring */
        uint32_t size;           /**< Size of ring. */
        uint32_t mask;           /**< Mask (size-1) of ring. */
        uint32_t capacity;       /**< Usable size of ring */
    
        char pad0 __rte_cache_aligned; /**< empty cache line */
    
        /** Ring producer status. */
        struct rte_ring_headtail prod __rte_cache_aligned;
        char pad1 __rte_cache_aligned; /**< empty cache line */
    
        /** Ring consumer status. */
        struct rte_ring_headtail cons __rte_cache_aligned;
        char pad2 __rte_cache_aligned; /**< empty cache line */
    };

    入队列:

     

     

     http://reader.epubee.com/books/mobile/54/54aa973816d258a932e39464018932ee/text00032.html  以上来自~~~~~~~~~~~~~~

    static __rte_always_inline unsigned int
    __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
             unsigned int n, enum rte_ring_queue_behavior behavior,
             unsigned int is_sp, unsigned int *free_space)
    {
        uint32_t prod_head, prod_next;
        uint32_t free_entries;
    
        n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
                &prod_head, &prod_next, &free_entries);
        if (n == 0)
            goto end;
    //prod_head是旧的r->prod.head
    //r经过__rte_ring_move_prod_head处理后,r->prod.head已经移动到想要的位置&r[1]是数据的位置
        ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
    
        update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
    end:
        if (free_space != NULL)
            *free_space = free_entries - n;
        return n;
    }
    
    
    
    static __rte_always_inline unsigned int
    __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
            unsigned int n, enum rte_ring_queue_behavior behavior,
            uint32_t *old_head, uint32_t *new_head,
            uint32_t *free_entries)
    {
        const uint32_t capacity = r->capacity;
        unsigned int max = n;
        int success;
    
        do {
            /* Reset n to the initial burst count */
            n = max;
    
            *old_head = r->prod.head;
    
            /* add rmb barrier to avoid load/load reorder in weak
             * memory model. It is noop on x86
             */
            rte_smp_rmb();
    
            /*
             *  The subtraction is done between two unsigned 32bits value
             * (the result is always modulo 32 bits even if we have
             * *old_head > cons_tail). So 'free_entries' is always between 0
             * and capacity (which is < size).
              计算当前可用容量,
               cons.tail是小于等于prod.head, 所以r->cons.tail - *old_head得到一个
               负数,capacity减这个差值就得到剩余的容量 
             */
            *free_entries = (capacity + r->cons.tail - *old_head);
    
            /* check that we have enough room in ring */
            if (unlikely(n > *free_entries))
                n = (behavior == RTE_RING_QUEUE_FIXED) ?
                        0 : *free_entries;
    
            if (n == 0)
                return 0;
    
            *new_head = *old_head + n; /* 新头的位置 */
            if (is_sp)  {/* 如果是单生产者,直接更新r->prod.head即可,不需要加锁 */
                r->prod.head = *new_head, success = 1;
            }else{
                /* 如果是多生产者,需要使用cmpset比较,如果&r->prod.head == *old_head
               则&r->prod.head = *new_head
               否则重新循环,获取新的*old_head = r->prod.head,知道成功位置*/
                success = rte_atomic32_cmpset(&r->prod.head, *old_head, *new_head);
            }
        } while (unlikely(success == 0));
        return n;
    }

    出队:

    原理逻辑和入队一样 代码也比较相似,不具体分析

    static __rte_always_inline unsigned int
    __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
             unsigned int n, enum rte_ring_queue_behavior behavior,
             unsigned int is_sc, unsigned int *available)
    {
        uint32_t cons_head, cons_next;
        uint32_t entries;
    
        n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
                &cons_head, &cons_next, &entries);
        if (n == 0)
            goto end;
    
        DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
    
        update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
    
    end:
        if (available != NULL)
            *available = entries - n;
        return n;
    }
    
    static __rte_always_inline unsigned int
    __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
            unsigned int n, enum rte_ring_queue_behavior behavior,
            uint32_t *old_head, uint32_t *new_head,
            uint32_t *entries)
    {
        unsigned int max = n;
        int success;
    
        /* move cons.head atomically */
        do {
            /* Restore n as it may change every loop */
            n = max;
    
            *old_head = r->cons.head;
    
            /* add rmb barrier to avoid load/load reorder in weak
             * memory model. It is noop on x86
             */
            rte_smp_rmb();
    
            /* The subtraction is done between two unsigned 32bits value
             * (the result is always modulo 32 bits even if we have
             * cons_head > prod_tail). So 'entries' is always between 0
             * and size(ring)-1.
             */
            *entries = (r->prod.tail - *old_head);
    
            /* Set the actual entries for dequeue */
            if (n > *entries)
                n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
    
            if (unlikely(n == 0))
                return 0;
    
            *new_head = *old_head + n;
            if (is_sc)
                r->cons.head = *new_head, success = 1;
            else
                success = rte_atomic32_cmpset(&r->cons.head, *old_head,
                        *new_head);
        } while (unlikely(success == 0));
        return n;
    }

     

  • 相关阅读:
    Day 39 管道 、数据共享与地址池
    Day 38 Semaphore ,Event ,队列
    Day37 多进程
    Day 36 网络编程-计算机的发展
    Day 35 验证客户端的合法性+socketserver
    Day 34 黏包
    Day 33 Socket编程.
    Day 32 网络编程
    Day 31 面向对象考试题 第四次考试.
    Day 30 面向对象的考试题
  • 原文地址:https://www.cnblogs.com/codestack/p/13069310.html
Copyright © 2011-2022 走看看