zoukankan      html  css  js  c++  java
  • dpdk ring 多/单生产者、多/单消费者

    #define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
    #define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */

    /*
     * Copy n objects out of the ring's slot array into obj_table.
     *
     *   r          ring descriptor; only (r)->mask and (r)->size are read
     *   ring_start base address of the slot array, cast to obj_type *
     *   cons_head  consumer head index; masked with (r)->mask to find the
     *              first slot to read
     *   obj_table  destination array, written at indices [0, n)
     *   n          number of objects to copy
     *   obj_type   element type of both the slots and obj_table
     *
     * If the run ends before the last slot, the copy is unrolled four at a
     * time with a fallthrough switch for the remaining 0-3 elements.
     * Otherwise the copy is split in two at the wrap point.
     *
     * Every line of the body must end in a backslash so that it stays part of
     * the macro. Arguments are expanded more than once and must therefore be
     * free of side effects. The expansion uses likely(), which has to be
     * defined wherever the macro is used.
     */
    #define DEQUEUE_PTRS(r, ring_start, cons_head, obj_table, n, obj_type) do { \
            unsigned int i; \
            uint32_t idx = (cons_head) & (r)->mask; \
            const uint32_t size = (r)->size; \
            obj_type *ring = (obj_type *)(ring_start); \
            if (likely(idx + (n) < size)) { \
                    /* contiguous run: copy in groups of four */ \
                    for (i = 0; i < ((n) & (~(unsigned)0x3)); i += 4, idx += 4) { \
                            (obj_table)[i] = ring[idx]; \
                            (obj_table)[i + 1] = ring[idx + 1]; \
                            (obj_table)[i + 2] = ring[idx + 2]; \
                            (obj_table)[i + 3] = ring[idx + 3]; \
                    } \
                    /* copy the 0-3 leftover elements */ \
                    switch ((n) & 0x3) { \
                    case 3: \
                            (obj_table)[i++] = ring[idx++]; /* fallthrough */ \
                    case 2: \
                            (obj_table)[i++] = ring[idx++]; /* fallthrough */ \
                    case 1: \
                            (obj_table)[i++] = ring[idx++]; \
                    } \
            } else { \
                    /* run reaches the end of the array: copy up to size ... */ \
                    for (i = 0; idx < size; i++, idx++) \
                            (obj_table)[i] = ring[idx]; \
                    /* ... then continue from slot 0 */ \
                    for (idx = 0; i < (n); i++, idx++) \
                            (obj_table)[i] = ring[idx]; \
            } \
    } while (0)

      rte_ring_create   

          rte_ring_init(r, name, requested_count, flags);

                    

                r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;  //默认是mp
                r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;//默认是mc

    /**
     * Burst enqueue that always runs in single-producer mode.
     *
     * Forwards to __rte_ring_do_enqueue() with RTE_RING_QUEUE_VARIABLE
     * behavior and __IS_SP passed explicitly; r->prod.single is not read here.
     *
     * @param r          ring, passed through
     * @param obj_table  table of object pointers, passed through
     * @param n          number of objects requested, passed through
     * @param free_space passed through unchanged
     * @return the value returned by __rte_ring_do_enqueue()
     */
    static __rte_always_inline unsigned
    rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
                             unsigned int n, unsigned int *free_space)
    {
            const unsigned int nb_done = __rte_ring_do_enqueue(r, obj_table, n,
                            RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);

            return nb_done;
    }
    /**
     * Burst enqueue that always runs in multi-producer mode.
     *
     * Forwards to __rte_ring_do_enqueue() with RTE_RING_QUEUE_VARIABLE
     * behavior and __IS_MP passed explicitly; r->prod.single is not read here.
     *
     * @param r          ring, passed through
     * @param obj_table  table of object pointers, passed through
     * @param n          number of objects requested, passed through
     * @param free_space passed through unchanged
     * @return the value returned by __rte_ring_do_enqueue()
     */
    static __rte_always_inline unsigned
    rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
                             unsigned int n, unsigned int *free_space)
    {
            const unsigned int nb_done = __rte_ring_do_enqueue(r, obj_table, n,
                            RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);

            return nb_done;
    }
    rte_ring_init
        r->flags = flags;
        r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
        r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
    rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
                          unsigned int n, unsigned int *free_space)
    {
            return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
                            r->prod.single, free_space);
    }
    rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
                    unsigned int n, unsigned int *available)
    {
            return __rte_ring_do_dequeue(r, obj_table, n,
                                    RTE_RING_QUEUE_VARIABLE,
                                    r->cons.single, available);
    }

    flags(例如 RING_F_SP_ENQ)实际上影响的并不是队列本身的性质,而是调用队列函数 __rte_ring_do_enqueue 时传入的参数(即 r->prod.single)。

    /**
     * Advance the producer head to claim slots for an enqueue.
     *
     * Each iteration reads r->prod.head, computes the free room from
     * r->capacity and r->cons.tail, limits n according to behavior, and then
     * tries to publish the new head. With is_sp set the head is stored
     * directly; otherwise rte_atomic32_cmpset() is used and the whole
     * iteration is retried when it reports failure.
     *
     * @param r            ring whose prod.head is moved
     * @param is_sp        non-zero: plain store; zero: compare-and-set with retry
     * @param n            number of slots requested
     * @param behavior     RTE_RING_QUEUE_FIXED: all-or-nothing; any other
     *                     value: n is reduced to the free room
     * @param old_head     out: producer head observed by the last iteration
     * @param new_head     out: *old_head + returned n (not written when 0 is
     *                     returned)
     * @param free_entries out: free room computed by the last iteration
     * @return number of slots claimed, or 0 if none could be claimed
     */
    static __rte_always_inline unsigned int
    __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
                    unsigned int n, enum rte_ring_queue_behavior behavior,
                    uint32_t *old_head, uint32_t *new_head,
                    uint32_t *free_entries)
    {
            const uint32_t capacity = r->capacity;
            unsigned int max = n;
            int success;

            do {
                    /* Reset n to the initial burst count */
                    n = max;

                    *old_head = r->prod.head;

                    /* add rmb barrier to avoid load/load reorder in weak
                     * memory model. It is noop on x86
                     */
                    rte_smp_rmb();

                    /*
                     *  The subtraction is done between two unsigned 32bits value
                     * (the result is always modulo 32 bits even if we have
                     * *old_head > cons_tail). So 'free_entries' is always between 0
                     * and capacity (which is < size).
                     */
                    *free_entries = (capacity + r->cons.tail - *old_head);

                    /* check that we have enough room in ring */
                    if (unlikely(n > *free_entries))
                            n = (behavior == RTE_RING_QUEUE_FIXED) ?
                                            0 : *free_entries;

                    if (n == 0)
                            return 0;

                    *new_head = *old_head + n;
                    if (is_sp) {
                            /* single producer: a plain store is used */
                            r->prod.head = *new_head;
                            success = 1;
                    } else {
                            /* retry the iteration if the head changed under us */
                            success = rte_atomic32_cmpset(&r->prod.head,
                                            *old_head, *new_head);
                    }
            } while (unlikely(success == 0));
            return n;
    }
    /**
     * Advance the consumer head to claim entries for a dequeue.
     *
     * Each iteration reads r->cons.head, computes the number of available
     * entries as r->prod.tail - head, limits n according to behavior, and then
     * tries to publish the new head. With is_sc set the head is stored
     * directly; otherwise rte_atomic32_cmpset() is used and the whole
     * iteration is retried when it reports failure.
     *
     * @param r        ring whose cons.head is moved
     * @param is_sc    non-zero: plain store; zero: compare-and-set with retry
     * @param n        number of entries requested
     * @param behavior RTE_RING_QUEUE_FIXED: all-or-nothing; any other value:
     *                 n is reduced to the available entries
     * @param old_head out: consumer head observed by the last iteration
     * @param new_head out: *old_head + returned n (not written when 0 is
     *                 returned)
     * @param entries  out: available entries computed by the last iteration
     * @return number of entries claimed, or 0 if none could be claimed
     */
    static __rte_always_inline unsigned int
    __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
                    unsigned int n, enum rte_ring_queue_behavior behavior,
                    uint32_t *old_head, uint32_t *new_head,
                    uint32_t *entries)
    {
            unsigned int max = n;
            int success;
    
            /* move cons.head atomically */
            do {
                    /* Restore n as it may change every loop */
                    n = max;
    
                    *old_head = r->cons.head;
    
                    /* add rmb barrier to avoid load/load reorder in weak
                     * memory model. It is noop on x86
                     */
                    rte_smp_rmb();
    
                    /* The subtraction is done between two unsigned 32bits value
                     * (the result is always modulo 32 bits even if we have
                     * cons_head > prod_tail). So 'entries' is always between 0
                     * and size(ring)-1.
                     */
                    *entries = (r->prod.tail - *old_head);
    
                    /* Set the actual entries for dequeue */
                    if (n > *entries)
                            n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
    
                    if (unlikely(n == 0))
                            return 0;
    
                    *new_head = *old_head + n;
                    if (is_sc) {
                            /* single consumer: a plain store is used */
                            r->cons.head = *new_head;
                            /* NOTE(review): the reason for a read barrier after
                             * this store is not evident from this excerpt; the
                             * producer-side SP path has no counterpart. Confirm
                             * against upstream before touching it.
                             */
                            rte_smp_rmb();
                            success = 1;
                    } else {
                            /* retry the iteration if the head changed under us */
                            success = rte_atomic32_cmpset(&r->cons.head, *old_head,
                                            *new_head);
                    }
            } while (unlikely(success == 0));
            return n;
    }
  • 相关阅读:
    Java类 初始化块
    Java中 equals和==
    Raft算法
    OLTP与OLAP
    数据库事务机制ACID
    乐观锁和悲观锁
    SparkStreaming + Flume
    如何提高深度睡眠质量,科学解决睡眠问题
    centos 7 下安装mysql 远程连接出现10038错误如何解决
    CentOS 7.0 yum安装Apache、PHP和MySQL
  • 原文地址:https://www.cnblogs.com/dream397/p/13651558.html
Copyright © 2011-2022 走看看