zoukankan      html  css  js  c++  java
  • 你真的知道.NET Framework中的阻塞队列BlockingCollection的妙用吗?

    BlockingCollection集合是一个拥有阻塞功能的集合,它就是完成了经典生产者消费者的算法功能。一般情况下,我们可以基于 生产者 - 消费者模式来实现并发。BlockingCollection<T> 类是最好的解决方案

    刚结束的物联网卡项目,我需要调用移动的某个具有批量获取物联网卡数据的接口,其实最主要的数据就是物联网卡卡号,然后通过这两个卡号去调用其余的两个接口,最后拼接起来,就有了物联网卡的完整信息。但是问题来了,物联网卡数量多,而且每次调用接口还需要费上一到两秒,如果正常的读取,那不得慢死,所以就用并发来做。我想到的是阻塞队列+生产者消费者模型,使用的阻塞队列是.net线程安全集合的BlockingCollection, 具体的可以看《你不能错过.net 并发解决方案》《深入理解阻塞队列》《.net framework 4 线程安全概述》。
    但是问题来了,MSDN上的例子以及《C# 高级编程第九版》中的管道模型代码都是基于单个的Task, 在这里我肯定是用了多个Task去读取接口,为什么我要说这点,多线程是不可测得,我如何识别阻塞队列已满,如何及时获取阻塞队列中的数据,并不重复的获取呢?具体的简单demo,请看《你不能错过.net 并发解决方案》。我一开始是这么写的:

    BlockingCollection<string> blockingCollection = new BlockingCollection<string>();
                ConcurrentQueue<string> concurrentQueue = new ConcurrentQueue<string>();
    
                var t = new Task[50];
                for (int i = 0; i <= 49; i++)
                {
                    t[i] = Task.Factory.StartNew((obj) =>
                    {
                        Thread.Sleep(2500);
                        blockingCollection.Add(obj.ToString());
                        concurrentQueue.Enqueue(obj.ToString());
                        Console.WriteLine("Task中的数据: {0}", obj.ToString());
                    }, i + 1);
                }
    
                Thread.Sleep(5000);
                while (!blockingCollection.IsCompleted)
                {
                    foreach (var b in blockingCollection.GetConsumingEnumerable())
                    {
                        Console.WriteLine("开始输出阻塞队列: {0}", b);
                        Console.WriteLine(blockingCollection.IsCompleted);
                        Console.WriteLine("并发队列的数量: {0}", concurrentQueue.Count);
                    }
    
                    Console.WriteLine("调用GetConsumingEnumerable方法遍历完之后阻塞队列的数量: {0}", blockingCollection.Count);
    
                    if (concurrentQueue.Count == 50)
                    {
                        blockingCollection.CompleteAdding();
                    }
                }
    
                Console.WriteLine("********");
    
                Console.WriteLine("是否完成添加: {0}", blockingCollection.IsAddingCompleted);
    
                Console.Read();

    但是结果:

    可以看到,这结果有问题啊,按道理来讲foreach遍历完了就会出来啊,但是这是阻塞队列,肯定不是这样的,那么什么时候能挑出foreach循环?这就和BlockingCollection的设计有关了,我查看了下它的源码,原谅我没有看懂,也就不贴了。后来,我改了下代码,就解决问题了。

    ///// 后续补充

    BlockingCollection的GetComsumingEnumerate方法跳出循环的标志是BlockingCollection的IsCompleteAdding为true并且BlockingCollection集合的数据为空

    /////

    BlockingCollection<string> blockingCollection = new BlockingCollection<string>();
                ConcurrentQueue<string> concurrentQueue = new ConcurrentQueue<string>();
    
                var t = new Task[50];
                for (int i = 0; i <= 49; i++)
                {
                    t[i] = Task.Factory.StartNew((obj) =>
                    {
                        Thread.Sleep(2500);
                        blockingCollection.Add(obj.ToString());
                        concurrentQueue.Enqueue(obj.ToString());
                        Console.WriteLine("Task中的数据: {0}", obj.ToString());
                    }, i+1);
                }
    
                Thread.Sleep(5000);
                while (!blockingCollection.IsCompleted)
                {
                    foreach (var b in blockingCollection.GetConsumingEnumerable())
                    {
                        Console.WriteLine("开始输出阻塞队列: {0}", b);
                        Console.WriteLine(blockingCollection.IsCompleted);
                        Console.WriteLine("并发队列的数量: {0}", concurrentQueue.Count);
                        if (concurrentQueue.Count == 50)
                        {
                            blockingCollection.CompleteAdding();
                        }
                    }
                    Console.WriteLine("调用GetConsumingEnumerable方法遍历完之后阻塞队列的数量: {0}", blockingCollection.Count);
                    
                }
    
                Console.WriteLine("********");
                
                Console.WriteLine("是否完成添加: {0}", blockingCollection.IsAddingCompleted);

    结果:

    我没有写的很详细,因为,只是做个笔记,平时学习的时候没有注意到这些问题,没有遇到特定情况下的问题,项目开发中遇到了,就记录下。

  • 相关阅读:
    PIP 全局换源
    专利检索常用的十八个网站
    【PHP】PHPMailer 中文使用说明小结
    去你的,奋斗逼,别把加班文化带到微软来
    Maven、spring Boot 与 Spring Cloud
    vue指令总结
    c++多个源文件共用一个new动态分配类对象(extern 及new的用法)
    git版本控制软件
    Axios 禁用缓存
    怎么写,简历才不会被丢到非洲
  • 原文地址:https://www.cnblogs.com/zhiyong-ITNote/p/8275688.html
Copyright © 2011-2022 走看看