zoukankan      html  css  js  c++  java
  • rte_atomic32_cmpset

    CAS
    学习无锁队列前先看一个基本概念,CAS原子指令操作。

    CAS(Compare and Swap,比较并替换)原子指令,用来保障数据的一致性。

    指令有三个参数,当前内存值V、旧的预期值A、更新的值B,当且仅当预期值A和内存值V相同时,将内存值修改为B并返回true,否则什么都不做,并返回false。

    在DPDK中封装后的函数如下:

    rte_atomic32_cmpset(&r->prod.head, *old_head, *new_head)
    &r->prod.head指向当前内存值,*old_head为执行该操作前将r->prod.head存储到临时变量的值,*new_head为即将更新的值。

    只有r->prod.head == *old_head才会将r->prod.head更新为*new_head

    rte_atomic32_cmpset(volatile uint32_t *dst, uint32_t exp, uint32_t src)
    {
            unsigned int ret = 0;
    
            asm volatile(
                            "	lwsync
    "
                            "1:	lwarx %[ret], 0, %[dst]
    "
                            "cmplw %[exp], %[ret]
    "
                            "bne 2f
    "
                            "stwcx. %[src], 0, %[dst]
    "
                            "bne- 1b
    "
                            "li %[ret], 1
    "
                            "b 3f
    "
                            "2:
    "
                            "stwcx. %[ret], 0, %[dst]
    "
                            "li %[ret], 0
    "
                            "3:
    "
                            "isync
    "
                            : [ret] "=&r" (ret), "=m" (*dst)
                            : [dst] "r" (dst),
                              [exp] "r" (exp),
                              [src] "r" (src),
                              "m" (*dst)
                            : "cc", "memory");
    
            return ret;
    }
    static __rte_always_inline void
    update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
            uint32_t single, uint32_t enqueue)
    {
        //1.内存屏障
        if (enqueue)
            rte_smp_wmb();
        else
            rte_smp_rmb();
        //2.如果有其他生产者生产数据,那么需要等待其将数据生产完更新tail指针后,本生产者才能更新tail指针
        if (!single)
            while (unlikely(ht->tail != old_val))
                rte_pause();
        //3.更新tail指针,更新的位置为最新的生产位置,意味着刚刚生产的数据已经全部可以被消费者消费
        ht->tail = new_val;
    }
    static __rte_always_inline int
    rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
    {
            return rte_ring_mp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
    }
     *
     * @param r
     *   A pointer to the ring structure.
     * @param obj_table
     *   A pointer to a table of void * pointers (objects).
     * @param n
     *   The number of objects to add in the ring from the obj_table.
     * @param free_space
     *   if non-NULL, returns the amount of space in the ring after the
     *   enqueue operation has finished.
     * @return
     *   The number of objects enqueued, either 0 or n
     */
    static __rte_always_inline unsigned int
    rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
                             unsigned int n, unsigned int *free_space)
    {
            return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
                            __IS_MP, free_space);
    }

      static __rte_always_inline unsigned int
        __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
                unsigned int n, enum rte_ring_queue_behavior behavior,
                uint32_t *old_head, uint32_t *new_head,
                uint32_t *free_entries)
        {
            const uint32_t capacity = r->capacity;
            uint32_t cons_tail;
            unsigned int max = n;
            int success;
        
            *old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED);
            do {
                /* Reset n to the initial burst count */
                n = max;
        
                /* Ensure the head is read before tail */
                __atomic_thread_fence(__ATOMIC_ACQUIRE);
        
                /* load-acquire synchronize with store-release of ht->tail
                 * in update_tail.
                 */
                cons_tail = __atomic_load_n(&r->cons.tail,
                            __ATOMIC_ACQUIRE);
        
                /* The subtraction is done between two unsigned 32bits value
                 * (the result is always modulo 32 bits even if we have
                 * *old_head > cons_tail). So 'free_entries' is always between 0
                 * and capacity (which is < size).
                 */
                *free_entries = (capacity + cons_tail - *old_head);
        
                /* check that we have enough room in ring */
                if (unlikely(n > *free_entries))
                    n = (behavior == RTE_RING_QUEUE_FIXED) ?
                            0 : *free_entries;
        
                if (n == 0)
                    return 0;
        
                *new_head = *old_head + n;
                if (is_sp)
                    r->prod.head = *new_head, success = 1;
                else
                    /* on failure, *old_head is updated */
                    success = __atomic_compare_exchange_n(&r->prod.head,
                            old_head, *new_head,
                            0, __ATOMIC_RELAXED,
                            __ATOMIC_RELAXED);
            } while (unlikely(success == 0));
            return n;
        }

    参考;

    https://www.sunxidong.com/576.html

  • 相关阅读:
    shell 命令参数
    Windows系统配置Python环境,python2和python3共存
    jmeter面试题及答案
    接口测试
    python语法基础
    pycharm环境安装及注册
    Win10下python 2.7与python 3.6双环境安装图文教程
    eclipse中导入maven项目时pom文件报错
    ssm-crud项目--总结
    ssm-crud项目——分页查询
  • 原文地址:https://www.cnblogs.com/dream397/p/13646492.html
Copyright © 2011-2022 走看看