zoukankan      html  css  js  c++  java
  • C#编程(五十八)----------并行集合

    并行集合

    对于并行任务,与其相关紧密的就是对一些共享资源,数据结构的并行访问.经常要做的就是对一些队列进行加锁-解锁,然后执行类似插入,删除等等互斥操作. .NET4提供了一些封装好的支持并行操作数据容器,可以减少并行编程的复杂程度.

    并行集合的命名空间:System.Collections.Concurrent

    并行容器:

    ConcurrentQueue

    ConcurrentStack

    ConcurrentBag: 一个无序的数据结构集,当不考虑顺序时非常有用.

    BlockingCollection:与经典的阻塞队列数据结构类似

    ConcurrentDictoinary

    以上这些集合在某种程度上使用了无锁技术(CAS和内存屏蔽),与加互斥锁相比获得了性能的提升.但是在串行程序中,最好不要使用这些集合,他们必然会影响性能.

    ConcurrentQueue用法与实例

    其完全无锁,但当CAS面临资源竞争失败时可能会陷入自旋并重试操作.

    Enqueue:在队尾插入元素

    TryDequeue:尝试删除对头元素,并通过out参数返回

    TryPeek:尝试将对头元素通过out参数返回,但不删除元素

    案例:

    using System;

    using System.Collections.Generic;

    using System.Linq;

    using System.Text;

    using System.Threading.Tasks;

    using System.Collections.Concurrent;

    namespace 并行结合Queue

    {

        class Program

        {

            internal static ConcurrentQueue<int> _TestQueue;

            class ThreadWork1 //生产者

            {

                public ThreadWork1()

                { }

                public void run()

                {

                    Console.WriteLine("ThreadWork1 run { ");

                    for (int i = 0; i < 100; i++)

                    {

                        Console.WriteLine("ThreadWork1 producer: "+i);

                        _TestQueue.Enqueue(i);

                    }

                    Console.WriteLine("ThreadWork1 run } ");

                }

            }

            class ThreadWork2 //consumer

            {

                public ThreadWork2()

                { }

                public void run()

                {

                    int i = 0;

                    bool IsDequeue = false;

                    Console.WriteLine("ThreadWork2 run { ");

                    for (; ; )

                    {

                        IsDequeue = _TestQueue.TryDequeue(out i);

                        if (IsDequeue)

                        {

                            System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   ====="); 

                        }

                        if (i==99)

                        {

                            break;

                        }                    

                    }

                    Console.WriteLine("ThreadWork2 run } ");

                }

            }

            static void StartT1()

            {

                ThreadWork1 work1 = new ThreadWork1();

                work1.run();

            }

            static void StartT2()

            {

                ThreadWork2 work2 = new ThreadWork2();

                work2.run();

            }  

            static void Main(string[] args)

            {

                Task t1 = new Task(() => StartT1());

                Task t2 = new Task(() => StartT2());

                _TestQueue = new ConcurrentQueue<int>();

                Console.WriteLine("Sample 3-1 Main {");

                Console.WriteLine("Main t1 t2 started {");

                t1.Start();

                t2.Start();

                Console.WriteLine("Main t1 t2 started }");

                Console.WriteLine("Main wait t1 t2 end {");

                Task.WaitAll(t1, t2);

                Console.WriteLine("Main wait t1 t2 end }");

                Console.WriteLine("Sample 3-1 Main }");

                Console.ReadKey();  

            }

        }

    }

    ConcurrentStact

    其完全无锁,但当CAS面临资源竞争失败时可能会陷入自旋并重试操作.

    Push:向栈顶插入元素

    TryPop:从栈顶弹出元素,并且通过out参数返回

    TryPeek:返回栈顶元素,但不弹出

    案例:

    using System;

    using System.Collections.Generic;

    using System.Linq;

    using System.Text;

    using System.Threading.Tasks;

    using System.Collections.Concurrent;

    namespace 并行结合Queue

    {

        class Program

        {

            internal static ConcurrentStack<int> _TestStack;

            class ThreadWork1 //生产者

            {

                public ThreadWork1()

                { }

                public void run()

                {

                    Console.WriteLine("ThreadWork1 run { ");

                    for (int i = 0; i < 100; i++)

                    {

                        Console.WriteLine("ThreadWork1 producer: "+i);

                        _TestStack.Push(i);

                    }

                    Console.WriteLine("ThreadWork1 run } ");

                }

            }

            class ThreadWork2 //consumer

            {

                public ThreadWork2()

                { }

                public void run()

                {

                    int i = 0;

                    bool IsDequeue = false;

                    Console.WriteLine("ThreadWork2 run { ");

                    for (; ; )

                    {

                        IsDequeue = _TestStack.TryPop(out i);

                        if (IsDequeue)

                        {

                            System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   =====" + i); 

                        }

                        if (i==99)

                        {

                            break;

                        }                    

                    }

                    Console.WriteLine("ThreadWork2 run } ");

                }

            }

            static void StartT1()

            {

                ThreadWork1 work1 = new ThreadWork1();

                work1.run();

            }

            static void StartT2()

            {

                ThreadWork2 work2 = new ThreadWork2();

                work2.run();

            }  

            static void Main(string[] args)

            {

                Task t1 = new Task(() => StartT1());

                Task t2 = new Task(() => StartT2());

                _TestStack = new ConcurrentStack<int>();

                Console.WriteLine("Sample 4-1 Main {");

                Console.WriteLine("Main t1 t2 started {");

                t1.Start();

                t2.Start();

                Console.WriteLine("Main t1 t2 started }");

                Console.WriteLine("Main wait t1 t2 end {");

                Task.WaitAll(t1, t2);

                Console.WriteLine("Main wait t1 t2 end }");

                Console.WriteLine("Sample 4-1 Main }");

                Console.ReadKey();  

            }

        }

    }

    ConcurrentBag

    一个无序的集合,程序可以向其中插入元素,或删除元素.

    在同一个线程中向集合插入,删除元素效率很高.

    Add:向集合中插入元素

    TryTake:从集合中取出元素并删除

    TryPeek:从集合中取出元素,但不删除元素

    案例:

    using System;

    using System.Collections.Concurrent;

    using System.Collections.Generic;

    using System.Linq;

    using System.Text;

    using System.Threading.Tasks;

    namespace 并行集合ConcurrentBag

    {

        class Program

        {

            internal static ConcurrentBag<int> _TestBag;

            class ThreadWork1 //producer

            {

                public ThreadWork1()

                { }

                public void run()

                {

                    Console.WriteLine("Threadwork1 run { ");

                    for (int i = 0; i < 100; i++)

                    {

                        Console.WriteLine("ThreadWork1 producer: "+i);

                        _TestBag.Add(i);

                    }

                    Console.WriteLine("ThreadWork1 run } ");

                }

            }

            class ThreadWork2//consumer

            {

                public ThreadWork2()

                { }

                public void run()

                {

                    bool IsDequeue = false;

                    Console.WriteLine("ThreadWork2 run { ");

                    for (int i = 0; i < 100; i++)

                    {

                        IsDequeue = _TestBag.TryTake(out i);

                        if (IsDequeue)

                        {

                            Console.WriteLine("ThreadWork2 consumer: " + i * i + "======" + i);

                        }

                    }

                    Console.WriteLine("ThreadWork2 run } ");

                }

            }

            static void Start1()

            {

                ThreadWork1 work1 = new ThreadWork1();

                work1.run();

            }

            static void Start2()

            {

                ThreadWork2 work2 = new ThreadWork2();

                work2.run();

            }

            static void Main(string[] args)

            {

                Task t1 = new Task(() => Start1());

                Task t2 = new Task(() => Start2());

                _TestBag = new ConcurrentBag<int>();

                t1.Start();

                t2.Start();

                Console.WriteLine("Main t1 t2 started }");

                Console.WriteLine("Main wait t1 t2 end {");

                Task.WaitAll(t1, t2);

                Console.WriteLine("Main wait t1 t2 end }");

                Console.WriteLine("Sample 4-3 Main }");

                Console.ReadKey();  

            }

        }

    }

    BlockingCollection

    一个支持界限和阻塞的容器

    Add:向容器中插入元素

    TryTake:从容器中取出元素并删除

    TryPeek:从容器中取出元素,但不删除

    CompleteAdding:告诉容器,添加元素完成.此时如果还想继续添加会发生异常.

    IsCompleted:告诉消费者线程,产生者线程还在继续运行中,任务还未完成.

    案例:

    程序中,消费者线程完全使用While(!__testCollection.IsCompleted)作为退出运行的判断条件.Work1,有两条语句被注释了,i50时设置为CompleteAdding,但当继续向其中插入元素时,系统抛出异常,提示无法再继续插入.

    案例:

    using System;

    using System.Collections.Concurrent;

    using System.Collections.Generic;

    using System.Linq;

    using System.Text;

    using System.Threading.Tasks;

    namespace 并行结合Collection

    {

        class Program

        {

            internal static BlockingCollection<int> _testBCollection;

            class ThreadWork1 //producer

            {

                public ThreadWork1()

                { }

                public void run()

                {

                    Console.WriteLine("ThreadWork1 run { ");

                    for (int i = 0; i < 100; i++)

                    {

                        Console.WriteLine("ThreadWork1 producer: "+i);

                        _testBCollection.Add(i);

                        //i50时设置为CompleteAdding,但当继续向其中插入元素时,系统抛出异常,提示无法再继续插入.

                        //if (i==50)

                        //{

                        //    _testBCollection.CompleteAdding();

                        //}

                    }

                    _testBCollection.CompleteAdding();

                    Console.WriteLine("ThreadWork1 run } ");

                }

            }

            class ThreadWork2//consumer

            {

                public ThreadWork2()

                { }

                public void run()

                {

                    int i = 0;

                    

                    bool IsDequeue = false;

                    Console.WriteLine("ThreadWork2 run { ");

                    while (!_testBCollection.IsCompleted)

                    {

                        //不明白这里为啥i会自动的++

                        //Console.WriteLine("i=================="+i);

                        IsDequeue = _testBCollection.TryTake(out i);

                        if (IsDequeue)

                        {

                            Console.WriteLine("ThreadWork2 consumer: "+i*i+"====="+i);

                           //i++;这句话不加还是会出现自动++的操作

                        }

                    }

                }

            }

            static void StartT1()

            {

                ThreadWork1 work1 = new ThreadWork1();

                work1.run();

            }

            static void StartT2()

            {

                ThreadWork2 work2 = new ThreadWork2();

                work2.run();

            }  

            static void Main(string[] args)

            {

                Task t1 = new Task(() => StartT1());

                Task t2 = new Task(() => StartT2());

                _testBCollection = new BlockingCollection<int>();

                Console.WriteLine("Sample 4-4 Main {");

                Console.WriteLine("Main t1 t2 started {");

                t1.Start();

                t2.Start();

                Console.WriteLine("Main t1 t2 started }");

                Console.WriteLine("Main wait t1 t2 end {");

                Task.WaitAll(t1, t2);

                Console.WriteLine("Main wait t1 t2 end }");

                Console.WriteLine("Sample 4-4 Main }");

                Console.ReadKey();  

            }

        }

    }

    分析://_testBCollection.CompleteAdding();//这句话注释掉work2陷入循环,无法退出

    ConcurrentDictionary

    对于读操作是完全无锁的,当很多线程要修改数据时,它会使用细粒度的锁.

    AddOrUpdate:如果键不存在,方法会在容器中添加新的键和值,如果存在,则更新现有的键和值

    GetOrAdd:如果键不存在,方法会向容器中添加新的键和值,如果存在则返回现有的值,并不添加新值.

    TryAdd:尝试在容器中添加新的键和值

    TryGetValue:尝试根据指定的键获得值

    TryUpdate:有条件的更新当前键所对应的值

    TryRemove:尝试删除指定的键.

    Getenumerator:返回一个能够遍历整个容器的枚举器.

    案例:

    using System;

    using System.Collections.Concurrent;

    using System.Collections.Generic;

    using System.Linq;

    using System.Text;

    using System.Threading.Tasks;

    namespace 并行集合Dictionary

    {

        class Program

        {

            internal static ConcurrentDictionary<int, int> _TestDictionary;

            class ThreadWork1//producer

            {

                public ThreadWork1()

                { }

                public void run()

                {

                    System.Console.WriteLine("ThreadWork1 run { ");

                    for (int i = 0; i < 100; i++)

                    {

                        System.Console.WriteLine("ThreadWork1 producer: " + i);

                        _TestDictionary.TryAdd(i, i);

                    }

                    System.Console.WriteLine("ThreadWork1 run } ");

                }

            }

            class ThreadWork2//consumer

            {

                public ThreadWork2()

                { }

                public void run()

                {

                    bool IsOk = false;

                    Console.WriteLine("ThreadWork2 run { ");

                    int nCnt = 0,i=0,nValue=0;

                    while (nCnt<100)

                    {

                        IsOk = _TestDictionary.TryGetValue(i,out nValue);

                        if (IsOk)

                        {

                            Console.WriteLine("ThreadWork2 consumer: "+i*i+"====="+i);

                            nValue *= nValue;

                            nCnt++;

                            i++;

                        }

                    }

                    Console.WriteLine("ThreadWork2 run } ");

                }

            }

            static void StartT1()

            {

                ThreadWork1 work1 = new ThreadWork1();

                work1.run();

            }

            static void StartT2()

            {

                ThreadWork2 work2 = new ThreadWork2();

                work2.run();

            }  

            static void Main(string[] args)

            {

                Task t1 = new Task(() => StartT1());

                Task t2 = new Task(() => StartT2());

                bool bIsNext = true;

                int nValue = 0;

                _TestDictionary = new ConcurrentDictionary<int, int>();

                Console.WriteLine("Sample 4-5 Main {");

                Console.WriteLine("Main t1 t2 started {");

                t1.Start();

                t2.Start();

                Console.WriteLine("Main t1 t2 started }");

                Console.WriteLine("Main wait t1 t2 end {");

                Task.WaitAll(t1, t2);

                Console.WriteLine("Main wait t1 t2 end }");

                foreach (var pair in _TestDictionary)

                {

                    Console.WriteLine(pair.Key + " : " + pair.Value);

                }

                IEnumerator<KeyValuePair<int, int>> enumer = _TestDictionary.GetEnumerator();

                while (bIsNext)

                {

                    bIsNext = enumer.MoveNext();

                    Console.WriteLine("Key: " + enumer.Current.Key +"  Value: " + enumer.Current.Value);

                    _TestDictionary.TryRemove(enumer.Current.Key, out nValue);  

                }

                Console.WriteLine(" Dictionary Count: " + _TestDictionary.Count);

                Console.WriteLine("Sample 4-5 Main }");

                Console.ReadKey();  

            }

        }

    }

    总结说明: .NET4包含的新命名空间System.Collection,Concurrent有几个线程安全的集合类.线程安全的集合可防止多个线程以相互冲突的方式访问集合.

    为了对集合进行线程安全的访问,定义了IPriducerConsumerCollection<T>接口.这个接口中最重要的方法是TryAdd()TryTake().TryAdd()方法尝试给集合添加一项,但如果集合禁止添加项,这个操作可能失败.为了给出相关信息,TryAdd()方法返回一个布尔值,以说明操作是成功还是失败.TryTake()方法也以这种方式工作,以通过调用者操作是成功还是失败,并在操作成功时返回集合中的项.

  • 相关阅读:
    安装MySQLdb
    树莓派及其他硬件平台国内外Linux镜像站全汇总
    rpc使用举例
    SAE上安装第三方模块
    【Java】Map
    【Java】判断字符串是否含字母
    【Android Studio】提示代码忽略大小写
    【iOS】Xcode 离线文档
    【iOS】iOS main() 简介
    【eclipse】No enclosing instance of type A is accessible. Must qualify the allocation with an enclosing instance of type A
  • 原文地址:https://www.cnblogs.com/android-blogs/p/6610078.html
Copyright © 2011-2022 走看看