代码
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
public delegate void AnonymousHandler();
public class WorkQueue : WorkQueue<AnonymousHandler> {
public WorkQueue() : this(16, -1) { }
public WorkQueue(int thread)
: this(thread, -1) {
}
public WorkQueue(int thread, int capacity) {
base.Thread = thread;
base.Capacity = capacity;
base.Process += delegate(AnonymousHandler ah) {
ah();
};
}
}
public class WorkQueue<T> : IDisposable {
public delegate void WorkQueueProcessHandler(T item);
public event WorkQueueProcessHandler Process;
private int _thread = 16;
private int _capacity = -1;
private int _work_index = 0;
private Dictionary<int, WorkInfo> _works = new Dictionary<int, WorkInfo>();
private object _works_lock = new object();
private Queue<T> _queue = new Queue<T>();
private object _queue_lock = new object();
public WorkQueue() : this(16, -1) { }
public WorkQueue(int thread)
: this(thread, -1) {
}
public WorkQueue(int thread, int capacity) {
_thread = thread;
_capacity = capacity;
}
public void Enqueue(T item) {
lock (_queue_lock) {
if (_capacity > 0 && _queue.Count >= _capacity) return;
_queue.Enqueue(item);
}
lock (_works_lock) {
foreach (WorkInfo w in _works.Values) {
if (w.IsWaiting) {
w.Set();
return;
}
}
}
if (_works.Count < _thread) {
if (_queue.Count > 0) {
int index = 0;
lock (_works_lock) {
index = _work_index++;
_works.Add(index, new WorkInfo());
}
new Thread(delegate() {
WorkInfo work = _works[index];
while (true) {
List<T> de = new List<T>();
if (_queue.Count > 0) {
lock (_queue_lock) {
if (_queue.Count > 0) {
de.Add(_queue.Dequeue());
}
}
}
if (de.Count > 0) {
try {
this.OnProcess(de[0]);
} catch {
}
}
if (_queue.Count == 0) {
work.WaitOne(TimeSpan.FromSeconds(20));
if (_queue.Count == 0) {
break;
}
}
}
lock (_works_lock) {
_works.Remove(index);
}
work.Dispose();
}).Start();
}
}
}
protected virtual void OnProcess(T item) {
if (Process != null) {
Process(item);
}
}
#region IDisposable 成员
public void Dispose() {
lock (_queue_lock) {
_queue.Clear();
}
lock (_works_lock) {
foreach (WorkInfo w in _works.Values) {
w.Dispose();
}
}
}
#endregion
public int Thread {
get { return _thread; }
set {
if (_thread != value) {
_thread = value;
}
}
}
public int Capacity {
get { return _capacity; }
set {
if (_capacity != value) {
_capacity = value;
}
}
}
public int UsedThread {
get { return _works.Count; }
}
public int Queue {
get { return _queue.Count; }
}
public string Statistics {
get {
string value = string.Format(@"线程:{0}/{1}
队列:{2}
", _works.Count, _thread, _queue.Count);
int[] keys = new int[_works.Count];
try {
_works.Keys.CopyTo(keys, 0);
} catch {
lock (_works_lock) {
keys = new int[_works.Count];
_works.Keys.CopyTo(keys, 0);
}
}
foreach (int k in keys) {
WorkInfo w = null;
if (_works.TryGetValue(k, out w)) {
value += string.Format(@"线程{0}:{1}
", k, w.IsWaiting);
}
}
return value;
}
}
class WorkInfo : IDisposable {
private ManualResetEvent _reset = new ManualResetEvent(false);
private bool _isWaiting = false;
public void WaitOne(TimeSpan timeout) {
try {
_reset.Reset();
_isWaiting = true;
_reset.WaitOne(timeout, false);
} catch { }
}
public void Set() {
try {
_isWaiting = false;
_reset.Set();
} catch { }
}
public bool IsWaiting {
get { return _isWaiting; }
}
#region IDisposable 成员
public void Dispose() {
this.Set();
_reset.Close();
}
#endregion
}
}
调用方法:
代码
//方法1
WorkQueue<xxx> writeWQ = new WorkQueue<xxx>(5, 1000); //有重载,最多开辟5个线程同时处理,队列大小设为1000,如果超出则不处理
for (int a = 0; a < 1000000; a++) {
//此方法按常理会立即返回,因为执行交给 WorkQueue 处理了
writeWQ.Enqueue(delegate(xxx msg) {
//处理
});
}
//方法2,执行一个匿名委托
WorkQueue writeWQ = new WorkQueue(5, 1000); //有重载,最多开辟5个线程同时处理,队列大小设为1000,如果超出则不处理
for (int a = 0; a < 1000000; a++) {
//此方法按常理会立即返回,因为执行交给 WorkQueue 处理了
writeWQ.Enqueue(delegate() {
//处理
});
}