.net内置的threadpool对于加入执行队列的任务,或是正在执行的任务无法取消,这对于我的项目来说有问题,因此要自定义一个线程池。
我的项目中具体的应用场景如下:某一个操作会非常耗时(将网址插入 bdb 中),如果将其加入线程池,很可能会把线程池中的资源耗尽。因此我希望可以定义一个 maxThreadNum,用来控制执行此操作时最多允许同时运行的线程数,同时把线程优先级设为最低(ThreadPriority.Lowest),我只要这个操作慢慢执行就行了。
代码还需要重构,我对锁的机制还不太清楚,请高手指点一下。
Code
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Rhino.Commons;
namespace ThreadPool
{
    /// <summary>
    /// A small work-item pool that runs queued callbacks on dedicated
    /// <see cref="WorkerThread"/> instances, never more than
    /// <see cref="MaxThreadNum"/> at a time, and lets callers cancel a queued
    /// or running item by its key.
    /// </summary>
    /// <remarks>
    /// A callback registered with the system thread pool acts as the scheduler:
    /// it runs whenever <see cref="QueueUserWorkItem"/> or <see cref="Aborted"/>
    /// signals it, and otherwise every <see cref="PollIntervalMilliseconds"/>.
    /// NOTE(review): running workers are indexed by key with
    /// <c>Dictionary.Add</c>, which throws when the key is already present, so
    /// callers should use a distinct key per work item.
    /// </remarks>
    public static class MyThreadPool
    {
        /// <summary>Maximum time, in milliseconds, between two scheduler runs when nothing signals it.</summary>
        private const int PollIntervalMilliseconds = 2000;

        /// <summary>Guards every access to <see cref="queue"/> and <see cref="dict"/>.</summary>
        private static readonly object obj = new object();

        /// <summary>Signalled when the queue or the worker table changed and the scheduler should run.</summary>
        private static readonly AutoResetEvent wait = new AutoResetEvent(false);

        /// <summary>Work items waiting for a worker, oldest first.</summary>
        private static readonly List<WorkerThread> queue = new List<WorkerThread>();

        /// <summary>Workers that own a thread, keyed by the key of the item they were last given.</summary>
        private static readonly Dictionary<string, WorkerThread> dict = new Dictionary<string, WorkerThread>(1);

        /// <summary>
        /// The single, repeating scheduler registration. It is stored so the
        /// handle stays referenced for as long as the pool exists instead of
        /// being created and dropped on every scheduler run.
        /// </summary>
        private static readonly RegisteredWaitHandle registration;

        static MyThreadPool()
        {
            SetMaxThreadNum(1);

            // executeOnlyOnce = false: the callback fires on every signal and on
            // every timeout, so the scheduler never has to re-register itself.
            registration = System.Threading.ThreadPool.RegisterWaitForSingleObject(
                wait, new WaitOrTimerCallback(aa), null, PollIntervalMilliseconds, false);
        }

        /// <summary>
        /// Scheduler pass: hands queued items to parked workers first, then
        /// creates new workers while fewer than <see cref="MaxThreadNum"/> exist.
        /// </summary>
        /// <param name="state">Unused registration state.</param>
        /// <param name="timedOut">Unused; the same work is done for a signal and for a timeout.</param>
        private static void aa(object state, bool timedOut)
        {
            lock (obj)
            {
                // Changes to dict are collected and applied after the loop because a
                // dictionary must not be modified while it is being enumerated.
                List<string> finishedKeys = new List<string>();
                List<WorkerThread> reassigned = new List<WorkerThread>();

                foreach (KeyValuePair<string, WorkerThread> entry in dict)
                {
                    if (queue.Count == 0)
                    {
                        break;
                    }

                    if (!IsParked(entry.Value.Thread))
                    {
                        continue;
                    }

                    WorkerThread next = queue[0];
                    queue.RemoveAt(0);
                    finishedKeys.Add(entry.Key);

                    // Change() copies the queued item's key/callback/state into the parked worker.
                    reassigned.Add(entry.Value.Change(next));
                }

                foreach (string finishedKey in finishedKeys)
                {
                    dict.Remove(finishedKey);
                }

                foreach (WorkerThread worker in reassigned)
                {
                    dict.Add(worker.Key, worker);
                    worker.Thread.Resume();
                }

                // Still below the limit: start additional workers for what remains queued.
                while (queue.Count > 0 && dict.Count < MaxThreadNum)
                {
                    WorkerThread next = queue[0];
                    queue.RemoveAt(0);

                    WorkerThread worker = new WorkerThread();
                    worker.Start(next);
                    dict.Add(worker.Key, worker);
                    worker.Thread.Start();
                }
            }
        }

