单读者单写者很容易进行冲突避免,毕竟他们操作的是不同的指针(head和rear),而多读者则需要共同操作head,多写者共同操作rear,似乎必须要加锁,锁机制是内核提供的一种对象,比较影响效率。
可以借鉴spinlock的实现机制,利用CPU提供的原子指令(x86上有cmpxchg,即比较交换,当某个内存地址里的值等于某个值时,则将该地址里的内容与某个寄存器交换,这一系列操作是原子的,由CPU保证),实现CAS原子操作,意为Compare & Set(Swap?)。
一种典型的CAS描述如下:
bool compare_and_set(int *reg, int oldval, int newval){ if(*reg==oldval){ *reg=newval; return true; } return false; }
记住这个CAS操作是原子的,即不可能多个线程通过CAS操作相同的对象。
于是,对于入队列可以实现为
void Enqueue(x){ q=new node(); q->value=x; q->next=NULL; do{ p=tail; }while(!CAS(p->next,NULL,q)); CAS(tail,p,q); }
这种实现有个问题,即只有第一个线程执行了CAS(tail,p,q),其他的线程才能结束循环,继续下一步,而如果第一个线程异常终止或暂停(而没有成功执行CAS操作),则其他所有线程都要死循环。
解决办法是,在do循环中,让线程自己遍历到真正的尾指针,而不是等待tail被更新。
void Enqueue(x){ q=new node(); q->value=x; q->next=NULL; p=tail; do{ while(p->next!=NULL) p=p->next; }while(!CAS(p->next,NULL,q)); CAS(tail,p,q); }
如此,即使第一个线程没有执行CAS(tail,p,q),其他线程也可以正常退出循环。
然而,问题又来了,如果多个线程同时到达CAS(tail,p,q),那么只有第一个线程才能正确执行,因为其他的线程此时不满足tail==p,只有等到第一个线程正确执行CAS(tail,p,q)后,才能满足tail==p。遗憾的是,多个线程执行CAS(tail,p,q)的顺序是随机的,如果它们正好按照1,2,3,...的顺序执行,则最终tail可以指向正确的尾指针,如果是...,3,2,1,则前面所有线程的CAS(tail,p,q)都会失败,最后tail指向错误的结尾。
为了在这个地方确定一个顺序,可以给每个线程的q一个编号,该编号即为该节点在链表里的序号,最后CAS(tail,p,q)前,判断当前q的序号是否大于tail的,如果大于,则执行,否则不执行。
void Enqueue(x){ q=new node(); q->value=x; q->next=NULL; p=tail; q->id=p->id+1; do{ while(p->next!=NULL) {p=p->next;q->id++;} }while(!CAS(p->next,NULL,q)); do{ p=tail; }while(q->id>p->id && CAS(tail,p,q)); }