zoukankan      html  css  js  c++  java
  • 基于无锁的C#并发队列实现(转载)

    最近开始学习无锁编程,和传统的基于Lock的算法相比,无锁编程具有其独特的优点,Angel Lucifer的关于无锁编程一文对此有详细的描述。

    无锁编程的目标是在不使用Lock的前提下保证并发过程中共享数据的一致性,其主要的实现基础是CAS操作,也就是 compare_and_swap,通过处理器提供的指令,可以原子地更新共享数据,并同时监测其他线程的干扰,.Net中的对应实现是 InterLocked.CompareExchange函数。

    既然不使用Lock,那在无锁编程中要时刻注意的是,代码可能在任意语句中被中断。如果是单个变量,我们可以使用 InterLocked.XXX 保证操作的原子性,但是如果有多个操作要完成的话,简单地组合 InterLocked.XXX 是远远不够的。通常的原则是对函数中用到的共享变量,先在代码开始处用局部变量保存它的内容,在后面更新共享变量时,使用前述变量来判断其是否发生了改 变,如果共享变量发生了改变,那么我们可能需要重试,或者在某些可能的情况下,当前线程可以"帮助"其他更新中的线程完成更新。

    从上面可以总结出无锁算法的两个基本特征:

    1. 无锁算法总是包含一个循环结构,以保证更新失败后重试

    2. 无锁算法在更新共享变量时,总是使用CAS和原始值进行比较,以保证没有冲突

    下面是按照Michael-Scott算法实现的并发队列,其中的Dequeue算法在IBM的非阻塞算法一文中有详细介绍。代码如下:

    public class ConcurrentLinkedQueue<T> 
     2基于无锁的C并发队列实现 - qiuguangchun - sandea的个人主页{
     3    private class Node<K>
     4基于无锁的C并发队列实现 - qiuguangchun - sandea的个人主页    {
     5        internal K Item;
     6        internal Node<K> Next;
     7
     8        public Node(K item, Node<K> next)
     9基于无锁的C并发队列实现 - qiuguangchun - sandea的个人主页        {
    10            this.Item = item;
    11            this.Next = next;
    12        }

    13    }

    14
    15    private Node<T> _head;
    16    private Node<T> _tail;
    17
    18    public ConcurrentLinkedQueue()
    19基于无锁的C并发队列实现 - qiuguangchun - sandea的个人主页    {
    20        _head = new Node<T>(default(T), null);
    21        _tail = _head;
    22    }

    23
    24    public bool IsEmpty
    25基于无锁的C并发队列实现 - qiuguangchun - sandea的个人主页    {
    26基于无锁的C并发队列实现 - qiuguangchun - sandea的个人主页        get return (_head.Next == null); }
    27    }

    28
    29    public void Enqueue(T item)
    30基于无锁的C并发队列实现 - qiuguangchun - sandea的个人主页    {
    31        Node<T> newNode = new Node<T>(item, null);
    32        while (true)
    33基于无锁的C并发队列实现 - qiuguangchun - sandea的个人主页        {
    34            Node<T> curTail = _tail;
    35            Node<T> residue = curTail.Next;
    36
    37            //判断_tail是否被其他process改变
    38            if (curTail == _tail)
    39基于无锁的C并发队列实现 - qiuguangchun - sandea的个人主页            {
    40                //A 有其他process执行C成功,_tail应该指向新的节点
    41                if (residue == null
    42基于无锁的C并发队列实现 - qiuguangchun - sandea的个人主页                {
    43                    //C 如果其他process改变了tail.next节点,需要重新取新的tail节点
    44                    if (Interlocked.CompareExchange<Node<T>>(
    45                        ref curTail.Next, newNode, residue) == residue) 
    46基于无锁的C并发队列实现 - qiuguangchun - sandea的个人主页                    {
    47                        //D 尝试修改tail
    48                        Interlocked.CompareExchange<Node<T>>(ref _tail, newNode, curTail); 
    49                        return;
    50                    }

    51                }

    52                else
    53基于无锁的C并发队列实现 - qiuguangchun - sandea的个人主页                {
    54                    //B 帮助其他线程完成D操作
    55                    Interlocked.CompareExchange<Node<T>>(ref _tail, residue, curTail); 
    56                }

    57            }

    58        }

    59    }

    60
    61    public bool TryDequeue(out T result)
    62基于无锁的C并发队列实现 - qiuguangchun - sandea的个人主页    {
    63        Node<T> curHead;
    64        Node<T> curTail;
    65        Node<T> next;
    66        do
    67基于无锁的C并发队列实现 - qiuguangchun - sandea的个人主页        {
    68            curHead = _head;
    69            curTail = _tail;
    70            next = curHead.Next;
    71            if (curHead == _head)
    72基于无锁的C并发队列实现 - qiuguangchun - sandea的个人主页            {
    73                if (next == null)  //Queue为空
    74基于无锁的C并发队列实现 - qiuguangchun - sandea的个人主页                {
    75                    result = default(T);
    76                    return false;
    77                }

    78                if (curHead == curTail) //Queue处于Enqueue第一个node的过程中
    79基于无锁的C并发队列实现 - qiuguangchun - sandea的个人主页                {
    80                    //尝试帮助其他Process完成操作
    81                    Interlocked.CompareExchange<Node<T>>(ref _tail, next, curTail); 
    82                }

    83                else
    84基于无锁的C并发队列实现 - qiuguangchun - sandea的个人主页                {
    85                    //取next.Item必须放到CAS之前
    86                    result = next.Item; 
    87                    //如果_head没有发生改变,则将_head指向next并退出
    88                    if (Interlocked.CompareExchange<Node<T>>(ref _head, 
    89                        next, curHead) == curHead)
    90                        break;
    91                }

    92            }

    93        }

    94        while (true);
    95        return true;
    96    }

    97}

    根据自己的测试(双核CPU),在轻度和中度争用情况 下,无锁算法比基于锁的算法性能好很多,在争用非常严重的情况下(100个并发线程以上/每CPU),基于锁的算法性能开始显示出优势,因为一旦发生争 用,基于锁的算法会立刻切换到其他线程,而无锁算法会进入下一次循环,导致CPU的占用。但是如此严重的争用在实际中并不多见,并且可以采用 SpinWait的方法加以改进。基于锁的算法在测试中曾经出现过类似死锁的现象,无锁算法则完全没有出过类似问题,另外,处理器核心越多,基于锁的算法 效率越差。

     
    从上面的算法实现中,可以体会到无锁算法的优势:在并发的多个线程中,总是有线程能够推进,算法总能在有限的循环次数内完成,并且在某些冲突的情况下,一个线程可以“帮助”其他线程完成被中断的工作,这些对提高吞吐量都有很大的作用。

    98

  • 相关阅读:
    python
    python
    python
    python
    python
    python
    python
    python
    人生苦短,我用python,为什么选择python,python简介
    Mysql-查询
  • 原文地址:https://www.cnblogs.com/sandea/p/3293708.html
Copyright © 2011-2022 走看看