        /// <summary>
        /// Tells whether a worker thread has suspended itself after finishing its item.
        /// ThreadState is a flags enum, so the Suspended bit is tested instead of
        /// comparing the whole value (which would miss e.g. a background thread).
        /// </summary>
        private static bool IsParked(Thread thread)
        {
            return (thread.ThreadState & ThreadState.Suspended) != 0;
        }

        /// <summary>
        /// Upper bound on the number of worker threads the scheduler creates.
        /// The static constructor sets it to 1.
        /// </summary>
        public static int MaxThreadNum
        {
            get; set;
        }

        /// <summary>
        /// Sets <see cref="MaxThreadNum"/>, raising values below 1 to 1.
        /// </summary>
        /// <param name="num">Requested maximum number of worker threads.</param>
        public static void SetMaxThreadNum(int num)
        {
            MaxThreadNum = Math.Max(1, num);
        }

        /// <summary>
        /// Cancels the work item with the given key: its worker thread (if any)
        /// is aborted and forgotten, and a still-queued item is removed.
        /// </summary>
        /// <param name="key">Key passed to <see cref="QueueUserWorkItem"/>.</param>
        public static void Aborted(string key)
        {
            lock (obj)
            {
                WorkerThread worker;
                if (dict.TryGetValue(key, out worker))
                {
                    try
                    {
                        worker.Thread.Abort();
                    }
                    catch (ThreadStateException)
                    {
                        // The worker had already finished and suspended itself. Abort()
                        // on a suspended thread throws in the caller and only marks the
                        // abort as pending; resuming lets the pending abort end the thread.
                        worker.Thread.Resume();
                    }

                    dict.Remove(key);
                }

                int index = queue.FindIndex(t => t.Key == key);
                if (index > -1)
                {
                    queue.RemoveAt(index);
                }

                // A slot may have been freed; let the scheduler run right away.
                wait.Set();
            }
        }

        /// <summary>
        /// Queues a callback for execution and wakes the scheduler.
        /// </summary>
        /// <param name="callback">Method to run on a worker thread.</param>
        /// <param name="state">Argument passed to <paramref name="callback"/>.</param>
        /// <param name="key">Identifier used later to cancel the item via <see cref="Aborted"/>.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="callback"/> or <paramref name="key"/> is null.
        /// </exception>
        public static void QueueUserWorkItem(WaitCallback callback, object state, string key)
        {
            // Fail at the call site instead of later on a scheduler or worker thread.
            if (callback == null)
            {
                throw new ArgumentNullException("callback");
            }

            if (key == null)
            {
                throw new ArgumentNullException("key");
            }

            WorkerThread item = new WorkerThread()
            {
                Callback = callback,
                State = state,
                Key = key
            };

            // List<T> is not thread-safe and the scheduler mutates the queue under this lock.
            lock (obj)
            {
                queue.Add(item);
            }

            wait.Set();
        }
    }
}
/// <summary>
/// Couples a work item (callback, argument and key) with the thread that
/// executes it. The same type is used both as a queued work item (no thread
/// yet) and as a running worker.
/// </summary>
public class WorkerThread
{
    /// <summary>Thread created by <see cref="Start"/>; null until then.</summary>
    public Thread Thread { get; set; }

