zoukankan      html  css  js  c++  java
  • 线程安全之ConcurrentQueue<T>队列

      最近在弄一个小项目,大概600w行的数据,要进行数据清洗,因数据量偏大,如果单线程去执行,会造成效率偏低,只能用多线程了,但采用多线程存在线程安全问题,于是查了下资料,发现有ConcurrentQueue<T>该数据结构,完美的解决了我目前问题。

         自msdn上面解释:表示线程安全的先进先出 (FIFO) 集合。

           先说说简单的用法吧:(来自msdn)

      1.Enqueue(T) 将对象添加到 ConcurrentQueue<T> 的结尾处。

      2.TryDequeue(T) 尝试移除并返回位于并发队列开头处的对象。

      3.Count 获取 ConcurrentQueue<T> 中包含的元素数

      4.IsEmpty 获取一个值,该值指示 ConcurrentQueue<T> 是否为空。

         下面是小项目的实现方案,采用最简单的方式(生产者/消费者模式),先将数据写入到队列中,再由消费者进行消费,以下是我写的一个小Demo,用于学习,不对的地方请各位多多指教!

        

    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace ThreadCQueue
    {
        class Program
        {
            static void Main(string[] args)
            {
                Task t = RunProgram();
                t.Wait();
                Console.WriteLine("ok");
                Console.ReadKey();
            }
    
            static async Task RunProgram()
            {
                var taskQueue = new ConcurrentQueue<CustomTask>();
                //生产
                var taskSource = Task.Run(() => TaskProducer(taskQueue));
                await taskSource;
                //消费者
                var processors = new Task[4];
                for (var i = 1; i <= 4; i++)
                {
                    string processordId = i.ToString();
                    processors[i - 1] = Task.Run(() => TaskProcessor(taskQueue, $"Processor {processordId}"));
                }
                await Task.WhenAll(processors);
            }
            static async Task TaskProducer(ConcurrentQueue<CustomTask> queue)
            {
                for (var i = 1; i <= 20; i++)
                {
                    await Task.Delay(50);
                    var workItem = new CustomTask { Id = i };
                    queue.Enqueue(workItem);
                }
            }
            static async Task TaskProcessor(ConcurrentQueue<CustomTask> queue, string name)
            {
                CustomTask workItem;
                await GetRandomDelay();
                while (queue.TryDequeue(out workItem))
                {
                    Console.WriteLine($"消费 {workItem.Id}===>{name}");
                    await GetRandomDelay();
                }
            }
    
            static Task GetRandomDelay()
            {
                int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
                return Task.Delay(delay);
            }
        }
        class CustomTask
        {
            public int Id { get; set; }
        }
    }
    

      

  • 相关阅读:
    关于选择器
    关于定位
    jq第一讲
    js第三讲
    js第2讲
    js第一讲
    HTML第三讲的补充及HTML5新增标签和属性
    CSS第 三讲概要
    CSS第二讲概要
    CSS第一讲概要
  • 原文地址:https://www.cnblogs.com/SmallHan/p/11874867.html
Copyright © 2011-2022 走看看