背景
大多数企业开发人员都理解数据库乐观并发控制,不过很少有人听说过 CAS(我去年才听说这个概念),CAS 是多线程乐观并发控制策略的一种,一些无锁的支持并发的数据结构都会使用到 CAS,本文对比 CAS 和 数据库乐观并发控制,以此达到强化记忆的目的。
CAS
CAS = Compare And Swap
多线程环境下 this.i = this.i + 1 是没有办法保证线程安全的,因此就有了 CAS,CAS 可以保证上面代码的线程安全性,但是 CAS 并不会保证 Swap 的成功,只有 Compare 成功了才会 Swap,即:没有并发发生,即:在我读取和修改之间没有别人修改。另外说一点,如果 i 是局部变量,即:i = i + 1,那么这段代码是线程安全的,因为局部变量是线程独享的。
不明白 CAS 没关系,下面通过 CAS 的标准模式 和 一个简单的示例来理解 CAS。
CAS 的标准模式
伪代码
1 var localValue, currentValue; 2 do 3 { 4 localValue = this. 5 6 var newValue = 执行一些计算; 7 8 currentValue = Interlocked.CompareExchange(ref this.value, newValue, localValue); 9 } while (localValue != currentValue);
说明
把 this.value 看成是数据库数据,localValue 是某个用户读取的数据,newValue是用户想修改的值,这里有必要解释一下 CompareExchange 和 currentValue,它的内部实现代码是这样的(想想下面代码是线程安全的):
1 var currentValue = this.value 2 if(currentValue == localValue){ 3 this.value = newValue; 4 } 5 return currentValue;
CompareExchange 用 sql 来类比就是:update xxx set value = newValue where value = localValue,只不过返回的值不同。通过 CompareExchange 的返回结果我们知道 CAS 是否成功了,即:是否出现并发了,即:是否在我读取和修改之间别人已经修改过了,如果是,可以选择重试。
累加示例
CAS 代码
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 using System.Threading; 7 8 namespace InterlockStudy 9 { 10 class ConcurrentIncrease 11 { 12 public static void Test() 13 { 14 var sum = 0; 15 16 var tasks = new Task[10]; 17 for (var i = 1; i <= 10; i++) 18 { 19 tasks[i - 1] = Task.Factory.StartNew((state) => 20 { 21 int localSum, currentSum; 22 do 23 { 24 localSum = sum; 25 26 Thread.Sleep(10); 27 var value = (int)state; 28 var newValue = localSum + value; 29 30 currentSum = Interlocked.CompareExchange(ref sum, newValue, localSum); 31 } while (localSum != currentSum); 32 }, i); 33 } 34 35 Task.WaitAll(tasks); 36 37 Console.WriteLine(sum); 38 } 39 } 40 }
数据库并发代码
1 public static void Test13() 2 { 3 var tasks = new Task[10]; 4 for (var i = 1; i <= 10; i++) 5 { 6 tasks[i - 1] = Task.Factory.StartNew((state) => 7 { 8 int localSum, result; 9 do 10 { 11 using (var con = new SqlConnection(CONNECTION_STRING)) 12 { 13 con.Open(); 14 var cmd = new SqlCommand("select sum from Tests where Id = 1", con); 15 var reader = cmd.ExecuteReader(); 16 reader.Read(); 17 localSum = reader.GetInt32(0); 18 19 System.Threading.Thread.Sleep(10); 20 var value = (int)state; 21 var newValue = localSum + value; 22 23 cmd = new SqlCommand("update Tests set sum = " + newValue + " where sum = " + localSum + "", con); 24 result = cmd.ExecuteNonQuery(); 25 } 26 } while (result == 0); 27 }, i); 28 } 29 30 Task.WaitAll(tasks); 31 } 32 }
说明
我们发现 CAS 版本的代码和数据库版本的代码出奇的相似,数据库的CAS操作是通过 update + where 来完成的。
写着玩的 RingBuffer
代码
1 using System; 2 using System.Collections.Generic; 3 using System.Collections.Concurrent; 4 using System.Linq; 5 using System.Text; 6 using System.Threading.Tasks; 7 using System.Threading; 8 9 namespace InterlockStudy 10 { 11 internal class Node<T> 12 { 13 public T Data { get; set; } 14 15 public bool HasValue { get; set; } 16 } 17 18 class RingBuffer<T> 19 { 20 private readonly Node<T>[] _nodes; 21 private long _tailIndex = -1; 22 private long _headIndex = -1; 23 private AutoResetEvent _readEvent = new AutoResetEvent(false); 24 private AutoResetEvent _writeEvent = new AutoResetEvent(false); 25 26 public RingBuffer(int maxSize) 27 { 28 _nodes = new Node<T>[maxSize]; 29 30 for (var i = 0; i < maxSize; i++) 31 { 32 _nodes[i] = new Node<T>(); 33 } 34 } 35 36 public void EnQueue(T data) 37 { 38 while (true) 39 { 40 if (this.TryEnQueue(data)) 41 { 42 _readEvent.Set(); 43 return; 44 } 45 46 _writeEvent.WaitOne(); 47 } 48 49 } 50 51 public T DeQueue() 52 { 53 while (true) 54 { 55 T data; 56 if (this.TryDeQueue(out data)) 57 { 58 _writeEvent.Set(); 59 return data; 60 } 61 62 _readEvent.WaitOne(); 63 } 64 65 } 66 67 public bool TryEnQueue(T data) 68 { 69 long localTailIndex, newTailIndex, currentTailIndex; 70 do 71 { 72 localTailIndex = _tailIndex; 73 74 if (!this.CanWrite(localTailIndex)) 75 { 76 return false; 77 } 78 79 newTailIndex = localTailIndex + 1; 80 81 if (_nodes[newTailIndex % _nodes.Length].HasValue) 82 { 83 return false; 84 } 85 86 currentTailIndex = Interlocked.CompareExchange(ref _tailIndex, newTailIndex, localTailIndex); 87 } 88 while (localTailIndex != currentTailIndex); 89 90 _nodes[newTailIndex % _nodes.Length].Data = data; 91 _nodes[newTailIndex % _nodes.Length].HasValue = true; 92 93 return true; 94 } 95 96 public bool TryDeQueue(out T data) 97 { 98 long localHeadIndex, newHeadIndex, currentHeadIndex; 99 do 100 { 101 localHeadIndex = _headIndex; 102 103 if (!this.CanRead(localHeadIndex)) 104 { 105 data = default(T); 106 return false; 107 } 108 109 newHeadIndex = localHeadIndex + 1; 110 if (_nodes[newHeadIndex % _nodes.Length].HasValue == false) 111 { 112 data = default(T); 113 return false; 114 } 115 116 currentHeadIndex = Interlocked.CompareExchange(ref _headIndex, newHeadIndex, localHeadIndex); 117 } 118 while (localHeadIndex != currentHeadIndex); 119 120 data = _nodes[newHeadIndex % _nodes.Length].Data; 121 _nodes[newHeadIndex % _nodes.Length].HasValue = false; 122 123 return true; 124 } 125 126 private bool CanWrite(long localTailIndex) 127 { 128 return localTailIndex - _headIndex < _nodes.Length; 129 } 130 131 private bool CanRead(long localHeadIndex) 132 { 133 return _tailIndex - localHeadIndex > 0; 134 } 135 } 136 }
备注
仓促成文,如果有必要可以再写篇文章,希望大家多批评。