zoukankan      html  css  js  c++  java
  • 一个简单的Parallel.ForEach实现

    在.net的Task Parallel Library中有一个很方便的功能Parallel.ForEach,可以实现多任务的并发执行,另外还带着栅栏功能,非常好用。但是这一功能必须需要clr4.0支持(CTP版的不大好用),对于低版本的.net要实现类似功能只有自己写一个了。

    codeproject上面文章Poor Man's Parallel.ForEach Iterator中就有一种简单而有效的实现。但作者附录的代码有如下几个问题:

    1. 无法对每个并发任务分别制定不同的线程数
    2. 算法本身有点问题,任务执行完会报错
    3. 不能快速响应异常

    针对以上几点,我对那段代码做了一点小改进,代码如下:

    static class Parallel
    {
        public static void ParallelForEach<T>(this IEnumerable<T> enumerable, Action<T> action, int NumberOfParallelTasks)
        {
            var syncRoot = new object();

            if (enumerable == null) return;

            var enumerator = enumerable.GetEnumerator();

            InvokeAsync<T> del = InvokeAction;

            var seedItemArray = new T[NumberOfParallelTasks];
            var resultList = new List<IAsyncResult>(NumberOfParallelTasks);
            var waitHanles = new List<WaitHandle>(NumberOfParallelTasks);

            for (int i = 0; i < NumberOfParallelTasks; i++)
            {

                lock (syncRoot)
                {
                    if (!enumerator.MoveNext())
                        break;
                    seedItemArray[i] = enumerator.Current;
                }

                var iAsyncResult = del.BeginInvoke(enumerator, action, seedItemArray[i], syncRoot, i, null, null);
                resultList.Add(iAsyncResult);
                waitHanles.Add(iAsyncResult.AsyncWaitHandle);
            }

            var taskCount = waitHanles.Count;

            for (int i = 0; i < taskCount; i++)
            {
                var index = WaitHandle.WaitAny(waitHanles.ToArray());
                del.EndInvoke(resultList[index]);
                resultList[index].AsyncWaitHandle.Close();
                waitHanles.RemoveAt(index);
                resultList.RemoveAt(index);
            }
        }

        delegate void InvokeAsync<T>(IEnumerator<T> enumerator,
        Action<T> achtion, T item, object syncRoot, int i);

        static void InvokeAction<T>(IEnumerator<T> enumerator, Action<T> action,
                T item, object syncRoot, int i)
        {
            //if (String.IsNullOrEmpty(Thread.CurrentThread.Name))
            // Thread.CurrentThread.Name =
            //String.Format("Parallel.ForEach Worker Thread No:{0}", i);

            bool moveNext = true;

            while (moveNext)
            {
                try
                {
                    action.Invoke(item);
                }
                catch (Exception)
                {
                    throw;
                }


                lock (syncRoot)
                {
                    moveNext = enumerator.MoveNext();
                    if (moveNext)
                        item = enumerator.Current;
                }
            }
        }
    }

    整个算法非常简洁,这里就不多介绍了。如果有错误欢迎指正。

  • 相关阅读:
    c# 对文件操作
    c# 获取当前绝对路径
    c# 过滤特殊字符
    c# txt代码转换成HTML格式
    c# 对象集合转Json
    c# DataReader转换为Json
    c# DataSet转换为Json
    ABAP-SAP的LUW和DB的LUW的区别
    ABAP-关于隐式与显式的DB Commit
    ABAP-Keyword Documentation
  • 原文地址:https://www.cnblogs.com/TianFang/p/1512588.html
Copyright © 2011-2022 走看看