zoukankan      html  css  js  c++  java
  • C#集合之并发集合

      .NET 4 开始,在System.Collection.Concurrent中提供了几个线程安全的集合类。线程安全的集合可防止多个线程以相互冲突的方式访问集合。
      为了对集合进行线程安全的访问,定义了IProducerConsumerCollection<T>接口。这个接口中最重要的方法是TryAdd()和TryTake()。TryAdd()方法尝试给集合添加一项,但如果集合禁止添加项,这个操作就可能失败。TryAdd()方法返回一个布尔值,以说明操作成功还是失败。TryTake()同样,在成功时返回集合中的项。
        *ConcurrentQueue<T>————这个集合类用一种免锁定的算法实现,使用在内部合并到一个链表中的32项数组。该类有Enqueue(),TryDequeue()和TryPeek()方法。因为这个类实现了              IProducerConsumerCollection<T>接口,所以TryAdd()和TryTake()方法仅调用Enqueue和TryDequeue方法。
        *ConcurrentStack<T>————类似于ConcurrentQueue<T>。该类定义了Push(),PushRange(),TryPeek(),TryPop()和TryPopRange()方法。在内部这个类使用其元素的链表。
        *ConcurrentBag<T>————该类没有定义添加或提取项的任何顺序。这个类使用一个线程映射到内部使用的数组上的概念,因此尝试减少锁定。方法:Add(),TryPeek(),TryTake()。
        *ConcurrentDictionary<TKey,TValue>————这是一个线程安全的键值集合。TryAdd(),TryGetValue(),TryRemove()和TryUpdate()方法以非阻塞的方式访问成员。因为元素基于键和值,所以  ConcurrentDictionary<TKey,TValue>没有实现IProducerConsumerCollection<T>接口。
        *BlockingCollection<T>————这个集合在可以添加或提取元素之前,会阻塞线程并一直等待。BlockingCollection<T>集合提供了一个接口,以使用Add()和Take()方法来删除和添加元素。这些方法会阻塞线程。Add()方法有一个重载版本,其中可以给该重载版本传递一个CancellationToken令牌。这个令牌允许取消被阻塞的调用。如果不希望无限的等待下去,且不希望从外部取消调用,就可以使用TryAdd()和TryTake()方法,在这些方法中,也可以指定一个超时值。
      BlockingCollection<T>是对实现了 IProducerConsumerCollection<T>接口的任意类的修饰器,它默认使用ConcurrentQueue<T>类。还可以给构造函数传递任何实现了 IProducerConsumerCollection<T>接口的类。

      下面使用一个例子演示BlockingCollection<T>的使用,一个任务向一个集合写入,同时另一个任务从这个集合读取。

        static void Main(string[] args)
            {
              StartPipeline();
              Console.ReadLine();
            }
    
            private static async void StartPipeline()
            {
                //存储文件名
              var fileNames = new BlockingCollection<string>();
              //存储文件的每一行内容
              var lines = new BlockingCollection<string>();
              //存储每一行的每个单词,单词为键,单词个数为值
              var words = new ConcurrentDictionary<string, int>();
              //存储words信息
              var items = new BlockingCollection<Info>();
              var coloredItems = new BlockingCollection<Info>();
                
              Task t1 = PipelineStages.ReadFilenamesAsync(@"../../..", fileNames);
              ConsoleHelper.WriteLine("started stage 1");
              Task t2 = PipelineStages.LoadContentAsync(fileNames, lines);
              ConsoleHelper.WriteLine("started stage 2");
              Task t3 = PipelineStages.ProcessContentAsync(lines, words);
              await Task.WhenAll(t1, t2, t3);
              ConsoleHelper.WriteLine("stages 1, 2, 3 completed");
            
              //当上面三个任务完成时,才执行下面的任务
              Task t4 = PipelineStages.TransferContentAsync(words, items);
              Task t5 = PipelineStages.AddColorAsync(items, coloredItems);
              Task t6 = PipelineStages.ShowContentAsync(coloredItems);
              ConsoleHelper.WriteLine("stages 4, 5, 6 started");
    
              await Task.WhenAll(t4, t5, t6);
    
              ConsoleHelper.WriteLine("all stages finished");
            }
            
            
              public static class PipelineStages
              {
                public static Task ReadFilenamesAsync(string path, BlockingCollection<string> output)
                {
                  return Task.Run(() =>
                    {
                      foreach (string filename in Directory.EnumerateFiles(path, "*.cs", SearchOption.AllDirectories))
                      {
                        output.Add(filename);
                        ConsoleHelper.WriteLine(string.Format("stage 1: added {0}", filename));
                      }
                      //调用CompleteAdding,通知所有读取器不应再等待集合中的任何额外项
                        //如果不调用该方法,读取器会在foreach循环中等待更多的项被添加
                      output.CompleteAdding();
                    });
                }
    
                public static async Task LoadContentAsync(BlockingCollection<string> input, BlockingCollection<string> output)
                {
                    //使用读取器读取集合时,需要使用GetConsumingEnumerable获取阻塞集合的枚举器,
                //如果直接使用input迭代集合,这只会迭代当前状态的集合,不会迭代以后添加的项
                  foreach (var filename in input.GetConsumingEnumerable())
                  {
                    using (FileStream stream = File.OpenRead(filename))
                    {
                      var reader = new StreamReader(stream);
                      string line = null;
                      while ((line = await reader.ReadLineAsync()) != null)
                      {
                        output.Add(line);
                        ConsoleHelper.WriteLine(string.Format("stage 2: added {0}", line));
                      }
                    }
                  }
                  output.CompleteAdding();
                }
    
                public static Task ProcessContentAsync(BlockingCollection<string> input, ConcurrentDictionary<string, int> output)
                {
                  return Task.Run(() =>
                    {
                      foreach (var line in input.GetConsumingEnumerable())
                      {
                        string[] words = line.Split(' ', ';', '	', '{', '}', '(', ')', ':', ',', '"');
                        foreach (var word in words.Where(w => !string.IsNullOrEmpty(w)))
                        {
                        //这里使用了字典的一个扩展方法
                          output.AddOrIncrementValue(word);
                          ConsoleHelper.WriteLine(string.Format("stage 3: added {0}", word));
                        }
                      }
                    });
                }
    
                public static Task TransferContentAsync(ConcurrentDictionary<string, int> input, BlockingCollection<Info> output)
                {
                  return Task.Run(() =>
                    {
                      foreach (var word in input.Keys)
                      {
                        int value;
                        if (input.TryGetValue(word, out value))
                        {
                          var info = new Info { Word = word, Count = value };
                          output.Add(info);
                          ConsoleHelper.WriteLine(string.Format("stage 4: added {0}", info));
                        }
                      }
                      output.CompleteAdding();
                    });
                }
    
                public static Task AddColorAsync(BlockingCollection<Info> input, BlockingCollection<Info> output)
                {
                  return Task.Run(() =>
                    {
                      foreach (var item in input.GetConsumingEnumerable())
                      {
                        if (item.Count > 40)
                        {
                          item.Color = "Red";
                        }
                        else if (item.Count > 20)
                        {
                          item.Color = "Yellow";
                        }
                        else
                        {
                          item.Color = "Green";
                        }
                        output.Add(item);
                        ConsoleHelper.WriteLine(string.Format("stage 5: added color {1} to {0}", item, item.Color));
                      }
                      output.CompleteAdding();
                    });
                }
    
                public static Task ShowContentAsync(BlockingCollection<Info> input)
                {
                  return Task.Run(() =>
                    {
                      foreach (var item in input.GetConsumingEnumerable())
                      {
                        ConsoleHelper.WriteLine(string.Format("stage 6: {0}", item), item.Color);
                      }
                    });
                }
              }
              
              
              //创建一个字典的扩展方法
             public static class ConcurrentDictionaryExtension
              {
                public static void AddOrIncrementValue(this ConcurrentDictionary<string, int> dict, string key)
                {
                  bool success = false;
                  while (!success)
                  {
                    int value;
                    if (dict.TryGetValue(key, out value))
                    {
                      if (dict.TryUpdate(key, value + 1, value))
                      {
                        success = true;
                      }
                    }
                    else
                    {
                      if (dict.TryAdd(key, 1))
                      {
                        success = true;
                      }
                    }
                  }
                }
              }

    这里使用了一个管道模型的编程模式,上面的添加内容,下面处理内容
       

  • 相关阅读:
    webpack4从0开始构建前端单页项目(8)处理css的loader
    webpack4从0开始构建前端单页项目(7)用babel-loader处理js㈣transform-runtime
    webpack4从0开始构建前端单页项目(6)用babel-loader处理js㈢babel-polyfill
    webpack4从0开始构建前端单页项目(5)用babel-loader处理js㈡.babelrc文件
    webpack4从0开始构建前端单页项目(4)用babel-loader处理js㈠配置
    webpack4从0开始构建前端单页项目(3)用webpack-dev-server搭建热加载开发环境
    webpack4从0开始构建前端单页项目(2)用html-webpack-plugin生成html文件
    webpack4从0开始构建前端单页项目(1)整理目录
    webpack二刷笔记(4)webpack的核心概念-插件(plugin)
    webpack二刷笔记(3)webpack的核心概念-loader
  • 原文地址:https://www.cnblogs.com/afei-24/p/6836976.html
Copyright © 2011-2022 走看看