zoukankan      html  css  js  c++  java
  • .NET(C#):线程安全集合的阻塞BlockingCollection的使用

    返回目录

    1. 限制最大容量:BoundedCapacity

    BoundedCapacity属性和CompleteAdding方法,它们都可以从某种方式上限制元素被加入到集合中。但BoundedCapacity是用来限制集合的最大容量,当容量已满后,后续的添加操作会被阻塞,一旦有元素被移除,那么阻塞的添加操作会成功执行。

    比如下面代码,试图将1-50加入到BlockingCollection,此时默认内部是ConcurrentBag,当然你可以指定任意IProducerConsumerCollection。我们把BoundedCapacity设置成2。

    var bcollec = new BlockingCollection<int>(2);

    //试图添加1-50

    Task.Run(() =>

        {

            Parallel.For(151, i =>

            {

                bcollec.Add(i);

                Console.WriteLine("加入:" + i);

            });

        });

    Thread.Sleep(1000);

    Console.WriteLine("调用一次Take");

    bcollec.Take();

    Thread.Sleep(Timeout.Infinite);

    可能的输出:

    加入:37

    加入:13

    调用一次Take

    加入:25

    只有最多两个可以加入,然后调用Take后,下一个元素才会被加入。(注意此时Parallel.For中会有多个线程处于阻塞状态,因为无法加入数据)。

    返回目录

    2. 禁止加入:CompleteAdding和IsCompleted

    CompleteAdding方法则是直接不允许任何元素被加入集合,即使是当前元素的数量小于BoundedCapacity属性。

    代码:

    var bcollec = new BlockingCollection<int>(5);

    //试图添加1-50

    Task.Run(() =>

        {

            Parallel.For(151, i =>

            {

                Console.WriteLine("准备加入:" + i);

                bcollec.Add(i);

                Console.WriteLine("== 成功加入:" + i);

                Thread.Sleep(1000);

            });

        });

    //等待一小段时间后马上调用CompleteAdding

    Thread.Sleep(500);

    Console.WriteLine("调用CompleteAdding");

    bcollec.CompleteAdding();

    Thread.Sleep(Timeout.Infinite);

    上述代码可能的输出:

    准备加入:1

    准备加入:13

    准备加入:25

    准备加入:37

    == 成功加入:13

    == 成功加入:1

    == 成功加入:37

    == 成功加入:25

    调用CompleteAdding

    准备加入:2

    准备加入:38

    准备加入:26

    准备加入:14

    可以看到,虽然BlockingCollection的BoundedCapacity为5,但是由于提前调用了CompleteAdding,即使当前集合只有4个元素,也不会再同意新的加入操作了。

    那么CompleteAdding有什么用?当使用了CompleteAdding方法后且集合内没有元素的时候,另一个属性IsCompleted此时会为True,这个属性可以用来判断是否当前集合内的所有元素都被处理完,而BlockingCollection背后的IProducerConsumerCollection恰恰常用来处理此类生产者-消费者问题的。

    下面我们首先在多个线程中试图往BlockingCollection中加入元素,然后中途调用CompleteAdding,接着通过IsCompleted属性逐个处理被成功加入的元素。

    如下代码:

    var bcollec = new BlockingCollection<int>();

    //试图添加1-50

    Task.Run(() =>

        {

            Parallel.For(151, i =>

            {

                bcollec.Add(i);

                Console.WriteLine("成功加入:" + i);

                Thread.Sleep(1000);

            });

        });

    //等待一小段时间后马上调用CompleteAdding

    Thread.Sleep(700);

    Console.WriteLine("调用CompleteAdding");

    bcollec.CompleteAdding();

    Console.WriteLine("容器元素数量:" + bcollec.Count);

    while (!bcollec.IsCompleted)

    {

        var res = bcollec.Take();

        Console.WriteLine("取出:" + res);

    }

    Console.WriteLine("操作完成");

    Thread.Sleep(Timeout.Infinite);

    可能的输出:

    成功加入:37

    成功加入:25

    成功加入:13

    成功加入:1

    调用CompleteAdding

    容器元素数量:4

    取出:1

    取出:37

    取出:25

    取出:13

    操作完成

    返回目录

    3. 枚举:GetConsumingEnumerable和BlockingCollection本身

    BlockingCollection有两种枚举方法,首先BlockingCollection本身继承自IEnumerable<T>,所以它自己就可以被foreach枚举,首先BlockingCollection包装了一个线程安全集合,那么它自己也是线程安全的,而当多个线程在同时修改或访问线程安全容器时,BlockingCollection自己作为IEnumerable会返回一个一定时间内的集合片段,也就是只会枚举在那个时间点上内部集合的元素。

    看下面代码:

    var bcollec = new BlockingCollection<int>();

    //试图添加1-10

    Task.Run(() =>

    {

        var forOpt = new ParallelOptions()

        {

            //防止在某些硬件上并发数太多

            MaxDegreeOfParallelism = 2

        };

        Parallel.For(111, forOpt, i =>

        {

            bcollec.Add(i);

            Console.WriteLine("成功加入:" + i);

            Thread.Sleep(500);

        });

    });

    Thread.Sleep(700);

    //开始枚举

    Task.Run(() =>

    {

        foreach (var i in bcollec)

            Console.WriteLine("输出:" + i);

    });

    Thread.Sleep(Timeout.Infinite);

    我们边加入元素边进行枚举(直接在BlockingCollection上foreach),可能的输出:

    成功加入:1

    成功加入:6

    成功加入:2

    成功加入:7

    输出:1

    输出:6

    输出:2

    输出:7

    成功加入:8

    成功加入:3

    成功加入:4

    成功加入:9

    成功加入:5

    成功加入:10

    可以看到,BlockingCollection本身的迭代器只能反映出一时的容器内容。

    而BlockingCollection还有一个GetConsumingEnumerable方法,同样返回一个IEnumerable<T>,这个可枚举的集合背后的迭代器不同于BlockingCollection本身的迭代器,它可以返回最新的加入的元素,如果当前时间段没有元素被加入,它会阻塞然后等新加进来的元素。

    我们把上面的使用BlockingCollection本身枚举代码中的枚举Task改成这样:

    //开始枚举

    Task.Run(() =>

    {

        foreach (var i in bcollec.GetConsumingEnumerable())

            Console.WriteLine("输出:" + i);

        Console.WriteLine("完成枚举");

    });

    可能的输出:

    成功加入:6

    成功加入:1

    成功加入:2

    成功加入:7

    输出:6

    输出:1

    输出:2

    输出:7

    成功加入:3

    成功加入:8

    输出:3

    输出:8

    成功加入:4

    成功加入:9

    输出:4

    输出:9

    成功加入:10

    成功加入:5

    输出:10

    输出:5

    这个迭代器很给力,一直处于等待和执行的状态,只要有新的元素被加入,它会找机会去执行foreach的内容,然后再阻塞去等新的元素。

    而且在输出中,代码里的“完成枚举”字符串一直没有被输出。此时它还在卖力地等……因为它不确定什么时候才不会有新元素被加入。

    返回目录

    4. GetConsumingEnumerable和CompleteAdding

    好,此时你应该想到了上面学的CompleteAdding方法,它可以禁止新的元素被加入到BlockingCollection的内部线程安全集合中,所以使用这个方法可以通知GetConsumingEnumerable的迭代器您老不用再等了,后面不会有元素被加进来了。

    如下代码:

    抱歉,这几段代码都不短,而且都类似。但我仍然把完整代码贴出来,虽然这使文章比较冗长,但是我觉得这样读者浏览或者复制时从上到下一目了然,总比看到诸如“请把前面xxx个代码做如下修改:把xxx行改成xxx,在xxx行加入这段代码……”好吧。

    var bcollec = new BlockingCollection<int>();

    //试图添加1-10

    Task.Run(() =>

    {

        var forOpt = new ParallelOptions()

        {

            //防止在某些硬件上并发数太多

            MaxDegreeOfParallelism = 2

        };

        Parallel.For(111, forOpt, i =>

        {

            Console.WriteLine("等待加入:" + i);

            bcollec.Add(i);

            Console.WriteLine("成功加入:" + i);

            Thread.Sleep(500);

        });

    });

    Thread.Sleep(600);

    //开始枚举

    Task.Run(() =>

    {

        foreach (var i in bcollec.GetConsumingEnumerable())

            Console.WriteLine("输出:" + i);

        Console.WriteLine("完成枚举");

    });

    Thread.Sleep(300);

    bcollec.CompleteAdding();

    Console.WriteLine("=== 调用CompleteAdding");

    Thread.Sleep(Timeout.Infinite);

    可能的输出:

    等待加入:1

    等待加入:6

    成功加入:1

    成功加入:6

    等待加入:2

    成功加入:2

    等待加入:7

    成功加入:7

    输出:1

    输出:6

    输出:2

    输出:7

    === 调用CompleteAdding

    完成枚举

    等待加入:3

    等待加入:8

    可以看到,等CompleteAdding,“枚举完成”马上被输出!

    :D

    作者:Mgen

    本文版权归作者所有,欢迎以网址(链接)的方式转载,不欢迎复制文章内容的方式转载,其一是为了在搜索引擎中去掉重复文章内容,其二复制后的文章往往没有提供本博客的页面格式和链接,造成文章可读性很差。望有素质人自觉遵守上述建议。

    如果一定要以复制文章内容的方式转载,必须在文章开头标明作者信息和原文章链接地址。否则保留追究法律责任的权利。

    当前标签: BCL

    共9页: 1 2 3 4 5 6 7 8 9 下一页 
    .NET(C#):线程安全集合的阻塞BlockingCollection的使用 _Mgen 2012-03-15 00:50 阅读:1091 评论:3  
     
    .NET(C#):多个await和和异常处理 _Mgen 2012-03-13 22:22 阅读:24 评论:0  
     
    .NET 4.5 beta中关于Task的未觉察异常的更新 _Mgen 2012-03-13 20:12 阅读:26 评论:0  
     
    .NET(C#):await返回Task的async方法 _Mgen 2012-03-12 23:56 阅读:132 评论:0  
     
    WPF:谈谈各种多线程去修改或访问UI线程数据的方法 _Mgen 2012-03-10 21:15 阅读:28 评论:0  
     
    .NET(C#) TPL:TaskFactory.FromAsync与委托的异步调用 _Mgen 2012-03-02 14:09 阅读:1259 评论:3  
     
    .NET(C#):设置文件系统对象的访问控制 _Mgen 2012-02-28 14:26 阅读:146 评论:0  
     
    .NET(C#):ConcurrentBag<T>同线程元素的添加和删除 _Mgen 2012-02-21 13:45 阅读:42 评论:0  
     
    .NET(C#) TPL:Parallel循环和多个Task的异常处理 _Mgen 2012-02-20 13:40 阅读:38 评论:0  
     
    .NET(C#):DebuggerDisplay特性碉堡了! _Mgen 2012-02-19 11:33 阅读:42 评论:0  
     
     
    .NET(C#):警惕PLINQ结果的无序性 _Mgen 2012-02-17 23:23 阅读:26 评论:0  
     
    .NET(C#):DLR有趣的调用自己的动态对象 _Mgen 2012-02-17 20:28 阅读:28 评论:0  
     
    .NET(C#):计算HttpWebResponse的下载速度 _Mgen 2012-01-30 16:34 阅读:109 评论:0  
     
    .NET(C#):使用HttpWebRequest头中的Range下载文件片段 _Mgen 2012-01-30 10:38 阅读:103 评论:0  
     
    .NET(C#):将数据字节大小转换成易读的单位字符串 _Mgen 2012-01-28 18:29 阅读:59 评论:0  
     
    共9页: 1 2 3 4 5 6 7 8 9 下一页 
  • 相关阅读:
    数学之道-微积分
    mysql join实现方式
    python pip源配置
    python使用tesseract-ocr完成验证码识别
    Linux和Windows下查看环境变量方法对比
    把大象装进冰箱的N种方法
    mysql 取当前日期对应的周一或周日
    window 安装 Twisted 遇到的问题
    Java泛型
    Android之Adapter用法总结
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/2399743.html
Copyright © 2011-2022 走看看