需求:
有一种任务需要定时的执行,而且非常的耗时,因此我把它放到线程池中执行,并设置线程池为1,如果该任务已经在队列中或正在执行该任务,则不要再将该任务加入线程池中了。
测试代码如下
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6 using ThreadPool2;
7
8 namespace ThreadPoolTest.MyThreadPool2Test
9 {
10 class Class6
11 {
12 static void Main(string[] args)
13 {
14 MyThreadPool2 pool=new MyThreadPool2(1,true,30000);
15 object obj=new object();
16 Random rnd=new Random();
17 for (var i = 0; i < 20;i++ )
18 pool.QueueUserWorkItem(call, obj, rnd.Next(1,4).ToString(), succ, err);
19 Console.ReadLine();
20 }
21
22 private static void err(object state)
23 {
24 Console.WriteLine("err");
25 }
26
27 private static void succ(object state, object result)
28 {
29 Console.WriteLine("succ");
30 }
31
32 private static object call(object state)
33 {
34 while(true)
35 {
36 Thread.Sleep(2000);
37 Console.WriteLine("exec");
38 }
39 }
40 }
41 }
42
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6 using ThreadPool2;
7
8 namespace ThreadPoolTest.MyThreadPool2Test
9 {
10 class Class6
11 {
12 static void Main(string[] args)
13 {
14 MyThreadPool2 pool=new MyThreadPool2(1,true,30000);
15 object obj=new object();
16 Random rnd=new Random();
17 for (var i = 0; i < 20;i++ )
18 pool.QueueUserWorkItem(call, obj, rnd.Next(1,4).ToString(), succ, err);
19 Console.ReadLine();
20 }
21
22 private static void err(object state)
23 {
24 Console.WriteLine("err");
25 }
26
27 private static void succ(object state, object result)
28 {
29 Console.WriteLine("succ");
30 }
31
32 private static object call(object state)
33 {
34 while(true)
35 {
36 Thread.Sleep(2000);
37 Console.WriteLine("exec");
38 }
39 }
40 }
41 }
42
线程池代码如下,
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using Amib.Threading.Internal;
using Rhino.Commons;
namespace ThreadPool2
{
public delegate object WaitCallback2(object state);
public delegate void SuccCallback(object state, object result);
public delegate void ErrCallback(object state);
/// <summary>
/// 此线程池的作用是将某一类特殊的任务交给此线程池执行,
/// 可以设定该线程池的最大线程数,
/// 这类线程池的优点时,占用的资源少,优先级低,
/// 适合于执行任务需要长期执行,不考虑时间因素的任务
/// 同时根据在传入线程池时的标记key,可以Aborted指定任务,
/// 若该任务正在执行或尚在执行队列中
/// </summary>
public class MyThreadPool2
{
/// <summary>
/// 任务执行队列
/// </summary>
//static ThreadSafeQueue<WorkerThread> queue = new ThreadSafeQueue<WorkerThread>();
List<WorkerThread> queue = new List<WorkerThread>();
/// <summary>
/// 目前暂定为只使用一个线程,以免耗近资源
/// </summary>
SynchronizedDictionary<string, WorkerThread> dict = new SynchronizedDictionary<string, WorkerThread>();
private object state;
AutoResetEvent wait = new AutoResetEvent(false);
AutoResetEvent wait2 = new AutoResetEvent(false);
private int MaxLimitedTime { get; set; }
private bool IsLimitedExecTime { get; set; }
private int IdleTimeout { get; set; }
//private static int _maxThreadNum = 1;
private int MaxThreadNum
{
//get { return _maxThreadNum; }
//set { _maxThreadNum = value; }
get;
set;
}
private MyThreadPool2()
{
//System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, new WaitOrTimerCallback(aa), state, 2000,true);
//SetMaxThreadNum(2);
//SetMaxExecTime(false, 10000);
}
/// <summary>
/// 设置专用线程池的初始参数
/// </summary>
/// <param name="num">线程池的最大线程数,最小为1</param>
/// <param name="b">是否起用限制最大单个任务执行时间设定</param>
/// <param name="MaxLimitedTime">单个任务执行的最大时间</param>
public MyThreadPool2(int num, bool b, int MaxLimitedTime)
{
System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, new WaitOrTimerCallback(aa), state, 2000, true);
if (num < 1)
num = 1;
MaxThreadNum = num;
IsLimitedExecTime = b;
this.MaxLimitedTime = MaxLimitedTime;
if (IsLimitedExecTime)
System.Threading.ThreadPool.RegisterWaitForSingleObject(wait2, new WaitOrTimerCallback(bb), state,
this.MaxLimitedTime, true);
}
/// <summary>
/// 定时将队列中的数据装载到线程中执行,如果还没有到达最大线程数还有任务则创建线程
/// </summary>
/// <param name="state"></param>
/// <param name="timedOut"></param>
private void aa(object state, bool timedOut)
{
//Console.WriteLine("执行aa()将队列中的任务加到线程中");
lock(WorkerThread.Manual){
WorkerThread.Manual.Reset();
lock (queue)
{
Console.WriteLine("queue count={0}",queue.Count);
//判断任务队列中有无积压的任务且有无空闲的线程,如果符合上述条件则执行之
List<string> removeKey = new List<string>();
List<WorkerThread> newTask = new List<WorkerThread>();
List<string> tasks = new List<string>();
//Dictionary<string,WorkerThread> addDict=new Dictionary<string, WorkerThread>();
foreach (var kvp in dict)
{//kvp.Value.ThreadState == ThreadState.Unstarted ||
//if (kvp.Value.Thread.ThreadState == ThreadState.Suspended)
//将不活动的线程记录下来并移除
if (!kvp.Value.Thread.IsAlive)
tasks.Add(kvp.Key);
//将活动且空闲的线程赋于新的任务
if (kvp.Value.Thread.IsAlive == true && kvp.Value.CurrentThreadState == WorkerThreadState.Idle)
{
//dict.Remove(kvp.Key);//cancle because of lock
WorkerThread a = queue.FirstOrDefault();
if (a != null)
{
removeKey.Add(kvp.Key);
//addDict.Add(a.Key, kvp.Value.Change(a));
newTask.Add(kvp.Value.Change(a));
//a.Thread = kvp.Value.Thread;
//newTask.Add(a);
queue.RemoveAt(0);
//dict.Add(a.Key, kvp.Value.Change(a));//cancle because of lock
//将参数加到线程中,并改变线程的状态
//dict[a.Key].Thread.Resume();
}
else
break;
//else
//{
// System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, new WaitOrTimerCallback(aa), state,
// 2000, true);
// return;
//}
}
}
tasks.ForEach(t =>
{
dict.Remove(t);
Debug.WriteLine("移除销毁线程对应的dict中的键值项,key="+t);
});
removeKey.ForEach(t => dict.Remove(t));
newTask.ForEach(t =>
{
Debug.WriteLine("复用线程用于执行新任务"+t.Key);
dict.Add(t.Key, t);
//t.StartExecTime = DateTime.Now;
t.Auto.Set();
//t.CurrentThreadState = WorkerThreadState.Busy;
//t.Thread.Resume();
});
while (queue.Count > 0 && dict.Count < MaxThreadNum)
{
//未到线程池最大池程数时,增加线程
WorkerThread b = queue.FirstOrDefault();
if (b != null)
{
queue.RemoveAt(0);
//Thread thd = new Thread(new ThreadStart(b.Exec));
//thd.Priority = ThreadPriority.Lowest;
//dict.Add(b.Key, thd);
//thd.Start();
WorkerThread wt = new WorkerThread();
wt.Start(b);
dict.Add(wt.Key, wt);
wt.Thread.Start();
Debug.WriteLine("新建线程用于执行新任务"+ wt.Key);
//将参数加到线程中,并改变线程的状态
}
}
System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, new WaitOrTimerCallback(aa), state, 2000,
true);
}
WorkerThread.Manual.Set();
}
}
private void SetMaxThreadNum(int num)
{
if (num < 1)
num = 1;
MaxThreadNum = num;
}
private WorkerThread FindSpecificWorkerThreadByKey(string key)
{
WorkerThread wt;
dict.TryGetValue(key, out wt);
return wt;
}
/// <summary>
/// 设定单线程允许执行任务的最长时间,该方法不能在运行时改变,须事前设定
/// </summary>
/// <param name="b"></param>
/// <param name="time"></param>
[Obsolete("abandon")]
private void SetMaxExecTime(bool b, int time)
{
IsLimitedExecTime = b;
MaxLimitedTime = time;
if (IsLimitedExecTime)
System.Threading.ThreadPool.RegisterWaitForSingleObject(wait2, new WaitOrTimerCallback(bb), state,
MaxLimitedTime, true);
}
/// <summary>
/// 当任务执行超时时,注销该线程
/// </summary>
/// <param name="state"></param>
/// <param name="timedOut"></param>
private void bb(object state, bool timedOut)
{
//GC.Collect();
Console.WriteLine("执行bb(),检测是否有线程超时");
lock (WorkerThread.Manual)
{
WorkerThread.Manual.Reset();
//lock (obj)
lock (dict.SyncRoot)
{
List<string> temp = new List<string>();
foreach (var kvp in dict)
{
if (kvp.Value.CurrentThreadState==WorkerThreadState.Busy &&DateTime.Now.Subtract(kvp.Value.StartExecTime).TotalMilliseconds > MaxLimitedTime)
{
Console.WriteLine("now={0}",DateTime.Now);
Console.WriteLine("before={0}",kvp.Value.StartExecTime);
temp.Add(kvp.Key);
}
}
foreach (var s in temp)
{
Debug.WriteLine("key="+s+"的任务超时,执行该任务的线程将被销毁");
_Aborted(s);
}
System.Threading.ThreadPool.RegisterWaitForSingleObject(wait2, new WaitOrTimerCallback(bb), state, MaxLimitedTime, true);
}
WorkerThread.Manual.Set();
}
}
public void Aborted(string key)
{
lock (WorkerThread.Manual)
{
WorkerThread.Manual.Reset();
_Aborted(key);
WorkerThread.Manual.Set();
}
}
private void _Aborted(string key)
{
lock (queue)
{
//任务如果还在队列中则删除该任务
int index = queue.FindIndex(t => t.Key == key);
if (index > -1)
{
queue.RemoveAt(index);
Debug.WriteLine("从任务队列中移除指定key="+key+"的任务");
}
}
//lock (dict.SyncRoot)
//{
old way now extract method FindSpecificWorkerThreadByKey to split this
WorkerThread v = FindSpecificWorkerThreadByKey(key);
//没有发现指定key的线程表示,对应该key的任务已经执行完了,不需要再来取消该任务
//或者指的key的线程虽然还在但它的状态已变为suspended,任务已完成,将等待下一个任务,实际不需要终止该线程
//只有但指定的key的任务在执行时才删除
//if (v != null && v.Thread.ThreadState != ThreadState.Suspended)
if (v != null && v.CurrentThreadState == WorkerThreadState.Busy)
{
dict.Remove(key);
/*
在调用Abort方法时,在指定线程上引发ThreadAbortException。以开始终止此线程的
过程。ThreadAbortException是—个可以由应用程序代码捕获的特殊异常,但除非调用
ResetAbort,否则会在catch块的结尾再次引发它。ResetAbod可以取消Abort的请求,并
阻止ThreadAbortException终止此线程。但是,线程不一定会立即中止,或者根本不中止。
如果线程在作为中止过程的一部分被调用的finally块中做非常大量的计算,从而无限期延
迟中止操作,则会发生这种情况。若要确保线程已经终止,请在调用Abort之后对线程调
用Join方法。
*/
Debug.WriteLine("销毁正在执行Key="+key+"的任务的线程");
//这里将回调的方法放线程终止之前,是防止在同一线程时,线程终止了,放在后面回调就不执行了
if (v.ErrorCallback != null)
{
v.ErrorCallback(v.State);
//最后完成任务的时间
v.EndExecTime = DateTime.Now;
}
v.Thread.Abort();
v.Thread.Join();
}
//}
//wait.Set();
}
public void QueueUserWorkItem(WaitCallback2 callback, object state, string key, SuccCallback succ, ErrCallback err)
{
WorkerThread.Manual.Reset();
WorkerThread p = new WorkerThread()
{
WaitCallback = callback,
State = state,
Key = key,
ErrorCallback = err,
SuccessCallback = succ
};
//queue.Enqueue(p);
lock (queue)
{
if(queue.FindIndex(t=>t.Key==p.Key)==-1 && dict.Contains(p.Key)==false)
{
queue.Add(p);
wait.Set();
}
else
{
Console.WriteLine("由于队列或是正在执行的线程中拥有一个同名的key,此次加入线程的工作将被自动抛弃");
}
//Monitor.Pulse(queue);
}
WorkerThread.Manual.Set();
}
public void QueueUserWorkItem(WaitCallback2 callback, object state, SuccCallback succ, ErrCallback err)
{
QueueUserWorkItem(callback, state, System.Guid.NewGuid().ToString(), succ, err);
}
public void QueueUserWorkItem(WaitCallback2 callback, object state, string key)
{
//WorkerThread p = new WorkerThread()
// {
// WaitCallback = callback,
// State = state,
// Key = key
// };
////queue.Enqueue(p);
//queue.Add(p);
//wait.Set();
QueueUserWorkItem(callback, state, key, null, null);
}
public void QueueUserWorkItem(WaitCallback2 callback, object state)
{
QueueUserWorkItem(callback, state, System.Guid.NewGuid().ToString());
}
}
public enum WorkerThreadState : byte
{
None = 0,
Busy = 1,
Idle = 2
}
public class WorkerThread
{
public AutoResetEvent Auto = new AutoResetEvent(false);
public static ManualResetEvent Manual = new ManualResetEvent(true);
public WorkerThreadState CurrentThreadState { get; set; }
public DateTime StartExecTime { get; set; }
public DateTime EndExecTime { get; set; }
public Thread Thread { get; set; }
public string Key { get; set; }
public WaitCallback2 WaitCallback { get; set; }
public SuccCallback SuccessCallback { get; set; }
public ErrCallback ErrorCallback { get; set; }
public Object State { get; set; }
public void Exec()
{
while (true)
{
this.CurrentThreadState = WorkerThreadState.Busy;
this.StartExecTime = DateTime.Now;
if (this.SuccessCallback != null)
this.SuccessCallback(this.State, this.WaitCallback(this.State));
else
this.WaitCallback(this.State);//如何将执行的结果返回,目前是通过SuccessCallback将结果作为参数返回,如果没有使用SuccessCallback将不能返回执行的结果
this.EndExecTime = DateTime.Now;
//如何将任务执行的起讫时间发给任务
this.CurrentThreadState = WorkerThreadState.Idle;
//this.Thread.Suspend();
//等待60s如果在此期间未接收到新的任务该线程就退出
if (!Auto.WaitOne(60 * 1000, false))
break;
Manual.WaitOne();
}
Debug.WriteLine("线程销毁");
}
public WorkerThread Change(WorkerThread wt)
{
this.Key = wt.Key;
this.WaitCallback = wt.WaitCallback;
this.State = wt.State;
//this.StartExecTime = wt.StartExecTime;
this.ErrorCallback = wt.ErrorCallback;
this.SuccessCallback = wt.SuccessCallback;
return this;
}
public void Start(WorkerThread wt)
{
this.Change(wt);
this.Thread = new Thread(new ThreadStart(this.Exec));
this.Thread.Priority = ThreadPriority.Lowest;
}
//public void Start(WaitCallback callback,Object state)
//{
// this.WaitCallback = callback;
// this.State = state;
// if(this.Thread==null){
// this.Thread = new Thread(new ThreadStart(this.Exec));
// this.Thread.Priority = ThreadPriority.Lowest;
// this.Thread.IsBackground = true;
// this.Thread.Start();
// return;
// }
// if(this.Thread.ThreadState==ThreadState.Suspended)
// {
// this.Thread.Resume();
// }
//}
}
}
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using Amib.Threading.Internal;
using Rhino.Commons;
namespace ThreadPool2
{
public delegate object WaitCallback2(object state);
public delegate void SuccCallback(object state, object result);
public delegate void ErrCallback(object state);
/// <summary>
/// 此线程池的作用是将某一类特殊的任务交给此线程池执行,
/// 可以设定该线程池的最大线程数,
/// 这类线程池的优点时,占用的资源少,优先级低,
/// 适合于执行任务需要长期执行,不考虑时间因素的任务
/// 同时根据在传入线程池时的标记key,可以Aborted指定任务,
/// 若该任务正在执行或尚在执行队列中
/// </summary>
public class MyThreadPool2
{
/// <summary>
/// 任务执行队列
/// </summary>
//static ThreadSafeQueue<WorkerThread> queue = new ThreadSafeQueue<WorkerThread>();
List<WorkerThread> queue = new List<WorkerThread>();
/// <summary>
/// 目前暂定为只使用一个线程,以免耗近资源
/// </summary>
SynchronizedDictionary<string, WorkerThread> dict = new SynchronizedDictionary<string, WorkerThread>();
private object state;
AutoResetEvent wait = new AutoResetEvent(false);
AutoResetEvent wait2 = new AutoResetEvent(false);
private int MaxLimitedTime { get; set; }
private bool IsLimitedExecTime { get; set; }
private int IdleTimeout { get; set; }
//private static int _maxThreadNum = 1;
private int MaxThreadNum
{
//get { return _maxThreadNum; }
//set { _maxThreadNum = value; }
get;
set;
}
private MyThreadPool2()
{
//System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, new WaitOrTimerCallback(aa), state, 2000,true);
//SetMaxThreadNum(2);
//SetMaxExecTime(false, 10000);
}
/// <summary>
/// 设置专用线程池的初始参数
/// </summary>
/// <param name="num">线程池的最大线程数,最小为1</param>
/// <param name="b">是否起用限制最大单个任务执行时间设定</param>
/// <param name="MaxLimitedTime">单个任务执行的最大时间</param>
public MyThreadPool2(int num, bool b, int MaxLimitedTime)
{
System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, new WaitOrTimerCallback(aa), state, 2000, true);
if (num < 1)
num = 1;
MaxThreadNum = num;
IsLimitedExecTime = b;
this.MaxLimitedTime = MaxLimitedTime;
if (IsLimitedExecTime)
System.Threading.ThreadPool.RegisterWaitForSingleObject(wait2, new WaitOrTimerCallback(bb), state,
this.MaxLimitedTime, true);
}
/// <summary>
/// 定时将队列中的数据装载到线程中执行,如果还没有到达最大线程数还有任务则创建线程
/// </summary>
/// <param name="state"></param>
/// <param name="timedOut"></param>
private void aa(object state, bool timedOut)
{
//Console.WriteLine("执行aa()将队列中的任务加到线程中");
lock(WorkerThread.Manual){
WorkerThread.Manual.Reset();
lock (queue)
{
Console.WriteLine("queue count={0}",queue.Count);
//判断任务队列中有无积压的任务且有无空闲的线程,如果符合上述条件则执行之
List<string> removeKey = new List<string>();
List<WorkerThread> newTask = new List<WorkerThread>();
List<string> tasks = new List<string>();
//Dictionary<string,WorkerThread> addDict=new Dictionary<string, WorkerThread>();
foreach (var kvp in dict)
{//kvp.Value.ThreadState == ThreadState.Unstarted ||
//if (kvp.Value.Thread.ThreadState == ThreadState.Suspended)
//将不活动的线程记录下来并移除
if (!kvp.Value.Thread.IsAlive)
tasks.Add(kvp.Key);
//将活动且空闲的线程赋于新的任务
if (kvp.Value.Thread.IsAlive == true && kvp.Value.CurrentThreadState == WorkerThreadState.Idle)
{
//dict.Remove(kvp.Key);//cancle because of lock
WorkerThread a = queue.FirstOrDefault();
if (a != null)
{
removeKey.Add(kvp.Key);
//addDict.Add(a.Key, kvp.Value.Change(a));
newTask.Add(kvp.Value.Change(a));
//a.Thread = kvp.Value.Thread;
//newTask.Add(a);
queue.RemoveAt(0);
//dict.Add(a.Key, kvp.Value.Change(a));//cancle because of lock
//将参数加到线程中,并改变线程的状态
//dict[a.Key].Thread.Resume();
}
else
break;
//else
//{
// System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, new WaitOrTimerCallback(aa), state,
// 2000, true);
// return;
//}
}
}
tasks.ForEach(t =>
{
dict.Remove(t);
Debug.WriteLine("移除销毁线程对应的dict中的键值项,key="+t);
});
removeKey.ForEach(t => dict.Remove(t));
newTask.ForEach(t =>
{
Debug.WriteLine("复用线程用于执行新任务"+t.Key);
dict.Add(t.Key, t);
//t.StartExecTime = DateTime.Now;
t.Auto.Set();
//t.CurrentThreadState = WorkerThreadState.Busy;
//t.Thread.Resume();
});
while (queue.Count > 0 && dict.Count < MaxThreadNum)
{
//未到线程池最大池程数时,增加线程
WorkerThread b = queue.FirstOrDefault();
if (b != null)
{
queue.RemoveAt(0);
//Thread thd = new Thread(new ThreadStart(b.Exec));
//thd.Priority = ThreadPriority.Lowest;
//dict.Add(b.Key, thd);
//thd.Start();
WorkerThread wt = new WorkerThread();
wt.Start(b);
dict.Add(wt.Key, wt);
wt.Thread.Start();
Debug.WriteLine("新建线程用于执行新任务"+ wt.Key);
//将参数加到线程中,并改变线程的状态
}
}
System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, new WaitOrTimerCallback(aa), state, 2000,
true);
}
WorkerThread.Manual.Set();
}
}
private void SetMaxThreadNum(int num)
{
if (num < 1)
num = 1;
MaxThreadNum = num;
}
private WorkerThread FindSpecificWorkerThreadByKey(string key)
{
WorkerThread wt;
dict.TryGetValue(key, out wt);
return wt;
}
/// <summary>
/// 设定单线程允许执行任务的最长时间,该方法不能在运行时改变,须事前设定
/// </summary>
/// <param name="b"></param>
/// <param name="time"></param>
[Obsolete("abandon")]
private void SetMaxExecTime(bool b, int time)
{
IsLimitedExecTime = b;
MaxLimitedTime = time;
if (IsLimitedExecTime)
System.Threading.ThreadPool.RegisterWaitForSingleObject(wait2, new WaitOrTimerCallback(bb), state,
MaxLimitedTime, true);
}
/// <summary>
/// 当任务执行超时时,注销该线程
/// </summary>
/// <param name="state"></param>
/// <param name="timedOut"></param>
private void bb(object state, bool timedOut)
{
//GC.Collect();
Console.WriteLine("执行bb(),检测是否有线程超时");
lock (WorkerThread.Manual)
{
WorkerThread.Manual.Reset();
//lock (obj)
lock (dict.SyncRoot)
{
List<string> temp = new List<string>();
foreach (var kvp in dict)
{
if (kvp.Value.CurrentThreadState==WorkerThreadState.Busy &&DateTime.Now.Subtract(kvp.Value.StartExecTime).TotalMilliseconds > MaxLimitedTime)
{
Console.WriteLine("now={0}",DateTime.Now);
Console.WriteLine("before={0}",kvp.Value.StartExecTime);
temp.Add(kvp.Key);
}
}
foreach (var s in temp)
{
Debug.WriteLine("key="+s+"的任务超时,执行该任务的线程将被销毁");
_Aborted(s);
}
System.Threading.ThreadPool.RegisterWaitForSingleObject(wait2, new WaitOrTimerCallback(bb), state, MaxLimitedTime, true);
}
WorkerThread.Manual.Set();
}
}
public void Aborted(string key)
{
lock (WorkerThread.Manual)
{
WorkerThread.Manual.Reset();
_Aborted(key);
WorkerThread.Manual.Set();
}
}
private void _Aborted(string key)
{
lock (queue)
{
//任务如果还在队列中则删除该任务
int index = queue.FindIndex(t => t.Key == key);
if (index > -1)
{
queue.RemoveAt(index);
Debug.WriteLine("从任务队列中移除指定key="+key+"的任务");
}
}
//lock (dict.SyncRoot)
//{
old way now extract method FindSpecificWorkerThreadByKey to split this
WorkerThread v = FindSpecificWorkerThreadByKey(key);
//没有发现指定key的线程表示,对应该key的任务已经执行完了,不需要再来取消该任务
//或者指的key的线程虽然还在但它的状态已变为suspended,任务已完成,将等待下一个任务,实际不需要终止该线程
//只有但指定的key的任务在执行时才删除
//if (v != null && v.Thread.ThreadState != ThreadState.Suspended)
if (v != null && v.CurrentThreadState == WorkerThreadState.Busy)
{
dict.Remove(key);
/*
在调用Abort方法时,在指定线程上引发ThreadAbortException。以开始终止此线程的
过程。ThreadAbortException是—个可以由应用程序代码捕获的特殊异常,但除非调用
ResetAbort,否则会在catch块的结尾再次引发它。ResetAbod可以取消Abort的请求,并
阻止ThreadAbortException终止此线程。但是,线程不一定会立即中止,或者根本不中止。
如果线程在作为中止过程的一部分被调用的finally块中做非常大量的计算,从而无限期延
迟中止操作,则会发生这种情况。若要确保线程已经终止,请在调用Abort之后对线程调
用Join方法。
*/
Debug.WriteLine("销毁正在执行Key="+key+"的任务的线程");
//这里将回调的方法放线程终止之前,是防止在同一线程时,线程终止了,放在后面回调就不执行了
if (v.ErrorCallback != null)
{
v.ErrorCallback(v.State);
//最后完成任务的时间
v.EndExecTime = DateTime.Now;
}
v.Thread.Abort();
v.Thread.Join();
}
//}
//wait.Set();
}
public void QueueUserWorkItem(WaitCallback2 callback, object state, string key, SuccCallback succ, ErrCallback err)
{
WorkerThread.Manual.Reset();
WorkerThread p = new WorkerThread()
{
WaitCallback = callback,
State = state,
Key = key,
ErrorCallback = err,
SuccessCallback = succ
};
//queue.Enqueue(p);
lock (queue)
{
if(queue.FindIndex(t=>t.Key==p.Key)==-1 && dict.Contains(p.Key)==false)
{
queue.Add(p);
wait.Set();
}
else
{
Console.WriteLine("由于队列或是正在执行的线程中拥有一个同名的key,此次加入线程的工作将被自动抛弃");
}
//Monitor.Pulse(queue);
}
WorkerThread.Manual.Set();
}
public void QueueUserWorkItem(WaitCallback2 callback, object state, SuccCallback succ, ErrCallback err)
{
QueueUserWorkItem(callback, state, System.Guid.NewGuid().ToString(), succ, err);
}
public void QueueUserWorkItem(WaitCallback2 callback, object state, string key)
{
//WorkerThread p = new WorkerThread()
// {
// WaitCallback = callback,
// State = state,
// Key = key
// };
////queue.Enqueue(p);
//queue.Add(p);
//wait.Set();
QueueUserWorkItem(callback, state, key, null, null);
}
public void QueueUserWorkItem(WaitCallback2 callback, object state)
{
QueueUserWorkItem(callback, state, System.Guid.NewGuid().ToString());
}
}
public enum WorkerThreadState : byte
{
None = 0,
Busy = 1,
Idle = 2
}
public class WorkerThread
{
public AutoResetEvent Auto = new AutoResetEvent(false);
public static ManualResetEvent Manual = new ManualResetEvent(true);
public WorkerThreadState CurrentThreadState { get; set; }
public DateTime StartExecTime { get; set; }
public DateTime EndExecTime { get; set; }
public Thread Thread { get; set; }
public string Key { get; set; }
public WaitCallback2 WaitCallback { get; set; }
public SuccCallback SuccessCallback { get; set; }
public ErrCallback ErrorCallback { get; set; }
public Object State { get; set; }
public void Exec()
{
while (true)
{
this.CurrentThreadState = WorkerThreadState.Busy;
this.StartExecTime = DateTime.Now;
if (this.SuccessCallback != null)
this.SuccessCallback(this.State, this.WaitCallback(this.State));
else
this.WaitCallback(this.State);//如何将执行的结果返回,目前是通过SuccessCallback将结果作为参数返回,如果没有使用SuccessCallback将不能返回执行的结果
this.EndExecTime = DateTime.Now;
//如何将任务执行的起讫时间发给任务
this.CurrentThreadState = WorkerThreadState.Idle;
//this.Thread.Suspend();
//等待60s如果在此期间未接收到新的任务该线程就退出
if (!Auto.WaitOne(60 * 1000, false))
break;
Manual.WaitOne();
}
Debug.WriteLine("线程销毁");
}
public WorkerThread Change(WorkerThread wt)
{
this.Key = wt.Key;
this.WaitCallback = wt.WaitCallback;
this.State = wt.State;
//this.StartExecTime = wt.StartExecTime;
this.ErrorCallback = wt.ErrorCallback;
this.SuccessCallback = wt.SuccessCallback;
return this;
}
public void Start(WorkerThread wt)
{
this.Change(wt);
this.Thread = new Thread(new ThreadStart(this.Exec));
this.Thread.Priority = ThreadPriority.Lowest;
}
//public void Start(WaitCallback callback,Object state)
//{
// this.WaitCallback = callback;
// this.State = state;
// if(this.Thread==null){
// this.Thread = new Thread(new ThreadStart(this.Exec));
// this.Thread.Priority = ThreadPriority.Lowest;
// this.Thread.IsBackground = true;
// this.Thread.Start();
// return;
// }
// if(this.Thread.ThreadState==ThreadState.Suspended)
// {
// this.Thread.Resume();
// }
//}
}
}