zoukankan      html  css  js  c++  java
  • ThreadPool做并发程序

    参考文章:www.cnblogs.com/xugang/archive/2008/03/23/1118584.html

                 http://www.cnblogs.com/SkySoot/archive/2012/04/01/2429259.html

    在多线程的程序中,经常会出现两种情况:
       1. 应用程序中线程把大部分的时间花费在等待状态,等待某个事件发生,然后给予响应。这一般使用 ThreadPool(线程池)来解决。 
       2. 线程平时都处于休眠状态,只是周期性地被唤醒。这一般使用 Timer(定时器)来解决。

    ThreadPool 类提供一个由系统维护的线程池(可以看作一个线程的容器),该容器需要 Windows 2000 以上系统支持,因为其中某些方法调用了只有高版本的Windows 才有的 API 函数。

    将线程安放在线程池里,需使用 ThreadPool.QueueUserWorkItem() 方法,该方法的原型如下:
        // 将一个线程放进线程池,该线程的 Start() 方法将调用 WaitCallback 代理对象代表的函数
        public static bool QueueUserWorkItem(WaitCallback);
        // 重载的方法如下,参数 object 将传递给 WaitCallback 所代表的方法
        public static bool QueueUserWorkItem(WaitCallback, object);
    注意:
        ThreadPool 类是一个静态类,你不能也不必要生成它的对象。而且一旦使用该方法在线程池中添加了一个项目,那么该项目将是无法取消的。这里你无需自己建立线程,只需把 你要做的工作写成函数,然后作为参数传递给ThreadPool.QueueUserWorkItem()方法就行了,传递的方法就是依靠 WaitCallback 代理对象,而线程的建立、管理、运行等工作都是由系统自动完成的,你无须考虑那些复杂的细节问题。

    ThreadPool 的用法:
        首先程序创建了一个 ManualResetEvent 对象,该对象就像一个信号灯,可以利用它的信号来通知其它线程。本例中,当线程池中所有线程工作都完成以后,ManualResetEvent 对象将被设置为有信号,从而通知主线程继续运行。
    ManualResetEvent 对象有几个重要的方法:
        初始化该对象时,用户可以指定其默认的状态(有信号/无信号);
        在初始化以后,该对象将保持原来的状态不变,直到它的 Reset() 或者 Set() 方法被调用:
        Reset():
            将其设置为无信号状态;
        Set():
            将其设置为有信号状态。
        WaitOne():
            使当前线程挂起,直到 ManualResetEvent 对象处于有信号状态,此时该线程将被激活。然后,程序将向线程池中添加工作项,这些以函数形式提供的工作项被系统用来初始化自动建立的线程。当所有的线程 都运行完了以后,ManualResetEvent.Set() 方法被调用,因为调用了 ManualResetEvent.WaitOne() 方法而处在等待状态的主线程将接收到这个信号,于是它接着往下执行,完成后边的工作。

    下面是我自己做的例子:

    比较浅显的

    public class ThreadPoolManualResetEventDemo
        {
            public static void Main() 
            {
                Console.WriteLine("测试");
                //把信号量设置成无信号状态的
                ManualResetEvent eventX = new ManualResetEvent(false);            
                int maxCount = 100;
                SomeState somstate = new SomeState(maxCount);
                for (int i = 0; i < maxCount; i++)
                {
                    ThreadPool.QueueUserWorkItem(new WaitCallback(ShowMsg), new ThreadData() { Index = i, WaitHandle = eventX, Somestate = somstate });
                }
                Console.WriteLine("等待线程执行结束");
                eventX.WaitOne(Timeout.Infinite, true);
                Console.WriteLine("结束了");
                Console.ReadKey();
            }
            public static void ShowMsg(object obj) {
                ThreadData td = (ThreadData)obj;
                Console.WriteLine("测试信息"+obj);
                Thread.Sleep(5000);
                if (Interlocked.Decrement(ref td.Somestate.maxCount) == 0)
                {
                    td.WaitHandle.Set();
                }
            }
        }
        class ThreadData
        {
            public int Index { get; set; }
            public object Data { get; set; }
            public ManualResetEvent WaitHandle { get; set; }
            public SomeState Somestate { set; get; }
        }
    
        public class SomeState
        {
            public int maxCount;
            public SomeState(int count)
            {
                this.maxCount = count;
            }
        }

    下面的是我们项目里面用的一个并发,原理也是线程池和ManualResetEvent来控制的。一个同事封装的

    public class Parallel
        {
            class ThreadData
            {
                public int Index { get; set; }
                public object Data { get; set; }
                public CountdownEvent WaitHandle { get; set; }
            }
            /// <summary>
            /// default timeout 60 minutes.
            /// </summary>
            static TimeSpan defaultTimeout = TimeSpan.FromMinutes(60);
            static void Log(int item, Exception ex)
            {
                Debug.WriteLine(string.Format("error position {0}:{1}", item, ex.ToString()));
            }
            static void Log<T>(T item, Exception ex)
            {
                Debug.WriteLine(string.Format("error item {0}:{1}", JsonConvert.SerializeObject(item), ex.ToString()));
            }
            public static void For(int len, Action<int> action)
            {
                For(len, action, defaultTimeout);
            }
            public static void For(int len, Action<int> action, TimeSpan timeout)
            {
                For(len, action, Log, timeout);
            }
            public static void For(int len, Action<int> action, Action<int, Exception> handleError)
            {
                For(len, action, handleError, defaultTimeout);
            }
            public static void For(int len, Action<int> action, Action<int, Exception> handleError, TimeSpan timeout)
            {
                AggregateException errors = new AggregateException();
                using (CountdownEvent evt = new CountdownEvent(len))
                {
                    for (int i = 0; i < len; i++)
                    {
                        ThreadPool.QueueUserWorkItem(obj =>
                        {
                            var shared = obj as Parallel.ThreadData;
                            try
                            {
                                action(shared.Index);
                            }
                            catch (Exception ex)
                            {
                                handleError(shared.Index, ex);
                                errors.AddException(shared.Index, ex);
                            }
                            finally
                            {
                                shared.WaitHandle.Signal();
                            }
    
                        }, new ThreadData { Index = i, WaitHandle = evt });
    
                    }
                    evt.Wait(timeout);
                }
                if (errors.Count > 0) throw errors;
            }
            public static void ForEach<T>(IEnumerable<T> source, Action<T> action)
            {
                ForEach<T>(source, action, defaultTimeout);
            }
            public static void ForEach<T>(IEnumerable<T> source, Action<T> action, TimeSpan timeout)
            {
                ForEach<T>(source, action, Log, timeout);
            }
            public static void ForEach<T>(IEnumerable<T> source, Action<T> action, Action<T, Exception> handleError)
            {
                ForEach<T>(source, action, handleError, defaultTimeout);
            }
            public static void ForEach<T>(IEnumerable<T> source, Action<T> action, Action<T, Exception> handleError, TimeSpan timeout)
            {
                AggregateException errors = new AggregateException();
                using (CountdownEvent evt = new CountdownEvent(source.Count()))
                {
                    int i = 0;
                    foreach (var item in source)
                    {
                        Interlocked.Increment(ref i);
                        ThreadPool.QueueUserWorkItem(obj =>
                        {
                            var shared = obj as Parallel.ThreadData;
    
                            try
                            {
                                action(shared.Data.AsType<T>());
                            }
                            catch (Exception ex)
                            {
                                handleError(shared.Data.AsType<T>(), ex);
                                errors.AddException(shared.Index, ex);
                            }
                            finally
                            {
                                shared.WaitHandle.Signal();
                            }
    
                        }, new ThreadData { Index = i, Data = item, WaitHandle = evt });
                    }
    
                    evt.Wait(timeout);
                }
                if (errors.Count > 0) throw errors;
            }
            public static ConcurrentDictionary<int, TResult> ForEach<T, TResult>(IEnumerable<T> source, Func<T, TResult> func)
            {
                return ForEach<T, TResult>(source, func, defaultTimeout);
            }
            public static ConcurrentDictionary<int, TResult> ForEach<T, TResult>(IEnumerable<T> source, Func<T, TResult> func, TimeSpan timeout)
            {
                return ForEach<T, TResult>(source, func, Log, defaultTimeout);
            }
            public static ConcurrentDictionary<int, TResult> ForEach<T, TResult>(IEnumerable<T> source, Func<T, TResult> func, Action<T, Exception> handleError)
            {
                return ForEach<T, TResult>(source, func, handleError, defaultTimeout);
            }
            public static ConcurrentDictionary<int, TResult> ForEach<T, TResult>(IEnumerable<T> source, Func<T, TResult> func, Action<T, Exception> handleError, TimeSpan timeout)
            {
                ConcurrentDictionary<int, TResult> dict = new ConcurrentDictionary<int, TResult>();
                AggregateException errors = new AggregateException();
                using (CountdownEvent evt = new CountdownEvent(source.Count()))
                {
                    int i = 0;
                    foreach (var item in source)
                    {
                        Interlocked.Increment(ref i);
                        ThreadPool.QueueUserWorkItem(obj =>
                        {
                            var shared = obj as Parallel.ThreadData;
                            try
                            {
                                dict[shared.Index] = func(shared.Data.AsType<T>());
                            }
                            catch (Exception ex)
                            {
                                handleError(shared.Data.AsType<T>(), ex);
                                errors.AddException(shared.Index, ex);
                            }
                            finally
                            {
                                shared.WaitHandle.Signal();
                            }
    
                        }, new ThreadData { Index = i, Data = item, WaitHandle = evt });
                    }
                    evt.Wait(timeout);
                }
                if (errors.Count > 0) throw errors;
                return dict;
            }
        }
    
     public class CountdownEvent : IDisposable
        {
            private readonly ManualResetEvent done;
            private readonly int total;
            private long current;
    
            public CountdownEvent(int total)
            {
                this.total = total;
                current = total;
                done = new ManualResetEvent(false);
            }
    
            public void Signal()
            {
                if (Interlocked.Decrement(ref current) == 0)
                {
                    done.Set();
                }
            }
    
            public void Wait()
            {
                done.WaitOne();
            }
            public bool Wait(TimeSpan timeout)
            {
                return done.WaitOne(timeout);
            }
            public void Dispose()
            {
                ((IDisposable)done).Dispose();
            }
        }
    
    public class ParallelException : Exception
        {
            public int Index { get; private set; }
            public ParallelException(int itemIndex) { Index = itemIndex; }
            public ParallelException(int itemIndex, string message) : base(message) { Index = itemIndex; }
            public override string Message
            {
                get
                {
                    return string.Format("the specified item index: {0}, occured :{1}", Index, base.Message);
                }
            }
        }
    
        public class AggregateException : Exception
        {
            List<ParallelException> inners = new List<ParallelException>();
            public AggregateException() : base() { }
            public AggregateException(string message) : base(message) { }
            public ReadOnlyCollection<ParallelException> Exceptions
            {
                get
                {
                    lock (inners)
                    {
                        return inners.AsReadOnly();
                    }
    
                }
            }
            public void AddException(int itemIndex, string message)
            {
                lock (inners)
                {
                    inners.Add(new ParallelException(itemIndex, message));
                }
            }
            public void AddException(int itemIndex, Exception ex)
            {
                AddException(itemIndex, ex.ToString());
            }
            public int Count
            {
                get
                {
                    lock (inners)
                    {
                        return inners.Count;
                    }
                }
            }
            public override string Message
            {
                get
                {
                    StringBuilder sb = new StringBuilder(base.Message);
                    foreach (var ex in Exceptions)
                        sb.AppendLine(ex.ToString());
                    return sb.ToString();
                }
            }
        }

    语言组织能力太弱了,就做下记录,以后用了有个参考。

  • 相关阅读:
    使用递归输出某个目录下所有子目录和文件
    如何进行复杂度分析?
    什么是时间复杂度?什么是空间复杂度?
    什么是复杂度?为什么要进行复杂度分析?
    什么是递归?递归的优缺点是什么?
    Executor 如何使用?
    什么是spring boot?为什么要用?
    spring boot核心配置文件是什么?
    @Autowired的作用是什么?
    @RequestMapping的作用是什么?
  • 原文地址:https://www.cnblogs.com/zyhblogs/p/4002693.html
Copyright © 2011-2022 走看看