    /// <summary>Identifier of the work item currently assigned to this worker.</summary>
    public string Key { get; set; }

    /// <summary>Method invoked by <see cref="Exec"/>.</summary>
    public WaitCallback Callback { get; set; }

    /// <summary>Argument handed to <see cref="Callback"/>.</summary>
    public Object State { get; set; }

    /// <summary>
    /// Thread body. Invokes the current callback with the current state, then
    /// suspends its own thread; once the thread is resumed the loop runs again
    /// with whatever callback and state are assigned at that moment.
    /// </summary>
    public void Exec()
    {
        for (; ; )
        {
            Callback(State);
            this.Thread.Suspend();
        }
    }

    /// <summary>
    /// Copies key, callback and state from another instance into this one.
    /// </summary>
    /// <param name="wt">Instance to copy the work item from.</param>
    /// <returns>This instance, to allow chaining.</returns>
    public WorkerThread Change(WorkerThread wt)
    {
        Key = wt.Key;
        Callback = wt.Callback;
        State = wt.State;
        return this;
    }

    /// <summary>
    /// Takes over the work item of <paramref name="wt"/> and creates a
    /// lowest-priority thread whose body is <see cref="Exec"/>. The thread is
    /// only created here; starting it is left to the caller.
    /// </summary>
    /// <param name="wt">Instance to copy the work item from.</param>
    public void Start(WorkerThread wt)
    {
        Change(wt);

        Thread worker = new Thread(new ThreadStart(Exec));
        this.Thread = worker;
        worker.Priority = ThreadPriority.Lowest;
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Rhino.Commons;
namespace ThreadPool
{
    /// <summary>
    /// A small work-item pool that runs queued callbacks on dedicated
    /// <see cref="WorkerThread"/> instances, never more than
    /// <see cref="MaxThreadNum"/> at a time, and lets callers cancel a queued
    /// or running item by its key.
    /// </summary>
    /// <remarks>
    /// A callback registered with the system thread pool acts as the scheduler:
    /// it runs whenever <see cref="QueueUserWorkItem"/> or <see cref="Aborted"/>
    /// signals it, and otherwise every <see cref="PollIntervalMilliseconds"/>.
    /// NOTE(review): running workers are indexed by key with
    /// <c>Dictionary.Add</c>, which throws when the key is already present, so
    /// callers should use a distinct key per work item.
    /// </remarks>
    public static class MyThreadPool
    {
        /// <summary>Maximum time, in milliseconds, between two scheduler runs when nothing signals it.</summary>
        private const int PollIntervalMilliseconds = 2000;

        /// <summary>Guards every access to <see cref="queue"/> and <see cref="dict"/>.</summary>
        private static readonly object obj = new object();

        /// <summary>Signalled when the queue or the worker table changed and the scheduler should run.</summary>
        private static readonly AutoResetEvent wait = new AutoResetEvent(false);

        /// <summary>Work items waiting for a worker, oldest first.</summary>
        private static readonly List<WorkerThread> queue = new List<WorkerThread>();

        /// <summary>Workers that own a thread, keyed by the key of the item they were last given.</summary>
        private static readonly Dictionary<string, WorkerThread> dict = new Dictionary<string, WorkerThread>(1);

        /// <summary>
        /// The single, repeating scheduler registration. It is stored so the
        /// handle stays referenced for as long as the pool exists instead of
        /// being created and dropped on every scheduler run.
        /// </summary>
        private static readonly RegisteredWaitHandle registration;

        static MyThreadPool()
        {
            SetMaxThreadNum(1);

            // executeOnlyOnce = false: the callback fires on every signal and on
            // every timeout, so the scheduler never has to re-register itself.
            registration = System.Threading.ThreadPool.RegisterWaitForSingleObject(
                wait, new WaitOrTimerCallback(aa), null, PollIntervalMilliseconds, false);
        }

        /// <summary>
        /// Scheduler pass: hands queued items to parked workers first, then
        /// creates new workers while fewer than <see cref="MaxThreadNum"/> exist.
        /// </summary>
        /// <param name="state">Unused registration state.</param>
        /// <param name="timedOut">Unused; the same work is done for a signal and for a timeout.</param>
        private static void aa(object state, bool timedOut)
        {
            lock (obj)
            {
                // Changes to dict are collected and applied after the loop because a
                // dictionary must not be modified while it is being enumerated.
                List<string> finishedKeys = new List<string>();
                List<WorkerThread> reassigned = new List<WorkerThread>();

                foreach (KeyValuePair<string, WorkerThread> entry in dict)
                {
                    if (queue.Count == 0)
                    {
                        break;
                    }

                    if (!IsParked(entry.Value.Thread))
                    {
                        continue;
                    }

                    WorkerThread next = queue[0];
                    queue.RemoveAt(0);
                    finishedKeys.Add(entry.Key);

                    // Change() copies the queued item's key/callback/state into the parked worker.
                    reassigned.Add(entry.Value.Change(next));
                }

                foreach (string finishedKey in finishedKeys)
                {
                    dict.Remove(finishedKey);
                }

                foreach (WorkerThread worker in reassigned)
                {
                    dict.Add(worker.Key, worker);
                    worker.Thread.Resume();
                }

                // Still below the limit: start additional workers for what remains queued.
                while (queue.Count > 0 && dict.Count < MaxThreadNum)
                {
                    WorkerThread next = queue[0];
                    queue.RemoveAt(0);

                    WorkerThread worker = new WorkerThread();
                    worker.Start(next);
                    dict.Add(worker.Key, worker);
                    worker.Thread.Start();
                }
            }
        }

        /// <summary>
        /// Tells whether a worker thread has suspended itself after finishing its item.
        /// ThreadState is a flags enum, so the Suspended bit is tested instead of
        /// comparing the whole value (which would miss e.g. a background thread).
        /// </summary>
        private static bool IsParked(Thread thread)
        {
            return (thread.ThreadState & ThreadState.Suspended) != 0;
        }

        /// <summary>
        /// Upper bound on the number of worker threads the scheduler creates.
        /// The static constructor sets it to 1.
        /// </summary>
        public static int MaxThreadNum
        {
            get; set;
        }

        /// <summary>
        /// Sets <see cref="MaxThreadNum"/>, raising values below 1 to 1.
        /// </summary>
        /// <param name="num">Requested maximum number of worker threads.</param>
        public static void SetMaxThreadNum(int num)
        {
            MaxThreadNum = Math.Max(1, num);
        }

        /// <summary>
        /// Cancels the work item with the given key: its worker thread (if any)
        /// is aborted and forgotten, and a still-queued item is removed.
        /// </summary>
        /// <param name="key">Key passed to <see cref="QueueUserWorkItem"/>.</param>
        public static void Aborted(string key)
        {
            lock (obj)
            {
                WorkerThread worker;
                if (dict.TryGetValue(key, out worker))
                {
                    try
                    {
                        worker.Thread.Abort();
                    }
                    catch (ThreadStateException)
                    {
                        // The worker had already finished and suspended itself. Abort()
                        // on a suspended thread throws in the caller and only marks the
                        // abort as pending; resuming lets the pending abort end the thread.
                        worker.Thread.Resume();
                    }

                    dict.Remove(key);
                }

                int index = queue.FindIndex(t => t.Key == key);
                if (index > -1)
                {
                    queue.RemoveAt(index);
                }

                // A slot may have been freed; let the scheduler run right away.
                wait.Set();
            }
        }

        /// <summary>
        /// Queues a callback for execution and wakes the scheduler.
        /// </summary>
        /// <param name="callback">Method to run on a worker thread.</param>
        /// <param name="state">Argument passed to <paramref name="callback"/>.</param>
        /// <param name="key">Identifier used later to cancel the item via <see cref="Aborted"/>.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="callback"/> or <paramref name="key"/> is null.
        /// </exception>
        public static void QueueUserWorkItem(WaitCallback callback, object state, string key)
        {
            // Fail at the call site instead of later on a scheduler or worker thread.
            if (callback == null)
            {
                throw new ArgumentNullException("callback");
            }

            if (key == null)
            {
                throw new ArgumentNullException("key");
            }

            WorkerThread item = new WorkerThread()
            {
                Callback = callback,
                State = state,
                Key = key
            };

            // List<T> is not thread-safe and the scheduler mutates the queue under this lock.
            lock (obj)
            {
                queue.Add(item);
            }

            wait.Set();
        }
    }
}
/// <summary>
/// Couples a work item (callback, argument and key) with the thread that
/// executes it. The same type is used both as a queued work item (no thread
/// yet) and as a running worker.
/// </summary>
public class WorkerThread
{
    /// <summary>Thread created by <see cref="Start"/>; null until then.</summary>
    public Thread Thread { get; set; }

    /// <summary>Identifier of the work item currently assigned to this worker.</summary>
    public string Key { get; set; }

    /// <summary>Method invoked by <see cref="Exec"/>.</summary>
    public WaitCallback Callback { get; set; }

    /// <summary>Argument handed to <see cref="Callback"/>.</summary>
    public Object State { get; set; }

    /// <summary>
    /// Thread body. Invokes the current callback with the current state, then
    /// suspends its own thread; once the thread is resumed the loop runs again
    /// with whatever callback and state are assigned at that moment.
    /// </summary>
    public void Exec()
    {
        for (; ; )
        {
            Callback(State);
            this.Thread.Suspend();
        }
    }

    /// <summary>
    /// Copies key, callback and state from another instance into this one.
    /// </summary>
    /// <param name="wt">Instance to copy the work item from.</param>
    /// <returns>This instance, to allow chaining.</returns>
    public WorkerThread Change(WorkerThread wt)
    {
        Key = wt.Key;
        Callback = wt.Callback;
        State = wt.State;
        return this;
    }

    /// <summary>
    /// Takes over the work item of <paramref name="wt"/> and creates a
    /// lowest-priority thread whose body is <see cref="Exec"/>. The thread is
    /// only created here; starting it is left to the caller.
    /// </summary>
    /// <param name="wt">Instance to copy the work item from.</param>
    public void Start(WorkerThread wt)
    {
        Change(wt);

        Thread worker = new Thread(new ThreadStart(Exec));
        this.Thread = worker;
        worker.Priority = ThreadPriority.Lowest;
    }
}
测试代码
Code
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace ThreadPoolTest
{
    /// <summary>
    /// Console demo for <c>ThreadPool.MyThreadPool</c>: queues four work items
    /// under the keys "aa" to "dd" and cancels "aa" and "bb" along the way.
    /// </summary>
    public class Class1
    {
        /// <summary>Gate that holds <see cref="test"/> back until Main opens it.</summary>
        static ManualResetEvent wait = new ManualResetEvent(false);

        /// <summary>
        /// Queues "aa" and "bb", opens the gate after 10 s, cancels "aa" after
        /// another 10 s and queues "cc", then 10 s later cancels "bb" and
        /// queues "dd". Finally blocks on Console.ReadLine.
        /// </summary>
        static void Main(string[] args)
        {
            const int pauseMilliseconds = 10000;
            object sharedState = new object();

            ThreadPool.MyThreadPool.QueueUserWorkItem(test, sharedState, "aa");
            ThreadPool.MyThreadPool.QueueUserWorkItem(test2, sharedState, "bb");

            Thread.Sleep(pauseMilliseconds);
            wait.Set();

            Thread.Sleep(pauseMilliseconds);
            Console.WriteLine("aborted aa");
            ThreadPool.MyThreadPool.Aborted("aa");
            ThreadPool.MyThreadPool.QueueUserWorkItem(test3, sharedState, "cc");

            Thread.Sleep(pauseMilliseconds);
            Console.WriteLine("aborted bb");
            ThreadPool.MyThreadPool.Aborted("bb");
            ThreadPool.MyThreadPool.QueueUserWorkItem(test4, sharedState, "dd");

            Console.ReadLine();
        }

        /// <summary>
        /// Endless work item: prints "test", sleeps 2 s, then waits on the gate.
        /// Until Main sets the event only the first line is printed; the event
        /// is manual-reset and never reset here, so later waits return at once.
        /// </summary>
        private static void test(object state)
        {
            for (; ; )
            {
                Console.WriteLine("test");
                Thread.Sleep(2000);
                wait.WaitOne();
            }
        }

        /// <summary>Endless work item: prints "test2" every 2 s.</summary>
        private static void test2(object state)
        {
            for (; ; )
            {
                Console.WriteLine("test2");
                Thread.Sleep(2000);
            }
        }

        /// <summary>Short work item: prints "test3" once.</summary>
        private static void test3(object state)
        {
            Console.WriteLine("test3");
        }

        /// <summary>Short work item: prints "test4" once.</summary>
        private static void test4(object state)
        {
            Console.WriteLine("test4");
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace ThreadPoolTest
{
    /// <summary>
    /// Console demo for <c>ThreadPool.MyThreadPool</c>: queues four work items
    /// under the keys "aa" to "dd" and cancels "aa" and "bb" along the way.
    /// </summary>
    public class Class1
    {
        /// <summary>Gate that holds <see cref="test"/> back until Main opens it.</summary>
        static ManualResetEvent wait = new ManualResetEvent(false);

        /// <summary>
        /// Queues "aa" and "bb", opens the gate after 10 s, cancels "aa" after
        /// another 10 s and queues "cc", then 10 s later cancels "bb" and
        /// queues "dd". Finally blocks on Console.ReadLine.
        /// </summary>
        static void Main(string[] args)
        {
            const int pauseMilliseconds = 10000;
            object sharedState = new object();

            ThreadPool.MyThreadPool.QueueUserWorkItem(test, sharedState, "aa");
            ThreadPool.MyThreadPool.QueueUserWorkItem(test2, sharedState, "bb");

            Thread.Sleep(pauseMilliseconds);
            wait.Set();

            Thread.Sleep(pauseMilliseconds);
            Console.WriteLine("aborted aa");
            ThreadPool.MyThreadPool.Aborted("aa");
            ThreadPool.MyThreadPool.QueueUserWorkItem(test3, sharedState, "cc");

            Thread.Sleep(pauseMilliseconds);
            Console.WriteLine("aborted bb");
            ThreadPool.MyThreadPool.Aborted("bb");
            ThreadPool.MyThreadPool.QueueUserWorkItem(test4, sharedState, "dd");

            Console.ReadLine();
        }

        /// <summary>
        /// Endless work item: prints "test", sleeps 2 s, then waits on the gate.
        /// Until Main sets the event only the first line is printed; the event
        /// is manual-reset and never reset here, so later waits return at once.
        /// </summary>
        private static void test(object state)
        {
            for (; ; )
            {
                Console.WriteLine("test");
                Thread.Sleep(2000);
                wait.WaitOne();
            }
        }

        /// <summary>Endless work item: prints "test2" every 2 s.</summary>
        private static void test2(object state)
        {
            for (; ; )
            {
                Console.WriteLine("test2");
                Thread.Sleep(2000);
            }
        }

        /// <summary>Short work item: prints "test3" once.</summary>
        private static void test3(object state)
        {
            Console.WriteLine("test3");
        }

        /// <summary>Short work item: prints "test4" once.</summary>
        private static void test4(object state)
        {
            Console.WriteLine("test4");
        }
    }
}