zoukankan      html  css  js  c++  java
  • lock free 编程

    转载时请注明出处和作者联系方式
    文章出处:http://www.limodev.cn/blog
    作者联系方式:李先静 <xianjimli at hotmail dot com>

    无锁(lock-free)数据结构

    提到并行计算通常都会想到加锁,事实却并不是如此,大多数并发是不须要加锁的。比方在不同电脑上执行的代码编辑器,两者并发执行不须要加锁。在一台电脑上同一时候执行的媒体播放放器和代码编辑器,两者并发执行不须要加锁(当然系统调用和进程调度是要加锁的)。在同一个进程中执行多个线程,假设各自处理独立的事情也不须要加锁(当然系统调用、进程调度和内存分配是要加锁的)。在以上这些情况里,各个并发实体之间没有共享数据,所以尽管并发执行但不须要加锁。

    多线程并发执行时,尽管有共享数据,假设全部线程仅仅是读取共享数据而不改动它,也是不用加锁的,比方代码段就是共享的“数据”,每一个线程都会读取,可是不用加锁。排除全部这些情况,多线程之间有共享数据,有的线程要改动这些共享数据,有的线程要读取这些共享数据,这才是程序猿须要关注的情况,也是本节我们讨论的范围。

    在并发的环境里,加锁能够保护共享的数据,可是加锁也会存在一些问题:

    o 因为临界区无法并发执行,进入临界区就须要等待,加锁带来效率的减少。

    o 在复杂的情况下,非常easy造成死锁,并发实体之间无止境的互相等待。

    o 在中断/信号处理函数中不能加锁,给并发处理带来困难。

    o 优先级倒置造成实时系统不能正常工作。低级优先进程拿到高优先级进程须要的锁,结果是高/低优先级的进程都无法执行,中等优先级的进程可能在狂跑。

    因为并发与加锁(相互排斥)的矛盾关系,无锁数据结构自然成为程序猿关注的焦点,这也是本节要介绍的:

    o CPU提供的原子操作。

    大约在七八年前,我们用apache的xerces来解析XML文件,奇怪的是多线程反而比单线程慢。他们找了非常久也没有找出原因,仅仅是证实使用多进程取代多线程会快一个数量级,在Windows上他们就使用了多进程的方式。后来移植到linux时候,我发现xerces每创建一个结点都会去更新一些全局的统计信息,比方把结点的总数加一,它使用的pthread_mutex实现相互排斥。这就是问题所在:一个XML文档有数以万计的结点,以50个线程并发为例,每一个线程解析一个XML文档,总共要进行上百万次的加锁/解锁,差点儿全部线程都在等待,你说能快得了吗?

    当时我知道Windows下有InterlockedIncrement之类的函数,它们利用CPU一些特殊指令,保证对整数的基本操作是原子的。查找了一些资源发现Linux下也有相似的函数,后来我把全部加锁去掉,换成这些原子操作,速度比多进程执行还快了几倍。以下我们看++和—的原子操作在IA架构上的实现:

    #define ATOMIC_SMP_LOCK "lock ; "
    typedef struct { volatile int counter; } atomic_t;
    
    static __inline__ void atomic_inc(atomic_t *v)
    {
        __asm__ __volatile__(
            ATOMIC_SMP_LOCK "incl %0"
            :"=m" (v->counter)
            :"m" (v->counter));
    }
    
    static __inline__ void atomic_dec(atomic_t *v)
    {
        __asm__ __volatile__(
            ATOMIC_SMP_LOCK "decl %0"
            :"=m" (v->counter)
            :"m" (v->counter));
    }
    

    o 单入单出的循环队列。单入单出的循环队列是一种特殊情况,虽然特殊可是非常有用,重要的是它不须要加锁。这里的单入是指仅仅有一个线程向队列里追加数据(push),单出仅仅是指仅仅有一个线程从队列里取数据(pop),循环队列与普通队列相比,不同之处在于它的最大数据储存量是事先固定好的,不能动态增长。虽然有这些限制它的应用还是相当广泛的。这我们介绍一下它的实现:

    数据下定义例如以下:

    typedef struct _FifoRing
    {
        int r_cursor;
        int w_cursor;
        size_t length;
        void* data[0];
    
    }FifoRing;
    

    r_cursor指向队列头,用于取数据(pop)。w_cursor指向队列尾,用于追加数据(push)。length表示队列的最大数据储存量,data表示存放的数据,[0]在这里表示变长的缓冲区(前面我们已经讲过)。

    创建函数

    FifoRing* fifo_ring_create(size_t length)
    {
        FifoRing* thiz = NULL;
    
        return_val_if_fail(length > 1, NULL);
    
        thiz = (FifoRing*)malloc(sizeof(FifoRing) + length * sizeof(void*));
    
        if(thiz != NULL)
        {
            thiz->r_cursor = 0;
            thiz->w_cursor = 0;
            thiz->length   = length;
        }
    
        return thiz;
    }
    

    这里我们要求队列的长度大于1而不是大于0,为什么呢?排除长度为1的队列没有什么意义的原因外,更重要的原因是队列头与队列尾重叠 (r_cursor= =w_cursor) 时,究竟表示是满队列还是空队列?这个要搞清楚才行,上次一个同事犯了这个错误,让我们查了非常久。这里我们觉得队列头与队列尾重叠时表示队列为空,这与队列初始状态一致,后面在写的时候始终保留一个空位,避免队列头与队列尾重叠,这样能够消除歧义了。

    追加数据(push)

    Ret fifo_ring_push(FifoRing* thiz, void* data)
    {
        int w_cursor = 0;
        Ret ret = RET_FAIL;
        return_val_if_fail(thiz != NULL, RET_FAIL);
    
        w_cursor = (thiz->w_cursor + 1) % thiz->length;
    
        if(w_cursor != thiz->r_cursor)
        {
            thiz->data[thiz->w_cursor] = data;
            thiz->w_cursor = w_cursor;
    
            ret = RET_OK;
        }
    
        return ret;
    }
    

    队列头和队列尾之间另一个以上的空位时就追加数据,否则返回失败。

    取数据(pop)

    Ret fifo_ring_pop(FifoRing* thiz, void** data)
    {
        Ret ret = RET_FAIL;
        return_val_if_fail(thiz != NULL && data != NULL, RET_FAIL);
    
        if(thiz->r_cursor != thiz->w_cursor)
        {
            *data = thiz->data[thiz->r_cursor];
            thiz->r_cursor = (thiz->r_cursor + 1)%thiz->length;
    
            ret = RET_OK;
        }
    
        return ret;
    }
    

    队列头和队列尾不重叠表示队列不为空,取数据并移动队列头。

    o 单写多读的无锁数据结构。单写表示仅仅有一个线程去改动共享数据结构,多读表示有多个线程去读取共享数据结构。前面介绍的读写锁能够有效的解决问题,但更高效的办法是使用无锁数据结构。思路例如以下:

    就像为了避免显示闪烁而使用的双缓冲一样,我们使用两份数据结构,一份数据结构用于读取,全部线程都能够在不加锁的情况下读取这个数据结构。另外一份数据结构用于改动,因为仅仅有一个线程会改动它,所以也不用加锁。

    在改动之后,我们再交换读/写的两个函数结构,把另外一份也改动过来,这样两个数据结构就一致了。在交换时要保证没有线程在读取,所以我们还须要一个读线程的引用计数。如今我们看看怎么把前面写的双向链表改为单写多读的无锁数据结构。

    为了保证交换是原子的,我们须要一个新的原子操作CAS(compare and swap)。

    #define CAS(_a, _o, _n)                                    /
    ({ __typeof__(_o) __o = _o;                                /
       __asm__ __volatile__(                                   /
           "lock cmpxchg %3,%1"                                /
           : "=a" (__o), "=m" (*(volatile unsigned int *)(_a)) /
           :  "0" (__o), "r" (_n) );                           /
       __o;                                                    /
    })
    

    数据结构

    typedef struct _SwmrDList
    {
        atomic_t rd_index_and_ref;
        DList* dlists[2];
    }SwmrDList;
    

    两个链表,一个用于读一个用于写。rd_index_and_ref的最高字节记录用于读取的双向链表的索引,低24位用于记录读取线程的引用记数,最大支持16777216个线程同一时候读取,应该是足够了,所以后面不考虑它的溢出。

    读取操作

    int      swmr_dlist_find(SwmrDList* thiz, DListDataCompareFunc cmp, void* ctx)
    {
        int ret = 0;
        return_val_if_fail(thiz != NULL && thiz->dlists != NULL, -1);
    
        atomic_inc(&(thiz->rd_index_and_ref));
        size_t rd_index = (thiz->rd_index_and_ref.counter>>24) & 0x1;
        ret = dlist_find(thiz->dlists[rd_index], cmp, ctx);
        atomic_dec(&(thiz->rd_index_and_ref));
    
        return ret;
    }
    

    改动操作

    Ret swmr_dlist_insert(SwmrDList* thiz, size_t index, void* data)
    {
        Ret ret = RET_FAIL;
        DList* wr_dlist = NULL;
        return_val_if_fail(thiz != NULL && thiz->dlists != NULL, ret);
    
        size_t wr_index = !((thiz->rd_index_and_ref.counter>>24) & 0x1);
        if((ret = dlist_insert(thiz->dlists[wr_index], index, data)) == RET_OK)
        {
            int rd_index_old = thiz->rd_index_and_ref.counter & 0xFF000000;
            int rd_index_new = wr_index << 24;
    
            do
            {
                usleep(100);
            }while(CAS(&(thiz->rd_index_and_ref), rd_index_old, rd_index_new));
    
            wr_index = rd_index_old>>24;
            ret = dlist_insert(thiz->dlists[wr_index], index, data);
        }
    
        return ret;
    }
    

    先改动用于改动的双向链表,改动完毕之后等到没有线程读取时,交换读/写两个链表,再改动还有一个链表,此时两个链表状态保持一致。

    稍做改进,对改动的操作进行加锁,就能够支持多读多写的数据结构,读是无锁的,写是加锁的。

    o 真正的无锁数据结构。Andrei Alexandrescu的《Lock-FreeDataStructures》预计是这方面最经典的论文了,对他的方法我開始感到惊奇后来感到失望,惊奇的是算法的巧妙,失望的是无锁的限制和代价。作者最后说这样的数据结构仅仅适用于WRRMBNTM(Write-Rarely-Read-Many -But-Not-Too-Many)的情况。并且每次改动都要拷贝整个数据结构(甚至多次),所以不要指望这样的方法能带来多少性能上的提高,唯一的优点是能避免加锁带来的部分副作用。有兴趣的朋友能够看下这篇论文,这里我就不反复了。

  • 相关阅读:
    1571:【例 3】凸多边形的划分
    1570:【例 2】能量项链
    2.25
    2.24 T2 牧场 by greens 1s 128M (pasture.cpp)
    2.24 T1 P3515 [POI2011]Lightning Conductor
    白嫖视频的方法
    2.24 T3 P1912 [NOI2009] 诗人小G
    2.24
    斜率优化
    windy数的补充——数位dp中如何求[a,b]区间内的方案数
  • 原文地址:https://www.cnblogs.com/mfrbuaa/p/4009279.html
Copyright © 2011-2022 走看看