zoukankan      html  css  js  c++  java
  • 关于“分叉/联接方案”的一般做法

    分叉/联接方案是指:在需要多线程计算的场合,通过在步骤A创建N个执行线程(分叉)后等待所有的线程执行完毕在执行步骤B(联接)。

    .NET2.0

    在.NET2.0的时代,我们通常会使用 ThreadPool.QueueUserWorkItem 创建N个执行线程,通过为每个线程绑定一个ManualResetEvent 对象,再通过WaitHandle.WaitAll方法执行等待;不过这里有个问题,就是WaitAll方法只能等待一定数量的线程,通常为64,一旦我们创建的线程超过64,会抛出如下的异常:

    WaitHandles must be less than or equal to 64

    具体的代码说明,请参考 C#多线程之三:解决多线程编程中大并发数等待唤醒的问题

     在上面的《C#多线程之三:解决多线程编程中大并发数等待唤醒的问题》文章中,作者通过创建了一个MutipleThreadResetEvent类,通过Interlocked.Decrement方法进行计数来实现。在这里我Copy了他的代码如下:

    /********************************************************************************
     * Copyright © 2001 - 2010Comit. All Rights Reserved.
     * 文件:MutipleThreadResetEvent.cs
     * 作者:杨柳
     * 日期:2010年11月13日
     * 描述:封装 ManualResetEvent ,该类允许一次等待N(N>64)个事件执行完毕
     * 
     *       解决问题:WaitHandle.WaitAll(evetlist)方法最大只能等待64个ManualResetEvent事件
     * *********************************************************************************/
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
     
    namespace TestMutipleThreadRestEvent
    {
        /// <summary>
        ///  封装ManualResetEvent
        /// </summary>
        public class MutipleThreadResetEvent : IDisposable
        {
            private readonly ManualResetEvent done;
            private readonly int total;
            private long current;
     
            /// <summary>
            /// 构造函数
            /// </summary>
            /// <param name="total">需要等待执行的线程总数</param>
            public MutipleThreadResetEvent(int total)
            {
                this.total = total;
                current = total;
                done = new ManualResetEvent(false);
            }
     
            /// <summary>
            /// 唤醒一个等待的线程
            /// </summary>
            public void SetOne()
            {
                // Interlocked 原子操作类 ,此处将计数器减1
                if (Interlocked.Decrement(ref current) == 0)
                {
                    //当所以等待线程执行完毕时,唤醒等待的线程
                    done.Set();
                }
            }
     
            /// <summary>
            /// 等待所以线程执行完毕
            /// </summary>
            public void WaitAll()
            {
                done.WaitOne();
            }
     
            /// <summary>
            /// 释放对象占用的空间
            /// </summary>
            public void Dispose()
            {
                ((IDisposable)done).Dispose();
            }
        } 
     
    }

    测试代码如下:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
     
    namespace TestMutipleThreadRestEvent
    {
        /// <summary>
        /// 测试MutipleThreadResetEvent
        /// </summary>
        class Program
        {
            static int i = 0;
     
            /// <summary>
            /// 主方法
            /// </summary>
            /// <param name="args">参数</param>
            static void Main(string[] args)
            {
                //假设有100个请求线程
                int num = 100;
     
                //使用 MutipleThreadResetEvent
                using (var countdown = new MutipleThreadResetEvent(num))
                {
                    for (int i=0;i<num;i++)
                    {
                        //开启N个线程,传递MutipleThreadResetEvent对象给子线程
                        ThreadPool.QueueUserWorkItem(MyHttpRequest, countdown);
                    }
     
                    //等待所有线程执行完毕
                    countdown.WaitAll();
                }
     
                Console.WriteLine("所有的网络请求以及完毕,可以继续下面的分析...");
                Console.ReadKey();
            }
     
            /// <summary>
            /// 假设的网络请求
            /// </summary>
            /// <param name="state">参数</param>
            private static void MyHttpRequest(object state)
            {
               // Thread.Sleep(1000);
                Console.WriteLine(String.Format("哈哈:{0}",++i));
     
                MutipleThreadResetEvent countdown = state as MutipleThreadResetEvent;
                //发送信号量 本线程执行完毕
                countdown.SetOne();
            }
        }
    }

    .NET4.0

    在.NET Framework4.0中,微软为我们提供了CountdownEvent类。

    System.Threading.CountdownEvent 是一个同步基元,它在收到一定次数的信号之后,将会解除对其等待线程的锁定。 CountdownEvent 专门用于以下情况:您必须使用ManualResetEvent 或 ManualResetEventSlim,并且必须在用信号通知事件之前手动递减一个变量。 例如,在分叉/联接方案中,您可以只创建一个信号计数为 5 的CountdownEvent,然后在线程池上启动五个工作项,并且让每个工作项在完成时调用 Signal 每次调用 Signal 时,信号计数都会递减 1。 在主线程上,对 Wait 的调用将会阻塞,直至信号计数为零。”

    下面是微软给出的测试代码:

    IEnumerable<Data> source = GetData();
    using (CountdownEvent e = new CountdownEvent(1))
    {
        // fork work:
        foreach (Data element in source)
        {
            // Dynamically increment signal count.
            e.AddCount();
            ThreadPool.QueueUserWorkItem(delegate(object state)
             {
                 try
                 {
                     ProcessData(state);
                 }
                 finally
                 {
                     e.Signal();
                 }
             },
             element);
        }
        e.Signal();
    
        // The first element could be run on this thread.
    
        // Join with work.
        e.Wait();
    }
    // .,.

    更多的其它使用方式,请参阅 CountdownEvent

    使用  System.Threading.Tasks.Task类。代码如下:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace ConsoleApplication1
    {
        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("Start...");
                Task[] tasks = new Task[50];
                for (int i = 0; i < 50; i++)
                {
                    tasks[i] = new Task(new Action<object>((o) => 
                    {
                        System.Threading.Thread.Sleep(500);
                        Console.WriteLine(string.Format("{0} is ok", o));
                    }), i);
                    tasks[i].Start();
                }
                Task.WaitAll(tasks);
                Console.WriteLine("End\nPress any key to exit");
                Console.ReadKey();
            }
        }
    }

    运行时截图如下:

  • 相关阅读:
    Linux系统分支之Ubuntu
    运维工具之Netdata
    Antd Tree组件虚拟滚动空白问题
    没有root权限的情况下安装vim
    C++ / Python测量程序执行时间
    Linux dmidecode 命令介绍
    网卡到底是什么
    flannel的革命性的变化是在哪里呢?
    kube-proxy
    cilium
  • 原文地址:https://www.cnblogs.com/bruceleeliya/p/2990120.html
Copyright © 2011-2022 走看看