zoukankan      html  css  js  c++  java
  • C# 线程安全集合

    转载
    
    对于并行任务,与其相关紧密的就是对一些共享资源,数据结构的并行访问。经常要做的就是对一些队列进行加锁-解锁,然后执行类似插入,删除等等互斥操作。 .NetFramework 4.0 中提供了一些封装好的支持并行操作数据容器,可以减少并行编程的复杂程度。
    
    
    
    
    基本信息
    
    .NetFramework中并行集合的名字空间: System.Collections.Concurrent
    
    并行容器:
    
    •ConcurrentQueue
    •ConcurrentStack
    •ConcurrentBag : 一个无序的数据结构集,当不需要考虑顺序时非常有用。
    •BlockingCollection : 与经典的阻塞队列数据结构类似
    •ConcurrentDictionary
    
    
    
    
    这些集合在某种程度上使用了无锁技术(CAS Compare-and-Swap和内存屏障 Memory Barrier),与加互斥锁相比获得了性能的提升。但在串行程序中,最好不用这些集合,它们必然会影响性能。
    
    
    
    
    关于CAS: 
    
    •http://www.tuicool.com/articles/zuui6z
    •http://www.360doc.com/content/11/0914/16/7656248_148221200.shtml
    
    关于内存屏障
    
    •http://en.wikipedia.org/wiki/Memory_barrier
    
    
    
    
    用法与示例
    
    ConcurrentQueue
    
    其完全无锁,但当CAS面临资源竞争失败时可能会陷入自旋并重试操作。
    
    
    
    
    •Enqueue:在队尾插入元素
    •TryDequeue:尝试删除队头元素,并通过out参数返回
    •TryPeek:尝试将对头元素通过out参数返回,但不删除该元素。
    
    
    
    
    程序示例:
    
    
    
    
    using System;
    using System.Text;
    
    using System.Threading.Tasks;
    using System.Collections.Concurrent;
    
    namespace Sample4_1_concurrent_queue
    {
        class Program
        {
            internal static ConcurrentQueue<int> _TestQueue;
    
            class ThreadWork1  // producer
            {
                public ThreadWork1()
                { }
    
                public void run()
                {
                    System.Console.WriteLine("ThreadWork1 run { ");
                    for (int i = 0; i < 100; i++)
                    {
                        System.Console.WriteLine("ThreadWork1 producer: " + i);
                        _TestQueue.Enqueue(i);
                    }
                    System.Console.WriteLine("ThreadWork1 run } ");
                }
            }
    
            class ThreadWork2  // consumer
            {
                public ThreadWork2()
                { }
    
                public void run()
                {
                    int i = 0;
                    bool IsDequeuue = false;
                    System.Console.WriteLine("ThreadWork2 run { ");
                    for (; ; )
                    {
                        IsDequeuue = _TestQueue.TryDequeue(out i);
                        if (IsDequeuue)
                            System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   =====");
    
                        if (i == 99)
                            break;
                    }
                    System.Console.WriteLine("ThreadWork2 run } ");
                }
            }
    
            static void StartT1()
            {
                ThreadWork1 work1 = new ThreadWork1();
                work1.run();
            }
    
            static void StartT2()
            {
                ThreadWork2 work2 = new ThreadWork2();
                work2.run();
            }
            static void Main(string[] args)
            {
                Task t1 = new Task(() => StartT1());
                Task t2 = new Task(() => StartT2());
    
                _TestQueue = new ConcurrentQueue<int>();
    
                Console.WriteLine("Sample 3-1 Main {");
    
                Console.WriteLine("Main t1 t2 started {");
                t1.Start();
                t2.Start();
                Console.WriteLine("Main t1 t2 started }");
    
                Console.WriteLine("Main wait t1 t2 end {");
                Task.WaitAll(t1, t2);
                Console.WriteLine("Main wait t1 t2 end }");
    
                Console.WriteLine("Sample 3-1 Main }");
    
                Console.ReadKey();
            }
        }
    }
    
    
    
    
    
    
    
    ConcurrentStack
    
    其完全无锁,但当CAS面临资源竞争失败时可能会陷入自旋并重试操作。
    
    
    
    
    
    
    •Push:向栈顶插入元素
    •TryPop:从栈顶弹出元素,并且通过out 参数返回
    •TryPeek:返回栈顶元素,但不弹出。
    
    
    
    
    程序示例:
    
    
    using System;
    using System.Text;
    
    using System.Threading.Tasks;
    using System.Collections.Concurrent;
    
    namespace Sample4_2_concurrent_stack
    {
        class Program
        {
            internal static ConcurrentStack<int> _TestStack;
    
            class ThreadWork1  // producer
            {
                public ThreadWork1()
                { }
    
                public void run()
                {
                    System.Console.WriteLine("ThreadWork1 run { ");
                    for (int i = 0; i < 100; i++)
                    {
                        System.Console.WriteLine("ThreadWork1 producer: " + i);
                        _TestStack.Push(i);
                    }
                    System.Console.WriteLine("ThreadWork1 run } ");
                }
            }
    
            class ThreadWork2  // consumer
            {
                public ThreadWork2()
                { }
    
                public void run()
                {
                    int i = 0;
                    bool IsDequeuue = false;
                    System.Console.WriteLine("ThreadWork2 run { ");
                    for (; ; )
                    {
                        IsDequeuue = _TestStack.TryPop(out i);
                        if (IsDequeuue)
                            System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   =====" + i);
    
                        if (i == 99)
                            break;
                    }
                    System.Console.WriteLine("ThreadWork2 run } ");
                }
            }
    
            static void StartT1()
            {
                ThreadWork1 work1 = new ThreadWork1();
                work1.run();
            }
    
            static void StartT2()
            {
                ThreadWork2 work2 = new ThreadWork2();
                work2.run();
            }
            static void Main(string[] args)
            {
                Task t1 = new Task(() => StartT1());
                Task t2 = new Task(() => StartT2());
    
                _TestStack = new ConcurrentStack<int>();
    
                Console.WriteLine("Sample 4-1 Main {");
    
                Console.WriteLine("Main t1 t2 started {");
                t1.Start();
                t2.Start();
                Console.WriteLine("Main t1 t2 started }");
    
                Console.WriteLine("Main wait t1 t2 end {");
                Task.WaitAll(t1, t2);
                Console.WriteLine("Main wait t1 t2 end }");
    
                Console.WriteLine("Sample 4-1 Main }");
    
                Console.ReadKey();
            }
        }
    }
    
    
    
    测试中一个有趣的现象:
    
    
    
    
    虽然生产者已经在栈中插入值已经到了25,但消费者第一个出栈的居然是4,而不是25。很像是出错了。但仔细想想入栈,出栈和打印语句是两个部分,而且并不是原子操作,出现这种现象应该也算正常。
    
    
    
    Sample 3-1 Main {
    Main t1 t2 started {
    Main t1 t2 started }
    Main wait t1 t2 end {
    ThreadWork1 run {
    ThreadWork1 producer: 0
     ThreadWork2 run {
    ThreadWork1 producer: 1
    ThreadWork1 producer: 2
     ThreadWork1 producer: 3
    ThreadWork1 producer: 4
    ThreadWork1 producer: 5
    ThreadWork1 producer: 6
    ThreadWork1 producer: 7
    ThreadWork1 producer: 8
    ThreadWork1 producer: 9
    ThreadWork1 producer: 10
    ThreadWork1 producer: 11
    ThreadWork1 producer: 12
    ThreadWork1 producer: 13
     ThreadWork1 producer: 14
    ThreadWork1 producer: 15
    ThreadWork1 producer: 16
    ThreadWork1 producer: 17
    ThreadWork1 producer: 18
    ThreadWork1 producer: 19
    ThreadWork1 producer: 20
    ThreadWork1 producer: 21
     ThreadWork1 producer: 22
    ThreadWork1 producer: 23
    ThreadWork1 producer: 24
    ThreadWork1 producer: 25
    ThreadWork2 consumer: 16   =====4
    ThreadWork2 consumer: 625   =====25
    ThreadWork2 consumer: 576   =====24
    ThreadWork2 consumer: 529   =====23
    ThreadWork1 producer: 26
    ThreadWork1 producer: 27
     ThreadWork1 producer: 28
    
    
    
    
    
    
    
    
    
    
    ConcurrentBag
    
    一个无序的集合,程序可以向其中插入元素,或删除元素。
    
    在同一个线程中向集合插入,删除元素的效率很高。
    
    
    
    
    • Add:向集合中插入元素
    • TryTake:从集合中取出元素并删除
    • TryPeek:从集合中取出元素,但不删除该元素。
    
    
    
    
    
    程序示例:
    
    
    
    
    using System;
    using System.Text;
    
    using System.Threading.Tasks;
    using System.Collections.Concurrent;
    
    namespace Sample4_3_concurrent_bag
    {
        class Program
        {
            internal static ConcurrentBag<int> _TestBag;
    
            class ThreadWork1  // producer
            {
                public ThreadWork1()
                { }
    
                public void run()
                {
                    System.Console.WriteLine("ThreadWork1 run { ");
                    for (int i = 0; i < 100; i++)
                    {
                        System.Console.WriteLine("ThreadWork1 producer: " + i);
                        _TestBag.Add(i);
                    }
                    System.Console.WriteLine("ThreadWork1 run } ");
                }
            }
    
            class ThreadWork2  // consumer
            {
                public ThreadWork2()
                { }
    
                public void run()
                {
                    int i = 0;
                    int nCnt = 0;
                    bool IsDequeuue = false;
                    System.Console.WriteLine("ThreadWork2 run { ");
                    for (;;)
                    {
                        IsDequeuue = _TestBag.TryTake(out i);
                        if (IsDequeuue)
                        {
                            System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   =====" + i);
                            nCnt++;
                        }
    
                        if (nCnt == 99)
                            break;
                    }
                    System.Console.WriteLine("ThreadWork2 run } ");
                }
            }
    
            static void StartT1()
            {
                ThreadWork1 work1 = new ThreadWork1();
                work1.run();
            }
    
            static void StartT2()
            {
                ThreadWork2 work2 = new ThreadWork2();
                work2.run();
            }
            static void Main(string[] args)
            {
                Task t1 = new Task(() => StartT1());
                Task t2 = new Task(() => StartT2());
    
                _TestBag = new ConcurrentBag<int>();
    
                Console.WriteLine("Sample 4-3 Main {");
    
                Console.WriteLine("Main t1 t2 started {");
                t1.Start();
                t2.Start();
                Console.WriteLine("Main t1 t2 started }");
    
                Console.WriteLine("Main wait t1 t2 end {");
                Task.WaitAll(t1, t2);
                Console.WriteLine("Main wait t1 t2 end }");
    
                Console.WriteLine("Sample 4-3 Main }");
    
                Console.ReadKey();
            }
        }
    }
    
    
    
    
    
    
    
    BlockingCollection
    
    一个支持界限和阻塞的容器
    
    
    
    
    •Add :向容器中插入元素
    •TryTake:从容器中取出元素并删除
    •TryPeek:从容器中取出元素,但不删除。
    •CompleteAdding:告诉容器,添加元素完成。此时如果还想继续添加会发生异常。
    •IsCompleted:告诉消费线程,生产者线程还在继续运行中,任务还未完成。
    
    
    
    
    示例程序:
    
    
    
    
    程序中,消费者线程完全使用  while (!_TestBCollection.IsCompleted) 作为退出运行的判断条件。
    
    在Worker1中,有两条语句被注释掉了,当i 为50时设置CompleteAdding,但当继续向其中插入元素时,系统抛出异常,提示无法再继续插入。
    
    
    
    
    using System;
    using System.Text;
    
    using System.Threading.Tasks;
    using System.Collections.Concurrent;
    
    
    namespace Sample4_4_concurrent_bag
    {
        class Program
        {
            internal static BlockingCollection<int> _TestBCollection;
    
            class ThreadWork1  // producer
            {
                public ThreadWork1()
                { }
    
                public void run()
                {
                    System.Console.WriteLine("ThreadWork1 run { ");
                    for (int i = 0; i < 100; i++)
                    {
                        System.Console.WriteLine("ThreadWork1 producer: " + i);
                        _TestBCollection.Add(i);
                        //if (i == 50)
                        //    _TestBCollection.CompleteAdding();
                    }
                    _TestBCollection.CompleteAdding();
    
                    System.Console.WriteLine("ThreadWork1 run } ");
                }
            }
    
            class ThreadWork2  // consumer
            {
                public ThreadWork2()
                { }
    
                public void run()
                {
                    int i = 0;
                    int nCnt = 0;
                    bool IsDequeuue = false;
                    System.Console.WriteLine("ThreadWork2 run { ");
                    while (!_TestBCollection.IsCompleted)
                    {
                        IsDequeuue = _TestBCollection.TryTake(out i);
                        if (IsDequeuue)
                        {
                            System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   =====" + i);
                            nCnt++;
                        }
                    }
                    System.Console.WriteLine("ThreadWork2 run } ");
                }
            }
    
            static void StartT1()
            {
                ThreadWork1 work1 = new ThreadWork1();
                work1.run();
            }
    
            static void StartT2()
            {
                ThreadWork2 work2 = new ThreadWork2();
                work2.run();
            }
            static void Main(string[] args)
            {
                Task t1 = new Task(() => StartT1());
                Task t2 = new Task(() => StartT2());
    
                _TestBCollection = new BlockingCollection<int>();
    
                Console.WriteLine("Sample 4-4 Main {");
    
                Console.WriteLine("Main t1 t2 started {");
                t1.Start();
                t2.Start();
                Console.WriteLine("Main t1 t2 started }");
    
                Console.WriteLine("Main wait t1 t2 end {");
                Task.WaitAll(t1, t2);
                Console.WriteLine("Main wait t1 t2 end }");
    
                Console.WriteLine("Sample 4-4 Main }");
    
                Console.ReadKey();
            }
        }
    }
    
    
    
    
    当然可以尝试在Work1中注释掉 CompleteAdding 语句,此时Work2陷入循环无法退出。
    
    
    
    
    ConcurrentDictionary
    
    对于读操作是完全无锁的,当很多线程要修改数据时,它会使用细粒度的锁。
    
    
    
    
    •AddOrUpdate:如果键不存在,方法会在容器中添加新的键和值,如果存在,则更新现有的键和值。
    •GetOrAdd:如果键不存在,方法会向容器中添加新的键和值,如果存在则返回现有的值,并不添加新值。
    •TryAdd:尝试在容器中添加新的键和值。
    •TryGetValue:尝试根据指定的键获得值。
    •TryRemove:尝试删除指定的键。
    •TryUpdate:有条件的更新当前键所对应的值。
    •GetEnumerator:返回一个能够遍历整个容器的枚举器。
    
    
    
    
    
    
    
    
    程序示例:
    
    
    
    
    using System;
    using System.Text;
    
    using System.Threading.Tasks;
    using System.Collections.Concurrent;
    
    
    namespace Sample4_5_concurrent_dictionary
    {
        class Program
        {
            internal static ConcurrentDictionary<int, int> _TestDictionary;
    
            class ThreadWork1  // producer
            {
                public ThreadWork1()
                { }
    
                public void run()
                {
                    System.Console.WriteLine("ThreadWork1 run { ");
                    for (int i = 0; i < 100; i++)
                    {
                        System.Console.WriteLine("ThreadWork1 producer: " + i);
                        _TestDictionary.TryAdd(i, i);
                    }
    
                    System.Console.WriteLine("ThreadWork1 run } ");
                }
            }
    
            class ThreadWork2  // consumer
            {
                public ThreadWork2()
                { }
    
                public void run()
                {
                    int i = 0, nCnt = 0;
                    int nValue = 0;
                    bool IsOk = false;
                    System.Console.WriteLine("ThreadWork2 run { ");
                    while (nCnt < 100)
                    {
                        IsOk = _TestDictionary.TryGetValue(i, out nValue);
                        if (IsOk)
                        {
                            System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   =====" + i);
                            nValue = nValue * nValue;
                            _TestDictionary.AddOrUpdate(i, nValue, (key, value) => { return value = nValue; });
                            nCnt++;
                            i++;
                        }
                    }
                    System.Console.WriteLine("ThreadWork2 run } ");
                }
            }
    
            static void StartT1()
            {
                ThreadWork1 work1 = new ThreadWork1();
                work1.run();
            }
    
            static void StartT2()
            {
                ThreadWork2 work2 = new ThreadWork2();
                work2.run();
            }
            static void Main(string[] args)
            {
                Task t1 = new Task(() => StartT1());
                Task t2 = new Task(() => StartT2());
                bool bIsNext = true;
                int  nValue = 0;
    
                _TestDictionary = new ConcurrentDictionary<int, int>();
    
                Console.WriteLine("Sample 4-5 Main {");
    
                Console.WriteLine("Main t1 t2 started {");
                t1.Start();
                t2.Start();
                Console.WriteLine("Main t1 t2 started }");
    
                Console.WriteLine("Main wait t1 t2 end {");
                Task.WaitAll(t1, t2);
                Console.WriteLine("Main wait t1 t2 end }");
    
                foreach (var pair in _TestDictionary)
                {
                    Console.WriteLine(pair.Key + " : " + pair.Value);
                }
    
                System.Collections.Generic.IEnumerator<System.Collections.Generic.KeyValuePair<int, int>> 
                    enumer = _TestDictionary.GetEnumerator();
    
                while (bIsNext)
                {
                    bIsNext = enumer.MoveNext();
                    Console.WriteLine("Key: " + enumer.Current.Key +
                                      "  Value: " + enumer.Current.Value);
    
                    _TestDictionary.TryRemove(enumer.Current.Key, out nValue);
                }
    
                Console.WriteLine("
    
    Dictionary Count: " + _TestDictionary.Count);
    
                Console.WriteLine("Sample 4-5 Main }");
    
                Console.ReadKey();
            }
        }
    }
  • 相关阅读:
    【Objective-C 篇】 ☞ 9. 协议
    【Objective-C 篇】 ☞ 8. block
    【Objective-C 篇】 ☞ 7. Category、Extension
    【Objective-C 篇】 ☞ 6. 封装、继承、组合与聚合、多态
    【Objective-C 篇】 ☞ 5. MRC、ARC
    【Objective-C 篇】 ☞ 4. 内存管理
    【Objective-C 篇】 ☞ 3. self、数据类型、编码规范
    【Objective-C 篇】 ☞ 2. 属性、方法
    【Objective-C 篇】 ☞ 1. 基础、语法
    【Objective-C 篇】 ☞ 学前准备
  • 原文地址:https://www.cnblogs.com/nanfei/p/6762434.html
Copyright © 2011-2022 走看看