https://coolshell.cn/articles/8239.html
主要讲的是《Implementing Lock-Free Queues》的论点,具体直接看论文最好。这里总结些要点。
CAS就是Compare And Swap。gcc可以调用:
1 __sync_bool_compare_and_swap
这段代码讲出无锁的两个关键手段:
1 EnQueue(x) //进队列 2 { 3 //准备新加入的结点数据 4 q = new record(); 5 q->value = x; 6 q->next = NULL; 7 8 do { 9 p = tail; //取链表尾指针的快照 10 } while( CAS(p->next, NULL, q) != TRUE); //如果没有把结点链在尾指针上,再试 11 12 CAS(tail, p, q); //置尾结点 13 }
一个就是CAS,一个就是Retry-Loop。
然后,还有一个ABA问题,就是在被抢占之前和之后数据地址没变,但是数据内容可能变了。解决思路就是加一个额外的状态来记录(也可以用记数)。
一个无锁队列的实现:
1 #include <sys/mman.h> 2 #include <string.h> 3 #include <iostream> 4 5 class CircleQueue { 6 public: 7 CircleQueue(int queue_count) { 8 this->queue_size_ = queue_count * sizeof(void*) + sizeof(unsigned long*) * 2; 9 this->queue_count_ = queue_count; 10 shm_ = mmap(NULL, queue_size_, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS , -1, 0); 11 memset(shm_, 0, queue_size_); 12 head_ = (unsigned long*)shm_; 13 tail_ = (unsigned long*)((char*)shm_ + sizeof(unsigned long)); 14 queue_ = (void**)((char*)shm_ + sizeof(unsigned long) * 2); 15 } 16 17 ~CircleQueue() { 18 munmap(shm_, queue_size_); 19 } 20 21 int PushOneThread(void* ap) { 22 if((int)(*head_ - *tail_) >= queue_count_) { 23 return 1;//full 24 } 25 queue_[ *head_ % queue_count_ ] = ap; 26 *head_ = *head_ + 1; 27 return 0; 28 } 29 30 bool IsEmpty() { 31 if(*head_ == *tail_) { 32 return true; 33 } 34 return false; 35 } 36 37 int PopOneThread(void** ap) { 38 if(*head_ == *tail_) { 39 return 1; 40 } 41 42 if (ap == NULL) { 43 return -1; 44 } 45 *ap = *(queue_ + *tail_ % queue_count_); 46 *tail_ = *tail_ + 1; 47 return 0; 48 } 49 50 int PopMultiThread(void** ap, int retry) { 51 *ap = NULL; 52 for(int i = 0; i < retry; ++i) { 53 volatile unsigned long tail = *tail_; 54 if(*head_ == tail) { 55 return 1; 56 } 57 58 int ret = __sync_bool_compare_and_swap(tail_, tail, tail + 1); 59 if(ret) { 60 *ap = *( queue_ + (tail % queue_count_)); 61 return 0; 62 } 63 } 64 return -1; 65 } 66 private: 67 unsigned long* head_; 68 unsigned long* tail_; 69 void** queue_; 70 void* shm_; 71 int queue_size_; 72 int queue_count_; 73 };
这里的场景主要是一写多读。写的时候操作的是head,不需要加锁。读的时候,tail在改变的时候用上了CAS和retry-loop。 注意这里tail_只有在写的时候改变,而且只会往上加,所以不存在ABA问题。
p.s. 多进程通信的一个关键点,就是在于共享内存的时候,要确保所有的地址都是多进程共享的。所以这里存的指针,如果是多进程环境下,也应该是从共享内存中分配的。