zoukankan      html  css  js  c++  java
  • .NET(C#):线程安全集合的阻塞BlockingCollection的使用

    返回目录

    1. 限制最大容量:BoundedCapacity

    BoundedCapacity属性和CompleteAdding方法,它们都可以从某种方式上限制元素被加入到集合中。但BoundedCapacity是用来限制集合的最大容量,当容量已满后,后续的添加操作会被阻塞,一旦有元素被移除,那么阻塞的添加操作会成功执行。

    比如下面代码,试图将1-50加入到BlockingCollection,此时默认内部是ConcurrentBag,当然你可以指定任意IProducerConsumerCollection。我们把BoundedCapacity设置成2。

    var bcollec = new BlockingCollection<int>(2);

    //试图添加1-50

    Task.Run(() =>

        {

            Parallel.For(151, i =>

            {

                bcollec.Add(i);

                Console.WriteLine("加入:" + i);

            });

        });

    Thread.Sleep(1000);

    Console.WriteLine("调用一次Take");

    bcollec.Take();

    Thread.Sleep(Timeout.Infinite);

    可能的输出:

    加入:37

    加入:13

    调用一次Take

    加入:25

    只有最多两个可以加入,然后调用Take后,下一个元素才会被加入。(注意此时Parallel.For中会有多个线程处于阻塞状态,因为无法加入数据)。

    返回目录

    2. 禁止加入:CompleteAdding和IsCompleted

    CompleteAdding方法则是直接不允许任何元素被加入集合,即使是当前元素的数量小于BoundedCapacity属性。

    代码:

    var bcollec = new BlockingCollection<int>(5);

    //试图添加1-50

    Task.Run(() =>

        {

            Parallel.For(151, i =>

            {

                Console.WriteLine("准备加入:" + i);

                bcollec.Add(i);

                Console.WriteLine("== 成功加入:" + i);

                Thread.Sleep(1000);

            });

        });

    //等待一小段时间后马上调用CompleteAdding

    Thread.Sleep(500);

    Console.WriteLine("调用CompleteAdding");

    bcollec.CompleteAdding();

    Thread.Sleep(Timeout.Infinite);

    上述代码可能的输出:

    准备加入:1

    准备加入:13

    准备加入:25

    准备加入:37

    == 成功加入:13

    == 成功加入:1

    == 成功加入:37

    == 成功加入:25

    调用CompleteAdding

    准备加入:2

    准备加入:38

    准备加入:26

    准备加入:14

    可以看到,虽然BlockingCollection的BoundedCapacity为5,但是由于提前调用了CompleteAdding,即使当前集合只有4个元素,也不会再同意新的加入操作了。

    那么CompleteAdding有什么用?当使用了CompleteAdding方法后且集合内没有元素的时候,另一个属性IsCompleted此时会为True,这个属性可以用来判断是否当前集合内的所有元素都被处理完,而BlockingCollection背后的IProducerConsumerCollection恰恰常用来处理此类生产者-消费者问题的。

    下面我们首先在多个线程中试图往BlockingCollection中加入元素,然后中途调用CompleteAdding,接着通过IsCompleted属性逐个处理被成功加入的元素。

    如下代码:

    var bcollec = new BlockingCollection<int>();

    //试图添加1-50

    Task.Run(() =>

        {

            Parallel.For(151, i =>

            {

                bcollec.Add(i);

                Console.WriteLine("成功加入:" + i);

                Thread.Sleep(1000);

            });

        });

    //等待一小段时间后马上调用CompleteAdding

    Thread.Sleep(700);

    Console.WriteLine("调用CompleteAdding");

    bcollec.CompleteAdding();

    Console.WriteLine("容器元素数量:" + bcollec.Count);

    while (!bcollec.IsCompleted)

    {

        var res = bcollec.Take();

        Console.WriteLine("取出:" + res);

    }

    Console.WriteLine("操作完成");

    Thread.Sleep(Timeout.Infinite);

    可能的输出:

    成功加入:37

    成功加入:25

    成功加入:13

    成功加入:1

    调用CompleteAdding

    容器元素数量:4

    取出:1

    取出:37

    取出:25

    取出:13

    操作完成

    返回目录

    3. 枚举:GetConsumingEnumerable和BlockingCollection本身

    BlockingCollection有两种枚举方法,首先BlockingCollection本身继承自IEnumerable<T>,所以它自己就可以被foreach枚举,首先BlockingCollection包装了一个线程安全集合,那么它自己也是线程安全的,而当多个线程在同时修改或访问线程安全容器时,BlockingCollection自己作为IEnumerable会返回一个一定时间内的集合片段,也就是只会枚举在那个时间点上内部集合的元素。

    看下面代码:

    var bcollec = new BlockingCollection<int>();

    //试图添加1-10

    Task.Run(() =>

    {

        var forOpt = new ParallelOptions()

        {

            //防止在某些硬件上并发数太多

            MaxDegreeOfParallelism = 2

        };

        Parallel.For(111, forOpt, i =>

        {

            bcollec.Add(i);

            Console.WriteLine("成功加入:" + i);

            Thread.Sleep(500);

        });

    });

    Thread.Sleep(700);

    //开始枚举

    Task.Run(() =>

    {

        foreach (var i in bcollec)

            Console.WriteLine("输出:" + i);

    });

    Thread.Sleep(Timeout.Infinite);

    我们边加入元素边进行枚举(直接在BlockingCollection上foreach),可能的输出:

    成功加入:1

    成功加入:6

    成功加入:2

    成功加入:7

    输出:1

    输出:6

    输出:2

    输出:7

    成功加入:8

    成功加入:3

    成功加入:4

    成功加入:9

    成功加入:5

    成功加入:10

    可以看到,BlockingCollection本身的迭代器只能反映出一时的容器内容。

    而BlockingCollection还有一个GetConsumingEnumerable方法,同样返回一个IEnumerable<T>,这个可枚举的集合背后的迭代器不同于BlockingCollection本身的迭代器,它可以返回最新的加入的元素,如果当前时间段没有元素被加入,它会阻塞然后等新加进来的元素。

    我们把上面的使用BlockingCollection本身枚举代码中的枚举Task改成这样:

    //开始枚举

    Task.Run(() =>

    {

        foreach (var i in bcollec.GetConsumingEnumerable())

            Console.WriteLine("输出:" + i);

        Console.WriteLine("完成枚举");

    });

    可能的输出:

    成功加入:6

    成功加入:1

    成功加入:2

    成功加入:7

    输出:6

    输出:1

    输出:2

    输出:7

    成功加入:3

    成功加入:8

    输出:3

    输出:8

    成功加入:4

    成功加入:9

    输出:4

    输出:9

    成功加入:10

    成功加入:5

    输出:10

    输出:5

    这个迭代器很给力,一直处于等待和执行的状态,只要有新的元素被加入,它会找机会去执行foreach的内容,然后再阻塞去等新的元素。

    而且在输出中,代码里的“完成枚举”字符串一直没有被输出。此时它还在卖力地等……因为它不确定什么时候才不会有新元素被加入。

    返回目录

    4. GetConsumingEnumerable和CompleteAdding

    好,此时你应该想到了上面学的CompleteAdding方法,它可以禁止新的元素被加入到BlockingCollection的内部线程安全集合中,所以使用这个方法可以通知GetConsumingEnumerable的迭代器您老不用再等了,后面不会有元素被加进来了。

    如下代码:

    抱歉,这几段代码都不短,而且都类似。但我仍然把完整代码贴出来,虽然这使文章比较冗长,但是我觉得这样读者浏览或者复制时从上到下一目了然,总比看到诸如“请把前面xxx个代码做如下修改:把xxx行改成xxx,在xxx行加入这段代码……”好吧。

    var bcollec = new BlockingCollection<int>();

    //试图添加1-10

    Task.Run(() =>

    {

        var forOpt = new ParallelOptions()

        {

            //防止在某些硬件上并发数太多

            MaxDegreeOfParallelism = 2

        };

        Parallel.For(111, forOpt, i =>

        {

            Console.WriteLine("等待加入:" + i);

            bcollec.Add(i);

            Console.WriteLine("成功加入:" + i);

            Thread.Sleep(500);

        });

    });

    Thread.Sleep(600);

    //开始枚举

    Task.Run(() =>

    {

        foreach (var i in bcollec.GetConsumingEnumerable())

            Console.WriteLine("输出:" + i);

        Console.WriteLine("完成枚举");

    });

    Thread.Sleep(300);

    bcollec.CompleteAdding();

    Console.WriteLine("=== 调用CompleteAdding");

    Thread.Sleep(Timeout.Infinite);

    可能的输出:

    等待加入:1

    等待加入:6

    成功加入:1

    成功加入:6

    等待加入:2

    成功加入:2

    等待加入:7

    成功加入:7

    输出:1

    输出:6

    输出:2

    输出:7

    === 调用CompleteAdding

    完成枚举

    等待加入:3

    等待加入:8

    可以看到,等CompleteAdding,“枚举完成”马上被输出!

    :D

    作者:Mgen

    本文版权归作者所有,欢迎以网址(链接)的方式转载,不欢迎复制文章内容的方式转载,其一是为了在搜索引擎中去掉重复文章内容,其二复制后的文章往往没有提供本博客的页面格式和链接,造成文章可读性很差。望有素质人自觉遵守上述建议。

    如果一定要以复制文章内容的方式转载,必须在文章开头标明作者信息和原文章链接地址。否则保留追究法律责任的权利。

    当前标签: BCL

    共9页: 1 2 3 4 5 6 7 8 9 下一页 
    .NET(C#):线程安全集合的阻塞BlockingCollection的使用 _Mgen 2012-03-15 00:50 阅读:1091 评论:3  
     
    .NET(C#):多个await和和异常处理 _Mgen 2012-03-13 22:22 阅读:24 评论:0  
     
    .NET 4.5 beta中关于Task的未觉察异常的更新 _Mgen 2012-03-13 20:12 阅读:26 评论:0  
     
    .NET(C#):await返回Task的async方法 _Mgen 2012-03-12 23:56 阅读:132 评论:0  
     
    WPF:谈谈各种多线程去修改或访问UI线程数据的方法 _Mgen 2012-03-10 21:15 阅读:28 评论:0  
     
    .NET(C#) TPL:TaskFactory.FromAsync与委托的异步调用 _Mgen 2012-03-02 14:09 阅读:1259 评论:3  
     
    .NET(C#):设置文件系统对象的访问控制 _Mgen 2012-02-28 14:26 阅读:146 评论:0  
     
    .NET(C#):ConcurrentBag<T>同线程元素的添加和删除 _Mgen 2012-02-21 13:45 阅读:42 评论:0  
     
    .NET(C#) TPL:Parallel循环和多个Task的异常处理 _Mgen 2012-02-20 13:40 阅读:38 评论:0  
     
    .NET(C#):DebuggerDisplay特性碉堡了! _Mgen 2012-02-19 11:33 阅读:42 评论:0  
     
     
    .NET(C#):警惕PLINQ结果的无序性 _Mgen 2012-02-17 23:23 阅读:26 评论:0  
     
    .NET(C#):DLR有趣的调用自己的动态对象 _Mgen 2012-02-17 20:28 阅读:28 评论:0  
     
    .NET(C#):计算HttpWebResponse的下载速度 _Mgen 2012-01-30 16:34 阅读:109 评论:0  
     
    .NET(C#):使用HttpWebRequest头中的Range下载文件片段 _Mgen 2012-01-30 10:38 阅读:103 评论:0  
     
    .NET(C#):将数据字节大小转换成易读的单位字符串 _Mgen 2012-01-28 18:29 阅读:59 评论:0  
     
    共9页: 1 2 3 4 5 6 7 8 9 下一页 
  • 相关阅读:
    10. Regular Expression Matching
    9. Palindrome Number
    6. ZigZag Conversion
    5. Longest Palindromic Substring
    4. Median of Two Sorted Arrays
    3. Longest Substring Without Repeating Characters
    2. Add Two Numbers
    链式表的按序号查找
    可持久化线段树——区间更新hdu4348
    主席树——树链上第k大spoj COT
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/2399743.html
Copyright © 2011-2022 走看看