zoukankan      html  css  js  c++  java
  • SendMessage

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Diagnostics;
    
    namespace Manager.Common
    {
        public enum EngineResult
        {
            Success,
            FaildAndSuspend,
            FaildWithoutSuspend
        }
    
        //消息传递引擎
        public class RelayEngine<T>
        {
            private Thread _RelayThread;
            private AutoResetEvent _ItemArriveEvent = new AutoResetEvent(false);
            private ManualResetEvent _ResumeEvent = new ManualResetEvent(true);
            private WaitHandle[] _WaitHandles;
            private bool _Stop = false;
    
            private LinkedList<T> _Buffer = new LinkedList<T>();
            private Func<T, bool> _RelayFunc;
            private Func<T, EngineResult> _RelayFunc2;
            private Action<Exception> _HandleException;
            public bool IsSuspend = true;
    
            public RelayEngine(Func<T, bool> relayFunc, Action<Exception> handleException, Func<T, EngineResult> relayFunc2 = null)
            {
                this._WaitHandles = new WaitHandle[] { this._ItemArriveEvent, this._ResumeEvent };
                this._RelayFunc = relayFunc;
                this._RelayFunc2 = relayFunc2;
                this._HandleException = handleException;
                this._RelayThread = new Thread(this.Run) { IsBackground = true };
                this._RelayThread.Start();
                this.IsSuspend = false;
            }
    
            public void AddItem(T item)
            {
                lock (this)
                {
                    this._Buffer.AddLast(item);
                }
                this._ItemArriveEvent.Set();
            }
    
            public void Suspend()
            {
                this.IsSuspend = true;
                this._ResumeEvent.Reset();
            }
    
            public void Resume()
            {
                this.IsSuspend = false;
                this._ResumeEvent.Set();
            }
    
            public void Stop()
            {
                this.IsSuspend = true; //线程挂起
                this._Stop = true;    //线程停止
                this._ItemArriveEvent.Set();
                this._ResumeEvent.Set();
            }
    
            private void Run()
            {
                try
                {
                    while (true)
                    {
                        if (this._Buffer.Count == 0)
                        {
                            WaitHandle.WaitAll(this._WaitHandles);
                        }
                        else
                        {
                            this._ResumeEvent.WaitOne(); //队列没有消息阻塞线程,知道收到信号
                        }
    
                        if (this._Stop) break;
    
                        if (this._Buffer.Count > 0)
                        {
                            T item = this._Buffer.First.Value; //先进先出
                            EngineResult result;
                            if (this._RelayFunc2 == null)
                            {
                                result = this._RelayFunc(item) ? EngineResult.Success : EngineResult.FaildAndSuspend;
                            }
                            else
                            {
                                result = this._RelayFunc2(item);
                            }
                            if (result == EngineResult.Success)
                            {
                                lock (this)
                                {
                                    this._Buffer.RemoveFirst();
                                }
                            }
                            else
                            {
                                if (result == EngineResult.FaildAndSuspend) this.Suspend();
                            }
                        }
                    }
                }
                catch (Exception ex)
                {
                    this._HandleException(ex);
                }
            }
        }
    }
    View Code
  • 相关阅读:
    WebClient 非阻塞客户端 RestTemplate 阻塞式客户端
    微服务网关---调用其他微服务
    复习下comparable和comparator以及比较
    关于InitializingBean的用法、应用
    Scheduled(cron = "")
    windows查看进程方法(老是忘只能写了)
    vue 控件component
    vue 过滤器的使用实例
    vue基础
    日志脱敏工具
  • 原文地址:https://www.cnblogs.com/feige/p/5994896.html
Copyright © 2011-2022 走看看