using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Diagnostics; namespace Manager.Common { public enum EngineResult { Success, FaildAndSuspend, FaildWithoutSuspend } //消息传递引擎 public class RelayEngine<T> { private Thread _RelayThread; private AutoResetEvent _ItemArriveEvent = new AutoResetEvent(false); private ManualResetEvent _ResumeEvent = new ManualResetEvent(true); private WaitHandle[] _WaitHandles; private bool _Stop = false; private LinkedList<T> _Buffer = new LinkedList<T>(); private Func<T, bool> _RelayFunc; private Func<T, EngineResult> _RelayFunc2; private Action<Exception> _HandleException; public bool IsSuspend = true; public RelayEngine(Func<T, bool> relayFunc, Action<Exception> handleException, Func<T, EngineResult> relayFunc2 = null) { this._WaitHandles = new WaitHandle[] { this._ItemArriveEvent, this._ResumeEvent }; this._RelayFunc = relayFunc; this._RelayFunc2 = relayFunc2; this._HandleException = handleException; this._RelayThread = new Thread(this.Run) { IsBackground = true }; this._RelayThread.Start(); this.IsSuspend = false; } public void AddItem(T item) { lock (this) { this._Buffer.AddLast(item); } this._ItemArriveEvent.Set(); } public void Suspend() { this.IsSuspend = true; this._ResumeEvent.Reset(); } public void Resume() { this.IsSuspend = false; this._ResumeEvent.Set(); } public void Stop() { this.IsSuspend = true; //线程挂起 this._Stop = true; //线程停止 this._ItemArriveEvent.Set(); this._ResumeEvent.Set(); } private void Run() { try { while (true) { if (this._Buffer.Count == 0) { WaitHandle.WaitAll(this._WaitHandles); } else { this._ResumeEvent.WaitOne(); //队列没有消息阻塞线程,知道收到信号 } if (this._Stop) break; if (this._Buffer.Count > 0) { T item = this._Buffer.First.Value; //先进先出 EngineResult result; if (this._RelayFunc2 == null) { result = this._RelayFunc(item) ? EngineResult.Success : EngineResult.FaildAndSuspend; } else { result = this._RelayFunc2(item); } if (result == EngineResult.Success) { lock (this) { this._Buffer.RemoveFirst(); } } else { if (result == EngineResult.FaildAndSuspend) this.Suspend(); } } } } catch (Exception ex) { this._HandleException(ex); } } } }