zoukankan      html  css  js  c++  java
  • C#并行编程-并发集合

    菜鸟学习并行编程,参考《C#并行编程高级教程.PDF》,如有错误,欢迎指正。

    目录

    背景

    基于任务的程序设计、命令式数据并行和任务并行都要求能够支持并发更新的数组、列表和集合。

    在.NET Framework 4 以前,为了让共享的数组、列表和集合能够被多个线程更新,需要添加复杂的代码来同步这些更新操作。

    如您需要编写一个并行循环,这个循环以无序的方式向一个共享集合中添加元素,那么必须加入一个同步机制来保证这是一个线程安全的集合。

    System.Collenctions和System.Collenctions.Generic 名称空间中所提供的经典列表、集合和数组的线程都不是安全的,不能接受并发请求,因此需要对相应的操作方法执行串行化。

    下面看代码,代码中并没有实现线程安全和串行化:

        class Program
        {
            private static object o = new object();
            private static List<Product> _Products { get; set; }
            /*  coder:释迦苦僧  
             *  代码中 创建三个并发线程 来操作_Products 集合
             *  System.Collections.Generic.List 这个列表在多个线程访问下,不能保证是安全的线程,所以不能接受并发的请求,我们必须对ADD方法的执行进行串行化
             */
            static void Main(string[] args)
            {
                _Products = new List<Product>();
                /*创建任务 t1  t1 执行 数据集合添加操作*/
                Task t1 = Task.Factory.StartNew(() =>
                {
                    AddProducts();
                });
                /*创建任务 t2  t2 执行 数据集合添加操作*/
                Task t2 = Task.Factory.StartNew(() =>
                {
                    AddProducts();
                });
                /*创建任务 t3  t3 执行 数据集合添加操作*/
                Task t3 = Task.Factory.StartNew(() =>
                {
                    AddProducts();
                });
                Task.WaitAll(t1, t2, t3);
                Console.WriteLine(_Products.Count);
                Console.ReadLine();
            }
    
            /*执行集合数据添加操作*/
            static void AddProducts()
            {
                Parallel.For(0, 1000, (i) =>
                {
                    Product product = new Product();
                    product.Name = "name" + i;
                    product.Category = "Category" + i;
                    product.SellPrice = i;
                    _Products.Add(product);
                });
    
            }
        }
    
        class Product
        {
            public string Name { get; set; }
            public string Category { get; set; }
            public int SellPrice { get; set; }
        }
    View Code

    代码中开启了三个并发操作,每个操作都向集合中添加1000条数据,在没有保障线程安全和串行化的运行下,实际得到的数据并没有3000条,结果如下:
     

    为此我们需要采用Lock关键字,来确保每次只有一个线程来访问  _Products.Add(product); 这个方法,代码如下:

        class Program
        {
            private static object o = new object();
            private static List<Product> _Products { get; set; }
            /*  coder:释迦苦僧  
             *  代码中 创建三个并发线程 来操作_Products 集合
             *  System.Collections.Generic.List 这个列表在多个线程访问下,不能保证是安全的线程,所以不能接受并发的请求,我们必须对ADD方法的执行进行串行化
             */
            static void Main(string[] args)
            {
                _Products = new List<Product>();
                /*创建任务 t1  t1 执行 数据集合添加操作*/
                Task t1 = Task.Factory.StartNew(() =>
                {
                    AddProducts();
                });
                /*创建任务 t2  t2 执行 数据集合添加操作*/
                Task t2 = Task.Factory.StartNew(() =>
                {
                    AddProducts();
                });
                /*创建任务 t3  t3 执行 数据集合添加操作*/
                Task t3 = Task.Factory.StartNew(() =>
                {
                    AddProducts();
                });
                Task.WaitAll(t1, t2, t3);
                Console.WriteLine("当前数据量为:" + _Products.Count);
                Console.ReadLine();
            }
    
            /*执行集合数据添加操作*/
            static void AddProducts()
            {
                Parallel.For(0, 1000, (i) =>
                {
                    Product product = new Product();
                    product.Name = "name" + i;
                    product.Category = "Category" + i;
                    product.SellPrice = i;
                    lock (o)
                    {
                        _Products.Add(product);
                    }
                });
    
            }
        }
    
        class Product
        {
            public string Name { get; set; }
            public string Category { get; set; }
            public int SellPrice { get; set; }
        }
    View Code

    但是锁的引入,带来了一定的开销和性能的损耗,并降低了程序的扩展性,在并发编程中显然不适用。

    System.Collections.Concurrent

    .NET Framework 4提供了新的线程安全和扩展的并发集合,它们能够解决潜在的死锁问题和竞争条件问题,因此在很多复杂的情形下它们能够使得并行代码更容易编写,这些集合尽可能减少需要使用锁的次数,从而使得在大部分情形下能够优化为最佳性能,不会产生不必要的同步开销。

    需要注意的是:

    线程安全并不是没有代价的,比起System.Collenctions和System.Collenctions.Generic命名空间中的列表、集合和数组来说,并发集合会有更大的开销。因此,应该只在需要从多个任务中并发访问集合的时候才使用并发几个,在串行代码中使用并发集合是没有意义的,因为它们会增加无谓的开销。

    为此,在.NET Framework中提供了System.Collections.Concurrent新的命名空间可以访问用于解决线程安全问题,通过这个命名空间能访问以下为并发做好了准备的集合。

    1.BlockingCollection 与经典的阻塞队列数据结构类似,能够适用于多个任务添加和删除数据,提供阻塞和限界能力。

    2.ConcurrentBag 提供对象的线程安全的无序集合

    3.ConcurrentDictionary  提供可有多个线程同时访问的键值对的线程安全集合

    4.ConcurrentQueue   提供线程安全的先进先出集合

    5.ConcurrentStack   提供线程安全的后进先出集合

    这些集合通过使用比较并交换和内存屏障等技术,避免使用典型的互斥重量级的锁,从而保证线程安全和性能。

    ConcurrentQueue  

    ConcurrentQueue 是完全无锁的,能够支持并发的添加元素,先进先出。下面贴代码,详解见注释:

        class Program
        {
            private static object o = new object();
            /*定义 Queue*/
            private static Queue<Product> _Products { get; set; }
            private static ConcurrentQueue<Product> _ConcurrenProducts { get; set; }
            /*  coder:释迦苦僧  
             *  代码中 创建三个并发线程 来操作_Products 和 _ConcurrenProducts 集合,每次添加 10000 条数据 查看 一般队列Queue 和 多线程安全下的队列ConcurrentQueue 执行情况
             */
            static void Main(string[] args)
            {
                Thread.Sleep(1000);
                _Products = new Queue<Product>();
                Stopwatch swTask = new Stopwatch();
                swTask.Start();
    
                /*创建任务 t1  t1 执行 数据集合添加操作*/
                Task t1 = Task.Factory.StartNew(() =>
                {
                    AddProducts();
                });
                /*创建任务 t2  t2 执行 数据集合添加操作*/
                Task t2 = Task.Factory.StartNew(() =>
                {
                    AddProducts();
                });
                /*创建任务 t3  t3 执行 数据集合添加操作*/
                Task t3 = Task.Factory.StartNew(() =>
                {
                    AddProducts();
                });
    
                Task.WaitAll(t1, t2, t3);
                swTask.Stop();
                Console.WriteLine("List<Product> 当前数据量为:" + _Products.Count);
                Console.WriteLine("List<Product> 执行时间为:" + swTask.ElapsedMilliseconds);
    
                Thread.Sleep(1000);
                _ConcurrenProducts = new ConcurrentQueue<Product>();
                Stopwatch swTask1 = new Stopwatch();
                swTask1.Start();
    
                /*创建任务 tk1  tk1 执行 数据集合添加操作*/
                Task tk1 = Task.Factory.StartNew(() =>
                {
                    AddConcurrenProducts();
                });
                /*创建任务 tk2  tk2 执行 数据集合添加操作*/
                Task tk2 = Task.Factory.StartNew(() =>
                {
                    AddConcurrenProducts();
                });
                /*创建任务 tk3  tk3 执行 数据集合添加操作*/
                Task tk3 = Task.Factory.StartNew(() =>
                {
                    AddConcurrenProducts();
                });
    
                Task.WaitAll(tk1, tk2, tk3);
                swTask1.Stop();
                Console.WriteLine("ConcurrentQueue<Product> 当前数据量为:" + _ConcurrenProducts.Count);
                Console.WriteLine("ConcurrentQueue<Product> 执行时间为:" + swTask1.ElapsedMilliseconds);
                Console.ReadLine();
            }
    
            /*执行集合数据添加操作*/
            static void AddProducts()
            {
                Parallel.For(0, 30000, (i) =>
                {
                    Product product = new Product();
                    product.Name = "name" + i;
                    product.Category = "Category" + i;
                    product.SellPrice = i;
                    lock (o)
                    {
                        _Products.Enqueue(product);
                    }
                });
    
            }
            /*执行集合数据添加操作*/
            static void AddConcurrenProducts()
            {
                Parallel.For(0, 30000, (i) =>
                {
                    Product product = new Product();
                    product.Name = "name" + i;
                    product.Category = "Category" + i;
                    product.SellPrice = i;
                    _ConcurrenProducts.Enqueue(product);
                });
    
            }
        }
    
        class Product
        {
            public string Name { get; set; }
            public string Category { get; set; }
            public int SellPrice { get; set; }
        }
    View Code

    需要注意的是,代码中的输出时间并不能够完全正确的展示出并发代码下的ConcurrentQueue性能,采用ConcurrentQueue在一定程度上也带来了损耗,如下图所示:

    ConcurrentQueue 还有另外两种方法:TryDequeue  尝试移除并返回 和 TryPeek 尝试返回但不移除,下面贴代码:

        class Program
        {
            private static object o = new object();
            private static ConcurrentQueue<Product> _ConcurrenProducts { get; set; }
            /*  coder:释迦苦僧  
             *  ConcurrentQueue  下的 TryPeek 和 TryDequeue
             */
            static void Main(string[] args)
            {
                _ConcurrenProducts = new ConcurrentQueue<Product>();
                /*执行添加操作*/
                Console.WriteLine("执行添加操作");
                Parallel.Invoke(AddConcurrenProducts, AddConcurrenProducts);
                Console.WriteLine("ConcurrentQueue<Product> 当前数据量为:" + _ConcurrenProducts.Count);
                /*执行TryPeek操作   尝试返回不移除*/
                Console.WriteLine("执行TryPeek操作   尝试返回不移除");
                Parallel.Invoke(PeekConcurrenProducts, PeekConcurrenProducts);
                Console.WriteLine("ConcurrentQueue<Product> 当前数据量为:" + _ConcurrenProducts.Count);
    
                /*执行TryDequeue操作  尝试返回并移除*/
                Console.WriteLine("执行TryDequeue操作  尝试返回并移除");
                Parallel.Invoke(DequeueConcurrenProducts, DequeueConcurrenProducts);
                Console.WriteLine("ConcurrentQueue<Product> 当前数据量为:" + _ConcurrenProducts.Count);
    
                Console.ReadLine();
            }
    
            /*执行集合数据添加操作*/
            static void AddConcurrenProducts()
            {
                Parallel.For(0, 100, (i) =>
                {
                    Product product = new Product();
                    product.Name = "name" + i;
                    product.Category = "Category" + i;
                    product.SellPrice = i;
                    _ConcurrenProducts.Enqueue(product);
                });
            }
            /*尝试返回 但不移除*/
            static void PeekConcurrenProducts()
            {
                Parallel.For(0, 2, (i) =>
                {
                    Product product = null;
                    bool excute = _ConcurrenProducts.TryPeek(out product);
                    Console.WriteLine(product.Name);
                });
            }
            /*尝试返回 并 移除*/
            static void DequeueConcurrenProducts()
            {
                Parallel.For(0, 2, (i) =>
                {
                    Product product = null;
                    bool excute = _ConcurrenProducts.TryDequeue(out product);
                    Console.WriteLine(product.Name);
                });
            }
        }
    
        class Product
        {
            public string Name { get; set; }
            public string Category { get; set; }
            public int SellPrice { get; set; }
        }
    View Code

    需要注意 TryDequeue  和  TryPeek 的无序性,在多线程下

    ConcurrentStack  是完全无锁的,能够支持并发的添加元素,后进先出。下面贴代码,详解见注释:

            private static object o = new object();
            /*定义 Stack*/
            private static Stack<Product> _Products { get; set; }
            private static ConcurrentStack<Product> _ConcurrenProducts { get; set; }
            /*  coder:释迦苦僧  
             *  代码中 创建三个并发线程 来操作_Products 和 _ConcurrenProducts 集合,每次添加 30000 条数据 查看 一般Stack 和 多线程安全下的 ConcurrentStack 执行情况
             */
            static void Main(string[] args)
            {
                Thread.Sleep(1000);
                _Products = new Stack<Product>();
                Stopwatch swTask = new Stopwatch();
                swTask.Start();
    
                /*创建任务 t1  t1 执行 数据集合添加操作*/
                Task t1 = Task.Factory.StartNew(() =>
                {
                    AddProducts();
                });
                /*创建任务 t2  t2 执行 数据集合添加操作*/
                Task t2 = Task.Factory.StartNew(() =>
                {
                    AddProducts();
                });
                /*创建任务 t3  t3 执行 数据集合添加操作*/
                Task t3 = Task.Factory.StartNew(() =>
                {
                    AddProducts();
                });
    
                Task.WaitAll(t1, t2, t3);
                swTask.Stop();
                Console.WriteLine("List<Product> 当前数据量为:" + _Products.Count);
                Console.WriteLine("List<Product> 执行时间为:" + swTask.ElapsedMilliseconds);
    
                Thread.Sleep(1000);
                _ConcurrenProducts = new ConcurrentStack<Product>();
                Stopwatch swTask1 = new Stopwatch();
                swTask1.Start();
    
                /*创建任务 tk1  tk1 执行 数据集合添加操作*/
                Task tk1 = Task.Factory.StartNew(() =>
                {
                    AddConcurrenProducts();
                });
                /*创建任务 tk2  tk2 执行 数据集合添加操作*/
                Task tk2 = Task.Factory.StartNew(() =>
                {
                    AddConcurrenProducts();
                });
                /*创建任务 tk3  tk3 执行 数据集合添加操作*/
                Task tk3 = Task.Factory.StartNew(() =>
                {
                    AddConcurrenProducts();
                });
    
                Task.WaitAll(tk1, tk2, tk3);
                swTask1.Stop();
                Console.WriteLine("ConcurrentStack<Product> 当前数据量为:" + _ConcurrenProducts.Count);
                Console.WriteLine("ConcurrentStack<Product> 执行时间为:" + swTask1.ElapsedMilliseconds);
                Console.ReadLine();
            }
    
            /*执行集合数据添加操作*/
            static void AddProducts()
            {
                Parallel.For(0, 30000, (i) =>
                {
                    Product product = new Product();
                    product.Name = "name" + i;
                    product.Category = "Category" + i;
                    product.SellPrice = i;
                    lock (o)
                    {
                        _Products.Push(product);
                    }
                });
    
            }
            /*执行集合数据添加操作*/
            static void AddConcurrenProducts()
            {
                Parallel.For(0, 30000, (i) =>
                {
                    Product product = new Product();
                    product.Name = "name" + i;
                    product.Category = "Category" + i;
                    product.SellPrice = i;
                    _ConcurrenProducts.Push(product);
                });
    
            }
        }
    
        class Product
        {
            public string Name { get; set; }
            public string Category { get; set; }
            public int SellPrice { get; set; }
        }
    View Code

    ConcurrentStack 还有另外两种方法:TryPop 尝试移除并返回 和 TryPeek 尝试返回但不移除,下面贴代码:

        class Program
        {
            private static object o = new object();
            private static ConcurrentStack<Product> _ConcurrenProducts { get; set; }
            /*  coder:释迦苦僧  
             *  ConcurrentQueue  下的 TryPeek 和 TryPop
             */
            static void Main(string[] args)
            {
                _ConcurrenProducts = new ConcurrentStack<Product>();
                /*执行添加操作*/
                Console.WriteLine("执行添加操作");
                Parallel.Invoke(AddConcurrenProducts, AddConcurrenProducts);
                Console.WriteLine("ConcurrentStack<Product> 当前数据量为:" + _ConcurrenProducts.Count);
                /*执行TryPeek操作   尝试返回不移除*/
                Console.WriteLine("执行TryPeek操作   尝试返回不移除");
                Parallel.Invoke(PeekConcurrenProducts, PeekConcurrenProducts);
                Console.WriteLine("ConcurrentStack<Product> 当前数据量为:" + _ConcurrenProducts.Count);
    
                /*执行TryDequeue操作  尝试返回并移除*/
                Console.WriteLine("执行TryPop操作  尝试返回并移除");
                Parallel.Invoke(PopConcurrenProducts, PopConcurrenProducts);
                Console.WriteLine("ConcurrentStack<Product> 当前数据量为:" + _ConcurrenProducts.Count);
    
                Console.ReadLine();
            }
    
            /*执行集合数据添加操作*/
            static void AddConcurrenProducts()
            {
                Parallel.For(0, 100, (i) =>
                {
                    Product product = new Product();
                    product.Name = "name" + i;
                    product.Category = "Category" + i;
                    product.SellPrice = i;
                    _ConcurrenProducts.Push(product);
                });
            }
            /*尝试返回 但不移除*/
            static void PeekConcurrenProducts()
            {
                Parallel.For(0, 2, (i) =>
                {
                    Product product = null;
                    bool excute = _ConcurrenProducts.TryPeek(out product);
                    Console.WriteLine(product.Name);
                });
            }
            /*尝试返回 并 移除*/
            static void PopConcurrenProducts()
            {
                Parallel.For(0, 2, (i) =>
                {
                    Product product = null;
                    bool excute = _ConcurrenProducts.TryPop(out product);
                    Console.WriteLine(product.Name);
                });
            }
        }
    
        class Product
        {
            public string Name { get; set; }
            public string Category { get; set; }
            public int SellPrice { get; set; }
        }
    View Code

    对于并发下的其他集合,我这边就不做代码案列了,大家可以通过下面的链接查看,如有问题,欢迎指正

    http://msdn.microsoft.com/zh-cn/library/system.collections.concurrent(v=vs.110).aspx

    作者:释迦苦僧 出处:http://www.cnblogs.com/woxpp/p/3935557.html
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接。

    作者:释迦苦僧
    出处:http://www.cnblogs.com/woxpp
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接。
    生活不易,五行缺金,求打点
  • 相关阅读:
    Python 类中方法的内部变量,命名加'self.'变成 self.xxx 和不加直接 xxx 的区别
    用foreach遍历 datagridView 指定列所有的内容
    treeView1.SelectedNode.Level
    YES NO 上一个 下一个
    正则 单词全字匹配查找 reg 边界查找 精确匹配 只匹配字符 不含连续的字符
    抓取2个字符串中间的字符串
    sqlite 60000行 插入到数据库只用不到2秒
    将多行文本以单行的格式保存起来 读和写 ini
    将秒转换成时间格式
    richtextbox Ctrl+V只粘贴纯文本格式
  • 原文地址:https://www.cnblogs.com/woxpp/p/3935557.html
Copyright © 2011-2022 走看看