zoukankan      html  css  js  c++  java
  • ParallelProgramming-多消费者,多生产者同时运行并行

    在上一篇文章演示了并行的流水线操作(生产者和消费者并行同时执行),C#是通过BlockingCollection这个线程安全的对象作为Buffer,并且结合Task来实现的。但是上一篇文章有个缺陷,在整个流水线上,生产者和消费者是唯一的。本文将演示多个消费者多个生产者同时并行执行。

    一、多消费者、多生产者示意图

     与前一篇文章演示的流水线思想类似,不同之处就是本文的topic:消费者和生产者有多个,以buffer1为例,起生产者有两个,消费者有两个,现在有三个纬度的并行:

    1. Action1和Action2并行(消费者和生产者并行)
    2. 消费者并行(Action2.1和Action2.2并行)
    3. 生产者并行(Action1.1和Action1.2并行)

    二、实现

    2.1 代码

     class PiplelineDemo
        {
            PRivate int seed;
            public PiplelineDemo()
            {
                seed = 10;
            }
    
            public void Action11(BlockingCollection<string> output)
            {
                for (var i = 0; i < seed; i++)
                {
                    output.Add(i.ToString());//initialize data to buffer1
                }
            }
    
            public void Action12(BlockingCollection<string> output)
            {
                for (var i = 0; i < seed; i++)
                {
                    output.Add(i.ToString());//initialize data to buffer1
                }
            }
    
            public void Action21(BlockingCollection<string> input, BlockingCollection<string> output)
            {
                foreach (var item in input.GetConsumingEnumerable())
                {
                    var itemToInt = int.Parse(item);
                    output.Add((itemToInt * itemToInt).ToString());// add new data to buffer2
                }
            }
    
            public void Action22(BlockingCollection<string> input, BlockingCollection<string> output)
            {
                foreach (var item in input.GetConsumingEnumerable())
                {
                    var itemToInt = int.Parse(item);
                    output.Add((itemToInt * itemToInt).ToString());// add new data to buffer2
                }
            }
    
            public void Action31(BlockingCollection<string> input, BlockingCollection<string> output)
            {
                foreach (var item in input.GetConsumingEnumerable())
                {
                    output.Add((item));// add new data to buffer3
                }
            }
    
            public void Action32(BlockingCollection<string> input, BlockingCollection<string> output)
            {
                foreach (var item in input.GetConsumingEnumerable())
                {
                    output.Add((item));// add new data to buffer3
                }
            }
            public void Pipeline()
            {
                var buffer1 = new BlockingCollection<string>(seed * 2);
                var buffer2 = new BlockingCollection<string>(seed * 2);
                var buffer3 = new BlockingCollection<string>(seed * 2);
                var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
                var stage11 = taskFactory.StartNew(() => Action11(buffer1));
                var stage12 = taskFactory.StartNew(() => Action12(buffer1));
                Task.Factory.ContinueWhenAll(new Task[] { stage11, stage12 }, (tasks) =>
                {
                    buffer1.CompleteAdding();
                });
                var stage21 = taskFactory.StartNew(() => Action21(buffer1, buffer2));
                var stage22 = taskFactory.StartNew(() => Action22(buffer1, buffer2));
                Task.Factory.ContinueWhenAll(new Task[] { stage21, stage22 }, (tasks) =>
                {
                    buffer2.CompleteAdding();
                });
                var stage31 = taskFactory.StartNew(() => Action31(buffer2, buffer3));
                var stage32 = taskFactory.StartNew(() => Action32(buffer2, buffer3));
                Task.Factory.ContinueWhenAll(new Task[] { stage31, stage32 }, (tasks) =>
                {
                    buffer3.CompleteAdding();
                });
                Task.WaitAll(stage11, stage12, stage21, stage22, stage31, stage32);
                foreach (var item in buffer3.GetConsumingEnumerable())//print data in buffer3
                {
                    Console.WriteLine(item);
                }
            }
        }

    2.2 运行结果

    2.3 代码解释

    1. Action11和Action12相对比较好理解。初始化数据到buffer1。
    2. Action2.1和Action2.2相对比较费解,他们同时接受buffer1作为输入,为什么最终的结果Buffer2没有产生重复? 最后由Action21,action22同时产生的buffer3为什么也没有重复?这就是GetConsumingEnumerable这个方法的功劳。这个方法会将buffer的数据分成多份给多个消费者,如果一个value已经被一个消费者获取,那么其他消费者将不会再拿到这个值。这就回答了为什么没有重复这个问题。
    3. 上面方法同时使用了多任务延续(ContinueWhenAll)对buffer的调用CompleteAdding方法:该方法非常重要,如果没有调用这个方法,程序会进入死锁,因为消费者(consumer)会处于一直的等待状态。
  • 相关阅读:
    高级定制_百度百科
    大叔也学Xamarin系列
    WebApi系列~dynamic让你的省了很多临时类
    大叔也说Xamarin~Android篇~支付宝SDK的集成
    知方可补不足~sqlserver中使用ROW_NUMBER进行的快速分页
    EF架构~有时使用SQL更方便
    【deep learning学习笔记】注释yusugomori的LR代码 --- LogisticRegression.h
    线程同步
    泛型接口的实现方式之二
    jsp获得本地及serverIP的方法
  • 原文地址:https://www.cnblogs.com/endv/p/6781507.html
Copyright © 2011-2022 走看看