zoukankan      html  css  js  c++  java
  • 自制线程池4

    需求:

    有一种任务需要定时的执行,而且非常的耗时,因此我把它放到线程池中执行,并设置线程池为1,如果该任务已经在队列中或正在执行该任务,则不要再将该任务加入线程池中了。

    测试代码如下

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Linq;
     4 using System.Text;
     5 using System.Threading;
     6 using ThreadPool2;
     7 
     8 namespace ThreadPoolTest.MyThreadPool2Test
     9 {
    10     class Class6
    11     {
    12         static void Main(string[] args)
    13         {
    14             MyThreadPool2 pool=new MyThreadPool2(1,true,30000);
    15             object obj=new object();
    16             Random rnd=new Random();
    17             for (var i = 0; i < 20;i++ )
    18                 pool.QueueUserWorkItem(call, obj, rnd.Next(1,4).ToString(), succ, err);
    19             Console.ReadLine();
    20         }
    21 
    22         private static void err(object state)
    23         {
    24                 Console.WriteLine("err");
    25         }
    26 
    27         private static void succ(object state, object result)
    28         {
    29             Console.WriteLine("succ");
    30         }
    31 
    32         private static object call(object state)
    33         {
    34             while(true)
    35             {
    36                 Thread.Sleep(2000);
    37                 Console.WriteLine("exec");
    38             }
    39         }
    40     }
    41 }
    42 

    线程池代码如下,

    using System;
    using System.Collections;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using Amib.Threading.Internal;
    using Rhino.Commons;

    namespace ThreadPool2
    {
        
    public delegate object WaitCallback2(object state);
        
    public delegate void SuccCallback(object state, object result);
        
    public delegate void ErrCallback(object state);
        
    /// <summary>
        
    /// 此线程池的作用是将某一类特殊的任务交给此线程池执行,
        
    /// 可以设定该线程池的最大线程数,
        
    /// 这类线程池的优点时,占用的资源少,优先级低,
        
    /// 适合于执行任务需要长期执行,不考虑时间因素的任务
        
    /// 同时根据在传入线程池时的标记key,可以Aborted指定任务,
        
    /// 若该任务正在执行或尚在执行队列中
        
    /// </summary>

        public class MyThreadPool2
        
    {
            
    /// <summary>
            
    /// 任务执行队列
            
    /// </summary>

            //static ThreadSafeQueue<WorkerThread> queue = new ThreadSafeQueue<WorkerThread>();
            List<WorkerThread> queue = new List<WorkerThread>();
            
    /// <summary>
            
    /// 目前暂定为只使用一个线程,以免耗近资源
            
    /// </summary>

            SynchronizedDictionary<string, WorkerThread> dict = new SynchronizedDictionary<string, WorkerThread>();
            
    private object state;
            AutoResetEvent wait 
    = new AutoResetEvent(false);
            AutoResetEvent wait2 
    = new AutoResetEvent(false);
            
    private int MaxLimitedTime getset; }
            
    private bool IsLimitedExecTime getset; }
            
    private int IdleTimeout getset; }
            
    //private static int _maxThreadNum = 1;
            private int MaxThreadNum
            
    {
                
    //get { return _maxThreadNum; }
                
    //set { _maxThreadNum = value; }
                get;
                
    set;
            }

            
    private MyThreadPool2()
            
    {
                
    //System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, new WaitOrTimerCallback(aa), state, 2000,true);
                
    //SetMaxThreadNum(2);
                
    //SetMaxExecTime(false, 10000);
            }

            
    /// <summary>
            
    /// 设置专用线程池的初始参数
            
    /// </summary>
            
    /// <param name="num">线程池的最大线程数,最小为1</param>
            
    /// <param name="b">是否起用限制最大单个任务执行时间设定</param>
            
    /// <param name="MaxLimitedTime">单个任务执行的最大时间</param>

            public MyThreadPool2(int num, bool b, int MaxLimitedTime)
            
    {
                System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, 
    new WaitOrTimerCallback(aa), state, 2000true);
                
    if (num < 1)
                    num 
    = 1;
                MaxThreadNum 
    = num;
                IsLimitedExecTime 
    = b;
                
    this.MaxLimitedTime = MaxLimitedTime;
                
    if (IsLimitedExecTime)
                    System.Threading.ThreadPool.RegisterWaitForSingleObject(wait2, 
    new WaitOrTimerCallback(bb), state,
                                                                            
    this.MaxLimitedTime, true);
            }


            
    /// <summary>
            
    /// 定时将队列中的数据装载到线程中执行,如果还没有到达最大线程数还有任务则创建线程
            
    /// </summary>
            
    /// <param name="state"></param>
            
    /// <param name="timedOut"></param>

            private void aa(object state, bool timedOut)
            
    {
                
    //Console.WriteLine("执行aa()将队列中的任务加到线程中");
                lock(WorkerThread.Manual){
                WorkerThread.Manual.Reset();
                
    lock (queue)
                
    {
                    Console.WriteLine(
    "queue count={0}",queue.Count);
                    
    //判断任务队列中有无积压的任务且有无空闲的线程,如果符合上述条件则执行之
                    List<string> removeKey = new List<string>();
                    List
    <WorkerThread> newTask = new List<WorkerThread>();
                    List
    <string> tasks = new List<string>();
                    
    //Dictionary<string,WorkerThread> addDict=new Dictionary<string, WorkerThread>();
                    foreach (var kvp in dict)
                    
    {//kvp.Value.ThreadState == ThreadState.Unstarted || 
                        
    //if (kvp.Value.Thread.ThreadState == ThreadState.Suspended)

                        
    //将不活动的线程记录下来并移除
                        if (!kvp.Value.Thread.IsAlive)
                            tasks.Add(kvp.Key);
                        
    //将活动且空闲的线程赋于新的任务
                        if (kvp.Value.Thread.IsAlive == true && kvp.Value.CurrentThreadState == WorkerThreadState.Idle)
                        
    {
                            
    //dict.Remove(kvp.Key);//cancle because of lock

                            WorkerThread a 
    = queue.FirstOrDefault();
                            
    if (a != null)
                            
    {
                                removeKey.Add(kvp.Key);
                                
    //addDict.Add(a.Key, kvp.Value.Change(a));
                                newTask.Add(kvp.Value.Change(a));
                                
    //a.Thread = kvp.Value.Thread;
                                
    //newTask.Add(a);
                                queue.RemoveAt(0);
                                
    //dict.Add(a.Key, kvp.Value.Change(a));//cancle because of lock
                                
    //将参数加到线程中,并改变线程的状态
                                
    //dict[a.Key].Thread.Resume();
                            }

                            
    else
                                
    break;
                            
    //else
                            
    //{
                            
    //    System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, new WaitOrTimerCallback(aa), state,
                            
    //                                                            2000, true);
                            
    //    return;
                            
    //}

                        }

                    }

                    tasks.ForEach(t 
    => 
                    

                        dict.Remove(t);
                        Debug.WriteLine(
    "移除销毁线程对应的dict中的键值项,key="+t);
                    }
    );
                    removeKey.ForEach(t 
    => dict.Remove(t));
                    newTask.ForEach(t 
    =>
                    
    {
                        Debug.WriteLine(
    "复用线程用于执行新任务"+t.Key);
                        dict.Add(t.Key, t);
                        
    //t.StartExecTime = DateTime.Now;
                        t.Auto.Set();
                        
    //t.CurrentThreadState = WorkerThreadState.Busy;
                        
    //t.Thread.Resume();
                    }
    );
                    
    while (queue.Count > 0 && dict.Count < MaxThreadNum)
                    
    {
                        
    //未到线程池最大池程数时,增加线程
                        WorkerThread b = queue.FirstOrDefault();
                        
    if (b != null)
                        
    {
                            queue.RemoveAt(
    0);
                            
    //Thread thd = new Thread(new ThreadStart(b.Exec));
                            
    //thd.Priority = ThreadPriority.Lowest;
                            
    //dict.Add(b.Key, thd);
                            
    //thd.Start();
                            WorkerThread wt = new WorkerThread();
                            wt.Start(b);
                            dict.Add(wt.Key, wt);
                            wt.Thread.Start();
                            Debug.WriteLine(
    "新建线程用于执行新任务"+ wt.Key);


                            
    //将参数加到线程中,并改变线程的状态
                        }



                    }

                    System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, 
    new WaitOrTimerCallback(aa), state, 2000,
                                                                            
    true);
                }

                WorkerThread.Manual.Set();
                }

            }

            
    private void SetMaxThreadNum(int num)
            
    {
                
    if (num < 1)
                    num 
    = 1;
                MaxThreadNum 
    = num;
            }

            
    private WorkerThread FindSpecificWorkerThreadByKey(string key)
            
    {
                WorkerThread wt;
                dict.TryGetValue(key, 
    out wt);
                
    return wt;
            }

            
    /// <summary>
            
    /// 设定单线程允许执行任务的最长时间,该方法不能在运行时改变,须事前设定
            
    /// </summary>
            
    /// <param name="b"></param>
            
    /// <param name="time"></param>

            [Obsolete("abandon")]
            
    private void SetMaxExecTime(bool b, int time)
            
    {
                IsLimitedExecTime 
    = b;
                MaxLimitedTime 
    = time;
                
    if (IsLimitedExecTime)
                    System.Threading.ThreadPool.RegisterWaitForSingleObject(wait2, 
    new WaitOrTimerCallback(bb), state,
                                                                            MaxLimitedTime, 
    true);
            }

            
    /// <summary>
            
    /// 当任务执行超时时,注销该线程
            
    /// </summary>
            
    /// <param name="state"></param>
            
    /// <param name="timedOut"></param>

            private void bb(object state, bool timedOut)
            
    {
                
    //GC.Collect();
                Console.WriteLine("执行bb(),检测是否有线程超时");
                
    lock (WorkerThread.Manual)
                
    {
                    WorkerThread.Manual.Reset();
                    
    //lock (obj)
                    lock (dict.SyncRoot)
                    
    {
                        List
    <string> temp = new List<string>();
                        
    foreach (var kvp in dict)
                        
    {
                            
    if (kvp.Value.CurrentThreadState==WorkerThreadState.Busy &&DateTime.Now.Subtract(kvp.Value.StartExecTime).TotalMilliseconds > MaxLimitedTime)
                            
    {
                                Console.WriteLine(
    "now={0}",DateTime.Now);
                                Console.WriteLine(
    "before={0}",kvp.Value.StartExecTime);
                                temp.Add(kvp.Key);
                            }

                        }

                        
    foreach (var s in temp)
                        
    {
                            Debug.WriteLine(
    "key="+s+"的任务超时,执行该任务的线程将被销毁");
                            _Aborted(s);
                        }

                        System.Threading.ThreadPool.RegisterWaitForSingleObject(wait2, 
    new WaitOrTimerCallback(bb), state, MaxLimitedTime, true);
                    }

                    WorkerThread.Manual.Set();
                }


            }

            
    public void Aborted(string key)
            
    {
                
    lock (WorkerThread.Manual)
                
    {
                    WorkerThread.Manual.Reset();
                    _Aborted(key);
                    WorkerThread.Manual.Set();
                }

            }


            
    private void _Aborted(string key)
            
    {
                
    lock (queue)
                
    {
                    
    //任务如果还在队列中则删除该任务
                    int index = queue.FindIndex(t => t.Key == key);
                    
    if (index > -1)
                    
    {
                        queue.RemoveAt(index);
                        Debug.WriteLine(
    "从任务队列中移除指定key="+key+"的任务");
                    }

                }

                
    //lock (dict.SyncRoot)
                
    //{
                    old way now extract method FindSpecificWorkerThreadByKey to split this
                    WorkerThread v 
    = FindSpecificWorkerThreadByKey(key);
                    
    //没有发现指定key的线程表示,对应该key的任务已经执行完了,不需要再来取消该任务
                    
    //或者指的key的线程虽然还在但它的状态已变为suspended,任务已完成,将等待下一个任务,实际不需要终止该线程
                    
    //只有但指定的key的任务在执行时才删除
                    
    //if (v != null && v.Thread.ThreadState != ThreadState.Suspended)
                    if (v != null && v.CurrentThreadState == WorkerThreadState.Busy)
                    
    {
                        dict.Remove(key);
                        
    /*
                    在调用Abort方法时,在指定线程上引发ThreadAbortException。以开始终止此线程的
                    过程。ThreadAbortException是—个可以由应用程序代码捕获的特殊异常,但除非调用
                    ResetAbort,否则会在catch块的结尾再次引发它。ResetAbod可以取消Abort的请求,并
                    阻止ThreadAbortException终止此线程。但是,线程不一定会立即中止,或者根本不中止。
                    如果线程在作为中止过程的一部分被调用的finally块中做非常大量的计算,从而无限期延
                    迟中止操作,则会发生这种情况。若要确保线程已经终止,请在调用Abort之后对线程调
                    用Join方法。
                    
    */

                        
                        Debug.WriteLine(
    "销毁正在执行Key="+key+"的任务的线程");
                        
    //这里将回调的方法放线程终止之前,是防止在同一线程时,线程终止了,放在后面回调就不执行了
                        if (v.ErrorCallback != null)
                        
    {
                            v.ErrorCallback(v.State);
                            
    //最后完成任务的时间
                            v.EndExecTime = DateTime.Now;
                        }

                        v.Thread.Abort();
                        v.Thread.Join();


                    }

                
    //}

                    
    //wait.Set();
            }


            
    public void QueueUserWorkItem(WaitCallback2 callback, object state, string key, SuccCallback succ, ErrCallback err)
            
    {
                WorkerThread.Manual.Reset();
                WorkerThread p 
    = new WorkerThread()
                
    {
                    WaitCallback 
    = callback,
                    State 
    = state,
                    Key 
    = key,
                    ErrorCallback 
    = err,
                    SuccessCallback 
    = succ
                }
    ;
                
    //queue.Enqueue(p);
                lock (queue)
                
    {
                    
    if(queue.FindIndex(t=>t.Key==p.Key)==-1 && dict.Contains(p.Key)==false)
                    
    {
                        queue.Add(p);
                        wait.Set();
                    }

                    
    else
                    
    {
                        Console.WriteLine(
    "由于队列或是正在执行的线程中拥有一个同名的key,此次加入线程的工作将被自动抛弃");
                    }

                    
    //Monitor.Pulse(queue);
                }

                WorkerThread.Manual.Set();
            }

            
    public void QueueUserWorkItem(WaitCallback2 callback, object state, SuccCallback succ, ErrCallback err)
            
    {
                QueueUserWorkItem(callback, state, System.Guid.NewGuid().ToString(), succ, err);
            }


            
    public void QueueUserWorkItem(WaitCallback2 callback, object state, string key)
            
    {
                
    //WorkerThread p = new WorkerThread()
                
    //            {
                
    //                WaitCallback = callback,
                
    //                State = state,
                
    //                Key = key
                
    //            };
                ////queue.Enqueue(p);
                //queue.Add(p);
                
    //wait.Set();
                QueueUserWorkItem(callback, state, key, nullnull);
            }

            
    public void QueueUserWorkItem(WaitCallback2 callback, object state)
            
    {
                QueueUserWorkItem(callback, state, System.Guid.NewGuid().ToString());
            }


        }

        
    public enum WorkerThreadState : byte
        
    {
            None 
    = 0,
            Busy 
    = 1,
            Idle 
    = 2
        }

        
    public class WorkerThread
        
    {
            
    public AutoResetEvent Auto = new AutoResetEvent(false);
            
    public static ManualResetEvent Manual = new ManualResetEvent(true);
            
    public WorkerThreadState CurrentThreadState getset; }
            
    public DateTime StartExecTime getset; }
            
    public DateTime EndExecTime getset; }
            
    public Thread Thread getset; }
            
    public string Key getset; }
            
    public WaitCallback2 WaitCallback getset; }
            
    public SuccCallback SuccessCallback getset; }
            
    public ErrCallback ErrorCallback getset; }
            
    public Object State getset; }
            
    public void Exec()
            
    {
                
    while (true)
                
    {
                    
    this.CurrentThreadState = WorkerThreadState.Busy;
                    
    this.StartExecTime = DateTime.Now;
                    
    if (this.SuccessCallback != null)
                        
    this.SuccessCallback(this.State, this.WaitCallback(this.State));
                    
    else
                        
    this.WaitCallback(this.State);//如何将执行的结果返回,目前是通过SuccessCallback将结果作为参数返回,如果没有使用SuccessCallback将不能返回执行的结果
                    this.EndExecTime = DateTime.Now;
                    
    //如何将任务执行的起讫时间发给任务
                    this.CurrentThreadState = WorkerThreadState.Idle;
                    
    //this.Thread.Suspend();
                    
    //等待60s如果在此期间未接收到新的任务该线程就退出
                    if (!Auto.WaitOne(60 * 1000false))
                        
    break;
                    Manual.WaitOne();
                }

                Debug.WriteLine(
    "线程销毁");
            }

            
    public WorkerThread Change(WorkerThread wt)
            
    {
                
    this.Key = wt.Key;
                
    this.WaitCallback = wt.WaitCallback;
                
    this.State = wt.State;
                
    //this.StartExecTime = wt.StartExecTime;
                this.ErrorCallback = wt.ErrorCallback;
                
    this.SuccessCallback = wt.SuccessCallback;
                
    return this;
            }

            
    public void Start(WorkerThread wt)
            
    {
                
    this.Change(wt);
                
    this.Thread = new Thread(new ThreadStart(this.Exec));
                
    this.Thread.Priority = ThreadPriority.Lowest;
            }


            
    //public void Start(WaitCallback callback,Object state)
            
    //{
            
    //    this.WaitCallback = callback;
            
    //    this.State = state;
            
    //    if(this.Thread==null){
            
    //        this.Thread = new Thread(new ThreadStart(this.Exec));
            
    //        this.Thread.Priority = ThreadPriority.Lowest;
            
    //        this.Thread.IsBackground = true;
            
    //        this.Thread.Start();
            
    //        return;
            
    //    }
            
    //    if(this.Thread.ThreadState==ThreadState.Suspended)
            
    //    {
            
    //        this.Thread.Resume();
            
    //    }
            
    //}
        }

    }




  • 相关阅读:
    驱动表
    将索引移动到别的表空间
    log file sync, log file parallell write
    Full Hint
    4wpa_supplicant适配层 详解
    wifi 驱动 进阶11
    wifi 驱动 进阶11
    基于linux2.6.38.8内核的SDIO/wifi驱动分析
    6wpa_supplicant无线网络配置
    2系统启动后的 wifi加载 过程图解
  • 原文地址:https://www.cnblogs.com/lexus/p/1275323.html
Copyright © 2011-2022 走看看