  • About Parallel.For and Parallel.ForEach

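    .NET 4 added a parallel programming model. A computation is spread over several threads that run at the same time, and the Task Parallel Library draws those threads from the thread pool. Because the threads run independently, splitting up independent work (each thread doing its own job) is very easy. Combining their results is not so simple. Consider a very simple example: computing the sum of 1 to 1000.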

    If you try to compute it with the following code, the result may well surprise you:

    [C#]

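    int sum = 0;
    // the upper bound of Parallel.For is exclusive, so 1..1000 is (1, 1001)
    Parallel.For(1, 1001, i => { sum += i; });   // data race on sum
    Console.WriteLine(sum);                      // often not 500500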

    [VB.NET]

    Dim sum As Integer = 0
    Parallel.[For](1, 1001, Sub(i) sum += i)   ' data race on sum
    Console.WriteLine(sum)

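    The reason is that .NET runs "sum += i" on several threads at once. That statement is not one step but three: read sum, add i, and write the result back. All the threads share sum, so one thread is often still between its read and its write when another thread reads the same old value. One write then overwrites the other, an addition is lost, and the final result is wrong.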
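    The lost update can be reproduced deterministically. The sketch below writes out the three steps of "sum += i" and interleaves two "threads" by hand. It is a single-threaded simulation, not real concurrency. The variable names are illustrative, and it is written as a C# 9+ top-level program.

```csharp
using System;

// sum += i is really three steps: read sum, add i, write sum back.
// Simulate thread A (adding 1) and thread B (adding 2) with interleaved steps.
int sum = 0;

int readByA = sum;     // A reads 0
int readByB = sum;     // B also reads 0, before A has written
sum = readByA + 1;     // A writes 1
sum = readByB + 2;     // B writes 2, overwriting A's result

// A correct run would give 0 + 1 + 2 = 3; A's update was lost.
Console.WriteLine($"sum = {sum}, expected 3");   // prints "sum = 2, expected 3"
```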

    There are several ways to fix this:

    【1】Decomposition:

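    Decomposition attacks the root of the problem, which is one variable being shared by several threads. Split the sum of 1 to 1000 into several chunks and let each thread write its partial result to its own storage slot. In effect, every thread gets separate memory. Then add up the partial results at the end: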

    [C#]

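    int[] numbers = Enumerable.Range(1, 1000).ToArray();
    int[] values = new int[4];   // one slot per chunk, so no slot is shared
    Parallel.For(0, 4, i =>
    {
        // chunk i covers elements i*250 .. i*250+249
        values[i] = numbers.Skip(i * 250).Take(250).Sum();
    });
    int sum = values.Sum();      // combine the partial results after the loop
    Console.WriteLine(sum);      // 500500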

    [VB.NET]

    Dim numbers As Integer() = Enumerable.Range(1, 1000).ToArray()
    Dim values As Integer() = New Integer(3) {}   ' 4 slots, indices 0..3
    ' must be a Sub lambda: inside a Function lambda "=" would be a comparison
    Parallel.[For](0, 4, Sub(i) values(i) = numbers.Skip(i * 250).Take(250).Sum())
    Dim sum As Integer = values.Sum()
    Console.WriteLine(sum)
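    The split above hard-codes four chunks of 250, which works only because 1000 divides evenly by 4. The sketch below applies the same decomposition to an array of any length and assumes the caller picks the chunk size. `ChunkedSum` is a hypothetical helper name, and the program uses C# 9+ top-level statements.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Sum an array by splitting it into fixed-size chunks. Each chunk writes only
// to its own slot in partials, so no shared slot is ever written by two threads.
static long ChunkedSum(int[] numbers, int chunkSize)
{
    // ceiling division: a trailing partial chunk still gets its own index
    int chunkCount = (numbers.Length + chunkSize - 1) / chunkSize;
    long[] partials = new long[chunkCount];

    Parallel.For(0, chunkCount, c =>
    {
        int start = c * chunkSize;
        int end = Math.Min(start + chunkSize, numbers.Length);  // last chunk may be short
        long local = 0;
        for (int k = start; k < end; k++) local += numbers[k];
        partials[c] = local;
    });

    return partials.Sum();  // combine on the calling thread after the loop finishes
}

int[] data = Enumerable.Range(1, 1000).ToArray();
Console.WriteLine(ChunkedSum(data, 300));  // 4 chunks (300, 300, 300, 100): prints 500500
```

Each partial sum is a `long`, so large inputs do not overflow an individual slot. Only the final `Sum` runs sequentially.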

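    【2】Using lock. Lock a dedicated object so that only one thread at a time runs the protected code. When that thread leaves the block, the lock is released automatically and the next thread enters, and so on: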

    [C#]

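    using System;
    using System.Threading.Tasks;

    public class Program
    {
        int sum = 0;

        // dedicated lock object; sum itself is an int and cannot be locked
        private static readonly object obj = new object();

        public void ShowResult()
        {
            // only one thread at a time may execute sum += i
            Parallel.For(1, 1001, i => { lock (obj) { sum += i; } });
            Console.WriteLine(sum);   // 500500
        }

        static void Main(string[] args)
        {
            Program p = new Program();
            p.ShowResult();
        }
    }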

    [VB.NET]

    Imports System
    Imports System.Threading.Tasks

    Public Class Program
        Private sum As Integer = 0
        ' one lock object shared by every iteration
        Private Shared ReadOnly obj As New Object()

        Public Sub ShowResult()
            Parallel.For(1, 1001, Sub(i)
                                      SyncLock obj
                                          sum = sum + i
                                      End SyncLock
                                  End Sub)
            Console.WriteLine(sum)   ' 500500
        End Sub
    End Class

    Module M
        Sub Main()
            Dim p As New Program
            p.ShowResult()
        End Sub
    End Module

    【3】Using the Interlocked class:

    [C#]

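    using System;
    using System.Threading;
    using System.Threading.Tasks;

    public class Program
    {
        int sum = 0;

        public void ShowResult()
        {
            // Interlocked.Add performs the read-add-write as one atomic operation
            Parallel.For(1, 1001, i => { Interlocked.Add(ref sum, i); });
            Console.WriteLine(sum);   // 500500
        }

        static void Main(string[] args)
        {
            Program p = new Program();
            p.ShowResult();
        }
    }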

    [VB.NET]

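    Imports System
    Imports System.Threading
    Imports System.Threading.Tasks

    Public Class Program
        Private sum As Integer = 0
        Public Sub ShowResult()
            ' Interlocked.Add performs the read-add-write as one atomic operation
            Parallel.[For](1, 1001, Sub(i)
                                        Interlocked.Add(sum, i)
                                    End Sub)
            Console.WriteLine(sum)
        End Sub

        Shared Sub Main(args As String())
            Dim p As New Program()
            p.ShowResult()
        End Sub
    End Class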
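    Interlocked.Add turns the read, add and write of "sum += i" into one indivisible operation. The sketch below is for illustration only. It shows how an equivalent atomic add can be built from Interlocked.CompareExchange with a retry loop, which is the usual compare-and-swap pattern. `AtomicAdd` is a hypothetical name, and in real code Interlocked.Add is the simpler choice. The program uses C# 9+ top-level statements.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Atomic add built from Interlocked.CompareExchange: re-read and retry until
// no other thread changed the value between our read and our write.
static void AtomicAdd(ref int target, int value)
{
    int seen, updated;
    do
    {
        seen = Volatile.Read(ref target);   // snapshot the current value
        updated = seen + value;
        // store 'updated' only if target still equals 'seen'; returns the value found
    } while (Interlocked.CompareExchange(ref target, updated, seen) != seen);
}

int sum = 0;
Parallel.For(1, 1001, i => AtomicAdd(ref sum, i));
Console.WriteLine(sum);  // 500500
```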


    A more complex example traverses all the files in a folder "in parallel" on several threads and adds each one to a DataTable:

    [C#]

    using System;
    using System.Collections.Generic;
    using System.Data;
    using System.IO;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    public class Program
    {
        /// <summary>
        /// Each time process with 3 files
        /// </summary>
        public const int BLOCKFILEPROCESS = 3;
        private static object flag = new object();

        static void Main(string[] args)
        {
            DataTable dt = new DataTable();
            dt.Columns.Add("Id", typeof(int));
            dt.Columns.Add("FileName", typeof(string));
            dt.Columns.Add("ExtensionName", typeof(string));
            dt.Columns[0].AutoIncrement = true;
            dt.Columns[0].AutoIncrementSeed = 1;
            dt.Columns[0].AutoIncrementStep = 1;

            // "文件夹名称" means "folder name": replace it with a real directory
            string[] files = Directory.GetFiles("c:\\文件夹名称", "*.*", SearchOption.AllDirectories);
            int totalFiles = files.Length;
            // number of 3-file chunks, rounded up (a chunk count, not a thread count)
            int finalThreadNum = totalFiles % BLOCKFILEPROCESS == 0 ? totalFiles / BLOCKFILEPROCESS : totalFiles / BLOCKFILEPROCESS + 1;

            Parallel.For<List<string>>(0, finalThreadNum,
                // localInit: once per task, an empty list owned by that task
                () => new List<string>(),
                // body: once per chunk index i, appends chunk i to the task's list
                (i, state, processCollection) =>
                {
                    processCollection.AddRange(files.Skip(i * BLOCKFILEPROCESS).Take(BLOCKFILEPROCESS));
                    return processCollection;
                },
                // localFinally: once per task; the DataTable is shared, so lock it
                (result) =>
                {
                    Monitor.Enter(flag);
                    try
                    {
                        foreach (var item in result)
                        {
                            DataRow row = dt.NewRow();
                            row["FileName"] = Path.GetFileName(item);
                            row["ExtensionName"] = Path.GetExtension(item);
                            dt.Rows.Add(row);
                        }
                    }
                    finally
                    {
                        Monitor.Exit(flag);   // released even if adding a row throws
                    }
                });

            foreach (DataRow item in dt.Rows)
            {
                Console.WriteLine(item["Id"].ToString() + "<==>" + item["FileName"] + "<==>" + item["ExtensionName"].ToString());
            }
        }
    }

    [VB.NET]

    Imports System
    Imports System.Collections.Generic
    Imports System.Data
    Imports System.IO
    Imports System.Linq
    Imports System.Threading
    Imports System.Threading.Tasks

    Public Class Program
        ''' <summary>
        ''' Each time process with 3 files
        ''' </summary>
        Public Const BLOCKFILEPROCESS As Integer = 3
        Private Shared flag As New Object()

        Public Shared Sub Main(args As String())
            Dim dt As New DataTable()
            dt.Columns.Add("Id", GetType(Integer))
            dt.Columns.Add("FileName", GetType(String))
            dt.Columns.Add("ExtensionName", GetType(String))
            dt.Columns(0).AutoIncrement = True
            dt.Columns(0).AutoIncrementSeed = 1
            dt.Columns(0).AutoIncrementStep = 1

            ' "文件夹名称" means "folder name": replace it with a real directory
            Dim files As String() = Directory.GetFiles("c:\文件夹名称", "*.*", SearchOption.AllDirectories)
            Dim totalFiles As Integer = files.Length
            ' number of 3-file chunks, rounded up (a chunk count, not a thread count)
            Dim finalThreadNum As Integer = If(totalFiles Mod BLOCKFILEPROCESS = 0, totalFiles \ BLOCKFILEPROCESS, totalFiles \ BLOCKFILEPROCESS + 1)

            Parallel.[For](Of List(Of String))(0, finalThreadNum,
                Function() New List(Of String)(),
                Function(i, state, processCollection)
                    ' append chunk i to this task's own list
                    processCollection.AddRange(files.Skip(i * BLOCKFILEPROCESS).Take(BLOCKFILEPROCESS))
                    Return processCollection
                End Function,
                Sub(result)
                    ' once per task; the DataTable is shared, so lock it
                    Monitor.Enter(flag)
                    Try
                        For Each item As String In result
                            Dim row As DataRow = dt.NewRow()
                            row("FileName") = Path.GetFileName(item)
                            row("ExtensionName") = Path.GetExtension(item)
                            dt.Rows.Add(row)
                        Next
                    Finally
                        Monitor.[Exit](flag)
                    End Try
                End Sub)

            For Each item As DataRow In dt.Rows
                Console.WriteLine(item("Id").ToString() & "<==>" & item("FileName").ToString() & "<==>" & item("ExtensionName").ToString())
            Next
        End Sub
    End Class

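    The code above uses Monitor to lock and unlock the one object that several threads access. The lock statement is just compiler shorthand for Monitor.Enter and Monitor.Exit wrapped in try/finally, so the two cost essentially the same. Without lock or Monitor, several threads would modify the same DataTable at once, which is a race condition. The biggest difference from the first, non-generic For example is where results are stored. Its solution 1 gave each thread its own storage slot for the partial sum. Here the storage (the DataTable) is shared.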
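    Because lock is only shorthand for Monitor, the two loops below should compute the same sum. The second loop is a sketch of the pattern the C# compiler generates for a lock statement since C# 4: Monitor.Enter with a lockTaken flag, and Monitor.Exit in finally. The variable names are illustrative, and the program uses C# 9+ top-level statements.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

object gate = new object();
int sumWithLock = 0, sumWithMonitor = 0;

// Form 1: the lock statement.
Parallel.For(1, 1001, i => { lock (gate) { sumWithLock += i; } });

// Form 2: approximately what the compiler turns lock into.
Parallel.For(1, 1001, i =>
{
    bool lockTaken = false;
    try
    {
        Monitor.Enter(gate, ref lockTaken);  // lockTaken becomes true once the lock is held
        sumWithMonitor += i;
    }
    finally
    {
        if (lockTaken) Monitor.Exit(gate);   // released even if the body throws
    }
});

Console.WriteLine($"{sumWithLock} {sumWithMonitor}");  // 500500 500500
```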

    In general, whenever several threads share and modify one resource, access to it must be controlled by a synchronization mechanism!


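    This example also uses the generic overload For<TLocal>, which takes five arguments:

    - The first and second arguments set the range of loop indices. Here that is the number of 3-file chunks, not the number of threads; the runtime chooses the thread count.
    - The third argument (localInit) runs once per task and creates that task's private local state.
    - The fourth argument (the loop body) runs once per index. It receives the index, a ParallelLoopState and the task's current local state, and returns the updated local state. The example uses Skip + Take to pick out the files that belong to index i.
    - The last argument (localFinally) is an Action<TLocal> called once per task with that task's final local state. It does the final processing, adding the collected files to the table.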
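    The roles of the three delegates are easiest to see with the running sum from the start of this article. In the sketch below, each task keeps a private subtotal. Only the once-per-task merge touches shared state, so the shared total is updated a handful of times instead of 1000. The counter `tasksUsed` is illustrative; it shows that localFinally runs once per task rather than once per iteration. The program uses C# 9+ top-level statements.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

long total = 0;
int tasksUsed = 0;

Parallel.For<long>(1, 1001,
    () => 0L,                                   // localInit: once per task, subtotal starts at 0
    (i, loopState, subtotal) => subtotal + i,   // body: once per index, returns the updated subtotal
    subtotal =>                                 // localFinally: once per task, merge into shared state
    {
        Interlocked.Add(ref total, subtotal);
        Interlocked.Increment(ref tasksUsed);
    });

// total is always 500500; tasksUsed depends on how the runtime scheduled the loop
Console.WriteLine($"total = {total}, merged from {tasksUsed} task(s)");
```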

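    Parallel.ForEach works here too. Unlike the For example above, ForEach is not chunked by hand. It partitions the source collection internally, and the runtime decides how many threads to use; ParallelOptions.MaxDegreeOfParallelism can only cap that number. The code: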

    [C#]

    using System;
    using System.Data;
    using System.IO;
    using System.Threading.Tasks;

    public class Program
    {
        private static object flag = new object();

        static void Main(string[] args)
        {
            DataTable dt = new DataTable();
            dt.Columns.Add("Id", typeof(int));
            dt.Columns.Add("FileName", typeof(string));
            dt.Columns.Add("ExtensionName", typeof(string));
            dt.Columns[0].AutoIncrement = true;
            dt.Columns[0].AutoIncrementSeed = 1;
            dt.Columns[0].AutoIncrementStep = 1;

            string[] files = Directory.GetFiles("c:\\安装", "*.jpg", SearchOption.TopDirectoryOnly);

            // ForEach itself decides how the files are split among threads
            Parallel.ForEach<string>(files, (s) =>
            {
                // the DataTable is shared, so every row is added under the lock
                lock (flag)
                {
                    DataRow row = dt.NewRow();
                    row["FileName"] = Path.GetFileName(s);
                    row["ExtensionName"] = Path.GetExtension(s);
                    dt.Rows.Add(row);
                }
            });

            foreach (DataRow item in dt.Rows)
            {
                Console.WriteLine(item["Id"].ToString() + "<==>" + item["FileName"] + "<==>" + item["ExtensionName"].ToString());
            }
        }
    }

    [VB.NET]

    Imports System
    Imports System.Data
    Imports System.IO
    Imports System.Threading.Tasks

    Public Class Program
        Private Shared flag As New Object()

        Public Shared Sub Main(args As String())
            Dim dt As New DataTable()
            dt.Columns.Add("Id", GetType(Integer))
            dt.Columns.Add("FileName", GetType(String))
            dt.Columns.Add("ExtensionName", GetType(String))
            dt.Columns(0).AutoIncrement = True
            dt.Columns(0).AutoIncrementSeed = 1
            dt.Columns(0).AutoIncrementStep = 1

            Dim files As String() = Directory.GetFiles("c:\安装", "*.jpg", SearchOption.TopDirectoryOnly)

            ' a Sub lambda, because the loop body returns nothing
            Parallel.ForEach(Of String)(files,
                Sub(s)
                    ' the DataTable is shared, so every row is added under the lock
                    SyncLock flag
                        Dim row As DataRow = dt.NewRow()
                        row("FileName") = Path.GetFileName(s)
                        row("ExtensionName") = Path.GetExtension(s)
                        dt.Rows.Add(row)
                    End SyncLock
                End Sub)

            For Each item As DataRow In dt.Rows
                Console.WriteLine(item("Id").ToString() & "<==>" & item("FileName").ToString() & "<==>" & item("ExtensionName").ToString())
            Next
        End Sub
    End Class
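    ForEach partitions its input itself, but the split can still be influenced. The sketch below replaces Directory.GetFiles with a hypothetical in-memory list of paths and replaces the DataTable with a List<string>. Partitioner.Create(from, to, rangeSize) hands ForEach index ranges of three files each, mirroring BLOCKFILEPROCESS. ParallelOptions.MaxDegreeOfParallelism caps how many ranges run at once. The program uses C# 9+ top-level statements.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

// Hypothetical paths standing in for Directory.GetFiles(...)
string[] files =
{
    "photos/a.jpg", "photos/b.jpg", "photos/c.jpg", "photos/d.jpg",
    "photos/e.jpg", "photos/f.jpg", "photos/g.jpg",
};
const int BLOCKFILEPROCESS = 3;
var results = new List<string>();
object flag = new object();

// Index ranges of at most 3 items: [0,3), [3,6), [6,7)
var ranges = Partitioner.Create(0, files.Length, BLOCKFILEPROCESS);
var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };  // at most 2 ranges at a time

Parallel.ForEach(ranges, options, range =>
{
    // build this range's entries privately, then take the lock once per range
    var chunk = new List<string>();
    for (int k = range.Item1; k < range.Item2; k++)
        chunk.Add(Path.GetFileName(files[k]) + "<==>" + Path.GetExtension(files[k]));
    lock (flag) { results.AddRange(chunk); }
});

Console.WriteLine(results.Count);  // 7
```

Taking the lock once per range rather than once per file keeps the shared list's critical section short. The order of entries in `results` still varies between runs.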
  • Original article: https://www.cnblogs.com/ServiceboyNew/p/2497340.html