zoukankan      html  css  js  c++  java
  • C# 线程安全集合

    转载
    
    对于并行任务,与其相关紧密的就是对一些共享资源,数据结构的并行访问。经常要做的就是对一些队列进行加锁-解锁,然后执行类似插入,删除等等互斥操作。 .NetFramework 4.0 中提供了一些封装好的支持并行操作数据容器,可以减少并行编程的复杂程度。
    
    
    
    
    基本信息
    
    .NetFramework中并行集合的名字空间: System.Collections.Concurrent
    
    并行容器:
    
    •ConcurrentQueue
    •ConcurrentStack
    •ConcurrentBag : 一个无序的数据结构集,当不需要考虑顺序时非常有用。
    •BlockingCollection : 与经典的阻塞队列数据结构类似
    •ConcurrentDictionary
    
    
    
    
    这些集合在某种程度上使用了无锁技术(CAS Compare-and-Swap和内存屏障 Memory Barrier),与加互斥锁相比获得了性能的提升。但在串行程序中,最好不用这些集合,它们必然会影响性能。
    
    
    
    
    关于CAS: 
    
    •http://www.tuicool.com/articles/zuui6z
    •http://www.360doc.com/content/11/0914/16/7656248_148221200.shtml
    
    关于内存屏障
    
    •http://en.wikipedia.org/wiki/Memory_barrier
    
    
    
    
    用法与示例
    
    ConcurrentQueue
    
    其完全无锁,但当CAS面临资源竞争失败时可能会陷入自旋并重试操作。
    
    
    
    
    •Enqueue:在队尾插入元素
    •TryDequeue:尝试删除队头元素,并通过out参数返回
    •TryPeek:尝试将对头元素通过out参数返回,但不删除该元素。
    
    
    
    
    程序示例:
    
    
    
    
    using System;
    using System.Text;
    
    using System.Threading.Tasks;
    using System.Collections.Concurrent;
    
    namespace Sample4_1_concurrent_queue
    {
        class Program
        {
            internal static ConcurrentQueue<int> _TestQueue;
    
            class ThreadWork1  // producer
            {
                public ThreadWork1()
                { }
    
                public void run()
                {
                    System.Console.WriteLine("ThreadWork1 run { ");
                    for (int i = 0; i < 100; i++)
                    {
                        System.Console.WriteLine("ThreadWork1 producer: " + i);
                        _TestQueue.Enqueue(i);
                    }
                    System.Console.WriteLine("ThreadWork1 run } ");
                }
            }
    
            class ThreadWork2  // consumer
            {
                public ThreadWork2()
                { }
    
                public void run()
                {
                    int i = 0;
                    bool IsDequeuue = false;
                    System.Console.WriteLine("ThreadWork2 run { ");
                    for (; ; )
                    {
                        IsDequeuue = _TestQueue.TryDequeue(out i);
                        if (IsDequeuue)
                            System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   =====");
    
                        if (i == 99)
                            break;
                    }
                    System.Console.WriteLine("ThreadWork2 run } ");
                }
            }
    
            static void StartT1()
            {
                ThreadWork1 work1 = new ThreadWork1();
                work1.run();
            }
    
            static void StartT2()
            {
                ThreadWork2 work2 = new ThreadWork2();
                work2.run();
            }
            static void Main(string[] args)
            {
                Task t1 = new Task(() => StartT1());
                Task t2 = new Task(() => StartT2());
    
                _TestQueue = new ConcurrentQueue<int>();
    
                Console.WriteLine("Sample 3-1 Main {");
    
                Console.WriteLine("Main t1 t2 started {");
                t1.Start();
                t2.Start();
                Console.WriteLine("Main t1 t2 started }");
    
                Console.WriteLine("Main wait t1 t2 end {");
                Task.WaitAll(t1, t2);
                Console.WriteLine("Main wait t1 t2 end }");
    
                Console.WriteLine("Sample 3-1 Main }");
    
                Console.ReadKey();
            }
        }
    }
    
    
    
    
    
    
    
    ConcurrentStack
    
    其完全无锁,但当CAS面临资源竞争失败时可能会陷入自旋并重试操作。
    
    
    
    
    
    
    •Push:向栈顶插入元素
    •TryPop:从栈顶弹出元素,并且通过out 参数返回
    •TryPeek:返回栈顶元素,但不弹出。
    
    
    
    
    程序示例:
    
    
    using System;
    using System.Text;
    
    using System.Threading.Tasks;
    using System.Collections.Concurrent;
    
    namespace Sample4_2_concurrent_stack
    {
        class Program
        {
            internal static ConcurrentStack<int> _TestStack;
    
            class ThreadWork1  // producer
            {
                public ThreadWork1()
                { }
    
                public void run()
                {
                    System.Console.WriteLine("ThreadWork1 run { ");
                    for (int i = 0; i < 100; i++)
                    {
                        System.Console.WriteLine("ThreadWork1 producer: " + i);
                        _TestStack.Push(i);
                    }
                    System.Console.WriteLine("ThreadWork1 run } ");
                }
            }
    
            class ThreadWork2  // consumer
            {
                public ThreadWork2()
                { }
    
                public void run()
                {
                    int i = 0;
                    bool IsDequeuue = false;
                    System.Console.WriteLine("ThreadWork2 run { ");
                    for (; ; )
                    {
                        IsDequeuue = _TestStack.TryPop(out i);
                        if (IsDequeuue)
                            System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   =====" + i);
    
                        if (i == 99)
                            break;
                    }
                    System.Console.WriteLine("ThreadWork2 run } ");
                }
            }
    
            static void StartT1()
            {
                ThreadWork1 work1 = new ThreadWork1();
                work1.run();
            }
    
            static void StartT2()
            {
                ThreadWork2 work2 = new ThreadWork2();
                work2.run();
            }
            static void Main(string[] args)
            {
                Task t1 = new Task(() => StartT1());
                Task t2 = new Task(() => StartT2());
    
                _TestStack = new ConcurrentStack<int>();
    
                Console.WriteLine("Sample 4-1 Main {");
    
                Console.WriteLine("Main t1 t2 started {");
                t1.Start();
                t2.Start();
                Console.WriteLine("Main t1 t2 started }");
    
                Console.WriteLine("Main wait t1 t2 end {");
                Task.WaitAll(t1, t2);
                Console.WriteLine("Main wait t1 t2 end }");
    
                Console.WriteLine("Sample 4-1 Main }");
    
                Console.ReadKey();
            }
        }
    }
    
    
    
    测试中一个有趣的现象:
    
    
    
    
    虽然生产者已经在栈中插入值已经到了25,但消费者第一个出栈的居然是4,而不是25。很像是出错了。但仔细想想入栈,出栈和打印语句是两个部分,而且并不是原子操作,出现这种现象应该也算正常。
    
    
    
    Sample 3-1 Main {
    Main t1 t2 started {
    Main t1 t2 started }
    Main wait t1 t2 end {
    ThreadWork1 run {
    ThreadWork1 producer: 0
     ThreadWork2 run {
    ThreadWork1 producer: 1
    ThreadWork1 producer: 2
     ThreadWork1 producer: 3
    ThreadWork1 producer: 4
    ThreadWork1 producer: 5
    ThreadWork1 producer: 6
    ThreadWork1 producer: 7
    ThreadWork1 producer: 8
    ThreadWork1 producer: 9
    ThreadWork1 producer: 10
    ThreadWork1 producer: 11
    ThreadWork1 producer: 12
    ThreadWork1 producer: 13
     ThreadWork1 producer: 14
    ThreadWork1 producer: 15
    ThreadWork1 producer: 16
    ThreadWork1 producer: 17
    ThreadWork1 producer: 18
    ThreadWork1 producer: 19
    ThreadWork1 producer: 20
    ThreadWork1 producer: 21
     ThreadWork1 producer: 22
    ThreadWork1 producer: 23
    ThreadWork1 producer: 24
    ThreadWork1 producer: 25
    ThreadWork2 consumer: 16   =====4
    ThreadWork2 consumer: 625   =====25
    ThreadWork2 consumer: 576   =====24
    ThreadWork2 consumer: 529   =====23
    ThreadWork1 producer: 26
    ThreadWork1 producer: 27
     ThreadWork1 producer: 28
    
    
    
    
    
    
    
    
    
    
    ConcurrentBag
    
    一个无序的集合,程序可以向其中插入元素,或删除元素。
    
    在同一个线程中向集合插入,删除元素的效率很高。
    
    
    
    
    • Add:向集合中插入元素
    • TryTake:从集合中取出元素并删除
    • TryPeek:从集合中取出元素,但不删除该元素。
    
    
    
    
    
    程序示例:
    
    
    
    
    using System;
    using System.Text;
    
    using System.Threading.Tasks;
    using System.Collections.Concurrent;
    
    namespace Sample4_3_concurrent_bag
    {
        class Program
        {
            internal static ConcurrentBag<int> _TestBag;
    
            class ThreadWork1  // producer
            {
                public ThreadWork1()
                { }
    
                public void run()
                {
                    System.Console.WriteLine("ThreadWork1 run { ");
                    for (int i = 0; i < 100; i++)
                    {
                        System.Console.WriteLine("ThreadWork1 producer: " + i);
                        _TestBag.Add(i);
                    }
                    System.Console.WriteLine("ThreadWork1 run } ");
                }
            }
    
            class ThreadWork2  // consumer
            {
                public ThreadWork2()
                { }
    
                public void run()
                {
                    int i = 0;
                    int nCnt = 0;
                    bool IsDequeuue = false;
                    System.Console.WriteLine("ThreadWork2 run { ");
                    for (;;)
                    {
                        IsDequeuue = _TestBag.TryTake(out i);
                        if (IsDequeuue)
                        {
                            System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   =====" + i);
                            nCnt++;
                        }
    
                        if (nCnt == 99)
                            break;
                    }
                    System.Console.WriteLine("ThreadWork2 run } ");
                }
            }
    
            static void StartT1()
            {
                ThreadWork1 work1 = new ThreadWork1();
                work1.run();
            }
    
            static void StartT2()
            {
                ThreadWork2 work2 = new ThreadWork2();
                work2.run();
            }
            static void Main(string[] args)
            {
                Task t1 = new Task(() => StartT1());
                Task t2 = new Task(() => StartT2());
    
                _TestBag = new ConcurrentBag<int>();
    
                Console.WriteLine("Sample 4-3 Main {");
    
                Console.WriteLine("Main t1 t2 started {");
                t1.Start();
                t2.Start();
                Console.WriteLine("Main t1 t2 started }");
    
                Console.WriteLine("Main wait t1 t2 end {");
                Task.WaitAll(t1, t2);
                Console.WriteLine("Main wait t1 t2 end }");
    
                Console.WriteLine("Sample 4-3 Main }");
    
                Console.ReadKey();
            }
        }
    }
    
    
    
    
    
    
    
    BlockingCollection
    
    一个支持界限和阻塞的容器
    
    
    
    
    •Add :向容器中插入元素
    •TryTake:从容器中取出元素并删除
    •TryPeek:从容器中取出元素,但不删除。
    •CompleteAdding:告诉容器,添加元素完成。此时如果还想继续添加会发生异常。
    •IsCompleted:告诉消费线程,生产者线程还在继续运行中,任务还未完成。
    
    
    
    
    示例程序:
    
    
    
    
    程序中,消费者线程完全使用  while (!_TestBCollection.IsCompleted) 作为退出运行的判断条件。
    
    在Worker1中,有两条语句被注释掉了,当i 为50时设置CompleteAdding,但当继续向其中插入元素时,系统抛出异常,提示无法再继续插入。
    
    
    
    
    using System;
    using System.Text;
    
    using System.Threading.Tasks;
    using System.Collections.Concurrent;
    
    
    namespace Sample4_4_concurrent_bag
    {
        class Program
        {
            internal static BlockingCollection<int> _TestBCollection;
    
            class ThreadWork1  // producer
            {
                public ThreadWork1()
                { }
    
                public void run()
                {
                    System.Console.WriteLine("ThreadWork1 run { ");
                    for (int i = 0; i < 100; i++)
                    {
                        System.Console.WriteLine("ThreadWork1 producer: " + i);
                        _TestBCollection.Add(i);
                        //if (i == 50)
                        //    _TestBCollection.CompleteAdding();
                    }
                    _TestBCollection.CompleteAdding();
    
                    System.Console.WriteLine("ThreadWork1 run } ");
                }
            }
    
            class ThreadWork2  // consumer
            {
                public ThreadWork2()
                { }
    
                public void run()
                {
                    int i = 0;
                    int nCnt = 0;
                    bool IsDequeuue = false;
                    System.Console.WriteLine("ThreadWork2 run { ");
                    while (!_TestBCollection.IsCompleted)
                    {
                        IsDequeuue = _TestBCollection.TryTake(out i);
                        if (IsDequeuue)
                        {
                            System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   =====" + i);
                            nCnt++;
                        }
                    }
                    System.Console.WriteLine("ThreadWork2 run } ");
                }
            }
    
            static void StartT1()
            {
                ThreadWork1 work1 = new ThreadWork1();
                work1.run();
            }
    
            static void StartT2()
            {
                ThreadWork2 work2 = new ThreadWork2();
                work2.run();
            }
            static void Main(string[] args)
            {
                Task t1 = new Task(() => StartT1());
                Task t2 = new Task(() => StartT2());
    
                _TestBCollection = new BlockingCollection<int>();
    
                Console.WriteLine("Sample 4-4 Main {");
    
                Console.WriteLine("Main t1 t2 started {");
                t1.Start();
                t2.Start();
                Console.WriteLine("Main t1 t2 started }");
    
                Console.WriteLine("Main wait t1 t2 end {");
                Task.WaitAll(t1, t2);
                Console.WriteLine("Main wait t1 t2 end }");
    
                Console.WriteLine("Sample 4-4 Main }");
    
                Console.ReadKey();
            }
        }
    }
    
    
    
    
    当然可以尝试在Work1中注释掉 CompleteAdding 语句,此时Work2陷入循环无法退出。
    
    
    
    
    ConcurrentDictionary
    
    对于读操作是完全无锁的,当很多线程要修改数据时,它会使用细粒度的锁。
    
    
    
    
    •AddOrUpdate:如果键不存在,方法会在容器中添加新的键和值,如果存在,则更新现有的键和值。
    •GetOrAdd:如果键不存在,方法会向容器中添加新的键和值,如果存在则返回现有的值,并不添加新值。
    •TryAdd:尝试在容器中添加新的键和值。
    •TryGetValue:尝试根据指定的键获得值。
    •TryRemove:尝试删除指定的键。
    •TryUpdate:有条件的更新当前键所对应的值。
    •GetEnumerator:返回一个能够遍历整个容器的枚举器。
    
    
    
    
    
    
    
    
    程序示例:
    
    
    
    
    using System;
    using System.Text;
    
    using System.Threading.Tasks;
    using System.Collections.Concurrent;
    
    
    namespace Sample4_5_concurrent_dictionary
    {
        class Program
        {
            internal static ConcurrentDictionary<int, int> _TestDictionary;
    
            class ThreadWork1  // producer
            {
                public ThreadWork1()
                { }
    
                public void run()
                {
                    System.Console.WriteLine("ThreadWork1 run { ");
                    for (int i = 0; i < 100; i++)
                    {
                        System.Console.WriteLine("ThreadWork1 producer: " + i);
                        _TestDictionary.TryAdd(i, i);
                    }
    
                    System.Console.WriteLine("ThreadWork1 run } ");
                }
            }
    
            class ThreadWork2  // consumer
            {
                public ThreadWork2()
                { }
    
                public void run()
                {
                    int i = 0, nCnt = 0;
                    int nValue = 0;
                    bool IsOk = false;
                    System.Console.WriteLine("ThreadWork2 run { ");
                    while (nCnt < 100)
                    {
                        IsOk = _TestDictionary.TryGetValue(i, out nValue);
                        if (IsOk)
                        {
                            System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   =====" + i);
                            nValue = nValue * nValue;
                            _TestDictionary.AddOrUpdate(i, nValue, (key, value) => { return value = nValue; });
                            nCnt++;
                            i++;
                        }
                    }
                    System.Console.WriteLine("ThreadWork2 run } ");
                }
            }
    
            static void StartT1()
            {
                ThreadWork1 work1 = new ThreadWork1();
                work1.run();
            }
    
            static void StartT2()
            {
                ThreadWork2 work2 = new ThreadWork2();
                work2.run();
            }
            static void Main(string[] args)
            {
                Task t1 = new Task(() => StartT1());
                Task t2 = new Task(() => StartT2());
                bool bIsNext = true;
                int  nValue = 0;
    
                _TestDictionary = new ConcurrentDictionary<int, int>();
    
                Console.WriteLine("Sample 4-5 Main {");
    
                Console.WriteLine("Main t1 t2 started {");
                t1.Start();
                t2.Start();
                Console.WriteLine("Main t1 t2 started }");
    
                Console.WriteLine("Main wait t1 t2 end {");
                Task.WaitAll(t1, t2);
                Console.WriteLine("Main wait t1 t2 end }");
    
                foreach (var pair in _TestDictionary)
                {
                    Console.WriteLine(pair.Key + " : " + pair.Value);
                }
    
                System.Collections.Generic.IEnumerator<System.Collections.Generic.KeyValuePair<int, int>> 
                    enumer = _TestDictionary.GetEnumerator();
    
                while (bIsNext)
                {
                    bIsNext = enumer.MoveNext();
                    Console.WriteLine("Key: " + enumer.Current.Key +
                                      "  Value: " + enumer.Current.Value);
    
                    _TestDictionary.TryRemove(enumer.Current.Key, out nValue);
                }
    
                Console.WriteLine("
    
    Dictionary Count: " + _TestDictionary.Count);
    
                Console.WriteLine("Sample 4-5 Main }");
    
                Console.ReadKey();
            }
        }
    }
  • 相关阅读:
    [saiku] 系统登录成功后查询Cubes
    216. Combination Sum III
    215. Kth Largest Element in an Array
    214. Shortest Palindrome
    213. House Robber II
    212. Word Search II
    211. Add and Search Word
    210. Course Schedule II
    分硬币问题
    开始学习Python
  • 原文地址:https://www.cnblogs.com/nanfei/p/6762434.html
Copyright © 2011-2022 走看看