zoukankan      html  css  js  c++  java
  • 基于循环数组的无锁队列

    在之前的两篇博客(线程安全的无锁RingBuffer的实现多个写线程一个读线程的无锁队列实现)中,分别写了在只有一个读线程、一个写线程的情况下,以及只有一个写线程、两个读线程的情况下,不采用加锁技术,甚至原子运算的循环队列的实现。但是,在其他的情况下,我们也需要尽可能高效的线程安全的队列的实现。本文实现了一种基于循环数组和原子运算的无锁队列。采用原子运算(compare and swap)而不是加锁同步,可以很大的提高运行效率。之所以用循环数组,是因为这样在使用过程中不需要反复开辟内存空间,可以达到较好的效率。本文的实现参考了论文Implementing Lock-Free Queues所提到的方法。但是,在这篇论文中,作者并没有给出具体的实现。本文的实现也和论文中的方法有所不同。本文的实现基于Windows操作系统,代码在Visual Studio 2010下测试通过。

    本文的实现基于compare and swap这个原子操作,简写为CAS。其原型为

    long CAS(long* ptr, long old_value, long new_value).

    其操作为,如果ptr所指向的变量和old_value相等,则将其置为new_value. 否则什么也不做。返回值为ptr所指向的变量。

    本文提出的方法的基本原理如下。代码会在其后附上。

    首先,我们约定,有一个特殊的数值,叫做EMPTY,存入数组的所有数都不能是这个数值。在一开始,将整个buffer的值都初始化为EMPTY.

    第二,以两个变量,readCnt和writeCnt,来记录读和写的次数。这两个数模数组长度,就可以得到下一次读和写的位置。如果readCnt和writeCnt模buffer大小相等,则说明当前队列为控;而如果在写的位置不是EMPTY,则说明buffer已满。

    第三,也就是最重要的,线程同步的原理。在进行写buffer操作时,首先通过CAS操作将buffer对应位置置为指定值,而writeCnt不变,因此其他线程无法在同样的位置进行写操作,这样防止了其他写线程的覆盖。而由于此时writeCnt还没有变,读线程此时也无法读该位置的数据,这样防止了其他读线程的冲突。接下来,第二步,则是通过CAS操作,将writeCnt加一。可以看到,代码中在第一个CAS操作失败的情况下,会执行一个增大writeCnt的原子操作。这样做的目的是避免某个线程停掉导致其他所有线程都停掉。

    而在进行写操作时,首先通过判断readCnt和writeCnt是否相等来判断当前队列是否已满。这样做的原因如前所述,是为了避免和写线程之间的冲突。接下来,如果当前位置可以读,则通过一个CAS操作将readCnt加一。这样,其他读线程就无法再读这个位置。而由于这个位置值还不是EMPTY,其他写线程也无法写这个位置。接下来,在读走数值之后,再通过CAS操作将buffer中的这个位置置为EMPTY. 此时,写线程可以在这个位置写入数据。

    但是,这个实现其实还有点问题。如果读线程比较多,例如,读线程个数和数组长度一样,就可能两个读线程同时读同一个位置。这样的读冲突这个程序是无法避免的。另外,如果读数据过程中有一个读线程停掉了,那么其他线程在读或者写到这个位置时,也会被阻塞。

    代码如下。

     1 #include <windows.h>
     2 #include <stdlib.h>
     3 
     4 #define CAS(ptr, oldval, newval) (InterlockedCompareExchange(ptr, newval, oldval))
     5 
     6 static const long EMPTY=-1;
     7 
     8 class NBQueue {
     9 public:
    10   NBQueue()
    11     : queueSize(0)
    12     , buffer(NULL)
    13     , readCnt(0)
    14     , writeCnt(0){}
    15 
    16   ~NBQueue() {
    17     uninit();
    18   }
    19 
    20   bool init(int size) {
    21     queueSize = size;
    22     try {
    23       buffer = new long[queueSize];
    24     }
    25     catch (...) {
    26       queueSize = 0;
    27       buffer = NULL;
    28       return false;
    29     }
    30     for (int i = 0; i < queueSize; i++) {
    31       buffer[i] = EMPTY;
    32     }
    33     return true;
    34   }
    35   void uninit() {
    36     if (buffer != NULL) {
    37       delete[] buffer;
    38       buffer = NULL;
    39     }
    40     queueSize = 0;
    41   }
    42 
    43   bool put(long v) {
    44     bool succ;
    45     long tail;
    46     long index;
    47     do {
    48       tail = writeCnt;
    49       index = tail % queueSize;
    50       if (buffer[index] != EMPTY) {
    51         return false;
    52       }
    53       long oldval = CAS(&buffer[index], EMPTY, v);
    54       succ = oldval == EMPTY;
    55       if (!succ) {
    56         CAS(&writeCnt, tail, tail + 1);
    57       }
    58     } while (!succ);
    59 
    60     CAS(&writeCnt, tail, tail + 1);
    61 
    62     return true;
    63   }
    64 
    65   bool get(long* v) {
    66     long head;
    67     bool succ;
    68     do {
    69       head = readCnt;
    70       if (head == writeCnt) {
    71         return false;
    72       }
    73       long oldval = CAS(&readCnt, head, head + 1);
    74       succ = oldval == head;
    75     } while (!succ);
    76 
    77     long index = head % queueSize;
    78     *v = buffer[index];
    79     CAS(&buffer[index], *v, EMPTY);
    80 
    81     return true;
    82   }
    83 
    84 private:
    85   long* buffer;
    86   int queueSize;
    87   long readCnt;
    88   long writeCnt;
    89 };
  • 相关阅读:
    SQL键值约束、索引使用
    C#字符串的四舍五入
    VB中字符串操作函数
    C#文本选中及ContextMenuStrip菜单使用
    C#关于new的用法
    C#有关日期的使用方法
    break,continue的区别
    在Lua中使用数字的时候有个坑
    关于自动寻径和图、邻接表的学习和启发
    关于在Cocos2dx引擎中手动绑定C++到Lua
  • 原文地址:https://www.cnblogs.com/l00l/p/4217165.html
Copyright © 2011-2022 走看看