实现无锁的栈与队列
书续上回:实现无锁栈与队列(1)
对于下面这个看起来很美好的无锁栈:
1 //无锁的栈。 2 3 typedef ELEM int; 4 #define MAX (2048) 5 6 static ELEM Stack[MAX]; 7 static int top = 0; 8 9 bool Push(ELEM val) 10 { 11 int old_top; 12 13 do 14 { 15 old_top = top; 16 if (old_top >= MAX) return false; 17 18 if (cas(&top, old_top, old_top + 1)) 19 break; 20 21 }while(1); 22 23 Stack[old_top] = val; 24 25 return true; 26 } 27 28 29 bool Pop(ELEM& val) 30 { 31 int old_top; 32 do 33 { 34 old_top = top; 35 36 if (old_top == 0) return false; 37 38 val = Stack[old_top - 1]; 39 40 if (cas(&top, old_top, old_top - 1)) 41 break; 42 43 } while(1); 44 45 46 return true; 47 }
我们仔细看一下它的Push操作,cas保证了对top的更新是安全,原子的,但是数据的更新呢?这里把数据的更新放后了一步,似乎也是理所当然的:腾出了空间,再往里面写东西。但是,但是,如果还没有来得及完成往栈里写数据,当前线程就被切换了出去呢?有人可能想,换出去就换出去呗,记得再换回来就行了。理想很丰满,现实却很骨感,再想一下,如果在换回来之前,有线程要从这个栈里pop数据怎么办?栈的特性是后进先出的,top被更新之后,在别的线程看来,就是已经完成了数据的插入,如果这时要进行pop操作,但之前的线程又没有真的往里面写完数据,那显然结果就不是我们所想要的。
严重的错误!
回头看一看,问题的根本所在就是,我们没有保证数据的更新和top的更新是一致的。它们被分开了,在多线程的环境里,两个相邻的操作有可能是会相隔很远的,远到从前一个操作的完成到后一个操作的完成,中间可能经过了沧海桑田,任何东西都可能变了。
问题清楚了,解决的方法看起来无非只有两个:
1)保证top的更新 和 数据的插入是同步的,即更新top与更新数据在同一个原子操作里完成。
2)设置标记,在未完成插入数据之前,不允许pop操作。
第一个方案应该是最好的,但它不好实现,cas的使用是有限制的,它并不能对任意长度的内存进行原子操作,而我们这里的设计,是希望设计一个相对泛型一些stack,它应能适应各种长度的数据类型,显然这个要求太严格,cas基本没法满足。
那么我们看看第二个方案,注意到栈的所有操作都是在栈顶,多线程场合下,对栈的操作如果有竞发,那肯定就是在争栈顶,这个特性看起来很有帮助:我们只要保证,在抢到栈顶,完成对栈顶的修改是在同一个线程里完成,而不会被别的线程干扰,那就成了!
1 //无锁的栈。 2 3 typedef ELEM int; 4 #define MAX (2048) 5 6 static ELEM Stack[MAX]; 7 static int top = 0; 8 static int flag = 0; 9 10 bool Push(ELEM val) 11 { 12 int old_top; 13 14 do 15 { 16 old_top = top; 17 if (old_top >= MAX) return false; 18 19 if (!cas(flag, 0, 1)) continue; 20 21 if (cas(&top, old_top, old_top + 1)) 22 break; 23 24 }while(1); 25 26 Stack[old_top] = val; 27 28 cas(&flag, 1, 0); 29 30 return true; 31 } 32 33 34 bool Pop(ELEM& val) 35 { 36 int old_top; 37 do 38 { 39 old_top = top; 40 41 if (old_top == 0) return false; 42 43 if (!cas(&flag, 0, 1)) continue; 44 45 val = Stack[old_top - 1]; 46 47 if (cas(&top, old_top, old_top - 1)) 48 break; 49 50 } while(1); 51 52 cas(&flag, 1, 0); 53 54 return true; 55 }
上面的代码显然解决了之前的问题,真让人高兴,但再认真看看,我们忽然发现,我们其实是自己实现了一个互斥锁啊,这并不算高明,更重要的是,它没能符合我们第一篇文章里对无锁栈的要求,首先,它的互斥性很强,只允许一个线程独占操作,虽然没有sleep操作导致线程切换,但是它的性能未必比加锁的高,其二,也是最无法接受的,它不符合我们在前一篇文章里提的第二个要求,它不能避免死锁,设想一下,如果一个线程在设置了flag之后,突然异常退出了,挂了,那后续的任何线程,都无法再操作这个栈了。
这个发现很让人沮丧,它几乎表明我们之前的所有工作前功尽弃了,回过头来看,所有问题的根源就在于我们在实现这个数组为基础的栈时,需要在邻界区内做两步操作,更新栈顶,写数据,而且这两个操作又有要格的顺序要求,这个要求事实上太严格,以致于我现在没法想到一个合适的方法来解决。但退一步来讲,换一个想法,我们能不能干脆就避免这两个操作同时在邻界区进行呢?这是一个让人眼前一亮的思路,用链表不就行了吗?用链表来实现无锁的队列、栈,网上有很多很多相关的介绍文章及实现的代码,我最开始也是准备那样子做的,但是用链表来实现要解决几个很麻烦的问题,这就是为什么我是先尝试了用数组来实现的根本原因。现在看来,我明白了为什么网上几乎没有几篇文章是介绍怎么用数组来实现无锁队列的原因了:用数组根本无法实现一个真正意义上的无锁队列。
现在再看看我们之前写的代码,它的确无法真正无锁,但我们还可以对它加以改进让它变得更适用,比如,最基本的一个,允许并发的写,允许并发的读,也就是允许几个线程同时往里面写,又或者允许几线程同时从栈里读,但不允许同时有读写。这是可以做到的,因为只是往栈里写的时候,我们需要竞争一下top,获取一个数组的位置就够了,读也同理,这一个改进会让这个栈的性能有很大的提升,虽然我们还是无法保证它不会死锁。
为了保证能允许多个线程同时读(写),但又要读写互斥,我们需要至少检查两个标记,一个标记记录是否有读在进行,一个标记记录是否有写在进行,这看起来又像是要有两步操作,前面的经验看来,几乎又会是失败的开端。但我从cas2这种操作里获得了一个新思路,我把这两个标记放到一个变量里,那就可以避免要用两步来实现检查两个变量了!
1 //无锁的栈。 2 3 typedef ELEM int; 4 #define MAX (2048) 5 6 static ELEM Stack[MAX]; 7 static int top = 0; 8 9 //低位一个字节表示有多少线程在读 10 //低位第二个字节表示多少线程在写 11 static size_t mask = 0; 12 13 bool Push(ELEM val) 14 { 15 int old_top; 16 size_t old_mask; 17 size_t append = 0x10; 18 19 do 20 { 21 old_top = top; 22 if (old_top >= MAX) return false; 23 //TODO 检查正在写的线程的数量,如果超过255就让当前线程等待 24 old_mask = mask & (~0x0f);//低位全为0时,没有线程在读,只有当没有线程读时,才能往栈里写 25 if (!cas(&mask, old_mask, old_mask + append)) continue;//这里可以适当sleep, sched_yield(); 26 27 if (cas(&top, old_top, old_top + 1)) 28 break; 29 30 }while(1); 31 32 Stack[old_top] = val; 33 34 do 35 { 36 old_mask = mask; 37 } while(!cas(&mask, old_mask, old_mask - append)); 38 39 return true; 40 } 41 42 43 bool Pop(ELEM& val) 44 { 45 int old_top; 46 size_t old_mask; 47 size_t append = 0x01; 48 49 do 50 { 51 old_top = top; 52 53 if (old_top == 0) return false; 54 55 old_mask = mask & (~0xf0);//第二个字节为0时,没有线程在写,只有当没有线程在写时,才允许读 56 //TODO, 检查正在读的线程的数量,如果超过255,就等待 57 if (!cas(&mask, old_mask, old_mask + append)) continue; 58 59 val = Stack[old_top - 1]; 60 61 if (cas(&top, old_top, old_top - 1)) 62 break; 63 64 } while(1); 65 66 do 67 { 68 old_mask = mask; 69 } while(!cas(&mask, old_mask, old_mask - append)); 70 71 return true; 72 }
经过上面的优化,这个栈在读写方面的性能有非常大的提升,它的特点是允许一定数量的线程在并发地写(读),已经非常接近我们理想中的无锁栈了,唯一的遗憾是,它没法保证读写的独立性,如果有线程在读,想写数据的线程就得等待,反之亦然,这个缺点使得上面的代码也没法克服不会死锁的缺点,所以也不能用在中断,信号处理里面,十分遗憾。但不管怎样,这算是迈出了一步,也实现了一个还算差强人意的栈,上面的代码或许很简洁,但有很多理论藏在了后面,要实现真正的无锁数据结构不是一件容易的事情,首先就是很难有平台移植性,cas操作与具体的cpu相关,内存模型更是与cpu千丝万缕的关系,其中后一条我这儿只字不提,只是因为我不想让事情变得复杂。
在实现真正的无锁队列的路上,我们还有路要走,接下来我会再介绍一下用链表来实现的思路,至于上面所写的代码,可以到GitHub上获取:https://github.com/kmalloc/server/blob/master/misc/LockFreeContainer.h