zoukankan      html  css  js  c++  java
  • 线程安全之ConcurrentQueue<T>队列

      最近在弄一个小项目,大概600w行的数据,要进行数据清洗,因数据量偏大,如果单线程去执行,会造成效率偏低,只能用多线程了,但采用多线程存在线程安全问题,于是查了下资料,发现有ConcurrentQueue<T>该数据结构,完美的解决了我目前问题。

         自msdn上面解释:表示线程安全的先进先出 (FIFO) 集合。

           先说说简单的用法吧:(来自msdn)

      1.Enqueue(T) 将对象添加到 ConcurrentQueue<T> 的结尾处。

      2.TryDequeue(T) 尝试移除并返回位于并发队列开头处的对象。

      3.Count 获取 ConcurrentQueue<T> 中包含的元素数

      4.IsEmpty 获取一个值,该值指示 ConcurrentQueue<T> 是否为空。

         下面是小项目的实现方案,采用最简单的方式(生产者/消费者模式),先将数据写入到队列中,再由消费者进行消费,以下是我写的一个小Demo,用于学习,不对的地方请各位多多指教!

        

    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace ThreadCQueue
    {
        class Program
        {
            static void Main(string[] args)
            {
                Task t = RunProgram();
                t.Wait();
                Console.WriteLine("ok");
                Console.ReadKey();
            }
    
            static async Task RunProgram()
            {
                var taskQueue = new ConcurrentQueue<CustomTask>();
                //生产
                var taskSource = Task.Run(() => TaskProducer(taskQueue));
                await taskSource;
                //消费者
                var processors = new Task[4];
                for (var i = 1; i <= 4; i++)
                {
                    string processordId = i.ToString();
                    processors[i - 1] = Task.Run(() => TaskProcessor(taskQueue, $"Processor {processordId}"));
                }
                await Task.WhenAll(processors);
            }
            static async Task TaskProducer(ConcurrentQueue<CustomTask> queue)
            {
                for (var i = 1; i <= 20; i++)
                {
                    await Task.Delay(50);
                    var workItem = new CustomTask { Id = i };
                    queue.Enqueue(workItem);
                }
            }
            static async Task TaskProcessor(ConcurrentQueue<CustomTask> queue, string name)
            {
                CustomTask workItem;
                await GetRandomDelay();
                while (queue.TryDequeue(out workItem))
                {
                    Console.WriteLine($"消费 {workItem.Id}===>{name}");
                    await GetRandomDelay();
                }
            }
    
            static Task GetRandomDelay()
            {
                int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
                return Task.Delay(delay);
            }
        }
        class CustomTask
        {
            public int Id { get; set; }
        }
    }
    

      

  • 相关阅读:
    LeetCode Single Number
    Leetcode Populating Next Right Pointers in Each Node
    LeetCode Permutations
    Leetcode Sum Root to Leaf Numbers
    LeetCode Candy
    LeetCode Sort List
    LeetCode Remove Duplicates from Sorted List II
    LeetCode Remove Duplicates from Sorted List
    spring MVC HandlerInterceptorAdapter
    yum
  • 原文地址:https://www.cnblogs.com/SmallHan/p/11874867.html
Copyright © 2011-2022 走看看