zoukankan      html  css  js  c++  java
  • Asp.NetCore轻松学-实现一个轻量级高可复用的RabbitMQ客户端

    Asp.NetCore轻松学-实现一个轻量级高可复用的RabbitMQ客户端

    前言

    本示例通过对服务订阅的封装、隐藏细节实现、统一配置、自动重连、异常处理等各个方面来打造一个简单易用的 RabbitMQ 工厂;本文适合适合有一定 RabbitMQ 使用经验的读者阅读,如果你还没有实际使用过 RabbitMQ,也没有关系,因为本文的代码都是基于直接运行的实例,通过简单的修改 RabbitMQ 即可运行。

    • 解决方案如下

    1. 创建基础连接管理帮助类

    首先,创建一个 .netcore 控制台项目,创建 Helper、Service、Utils 文件夹,分别用于存放通道管理、服务订阅、公共组件。

    1.1 接下来创建一个 MQConfig 类,用于存放 RabbitMQ 主机配置等信息
        public class MQConfig
        {
            /// <summary>
            ///  访问消息队列的用户名
            /// </summary>
            public string UserName { get; set; }
            /// <summary>
            ///  访问消息队列的密码
            /// </summary>
            public string Password { get; set; }
            /// <summary>
            ///  消息队列的主机地址
            /// </summary>
            public string HostName { get; set; }
            /// <summary>
            ///  消息队列的主机开放的端口
            /// </summary>
            public int Port { get; set; }
        }
    1.2 创建 RabbitMQ 连接管理类,用于创建连接,关闭连接
    1.3 创建一个消息体对象 MessageBody,用于解析和传递消息到业务系统中,在接下来的 MQChannel 类中会用到
      public class MessageBody
        {
            public EventingBasicConsumer Consumer { get; set; }
            public BasicDeliverEventArgs BasicDeliver { get; set; }
            /// <summary>
            ///  0成功
            /// </summary>
            public int Code { get; set; }
            public string Content { get; set; }
            public string ErrorMessage { get; set; }
            public bool Error { get; set; }
            public Exception Exception { get; set; }
        }
    1.4 创建一个通道类,用于订阅、发布消息,同时提供一个关闭通道连接的方法 Stop
     public class MQChannel
        {
            public string ExchangeTypeName { get; set; }
            public string ExchangeName { get; set; }
            public string QueueName { get; set; }
            public string RoutekeyName { get; set; }
            public IConnection Connection { get; set; }
            public EventingBasicConsumer Consumer { get; set; }
    
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span>  外部订阅消费者通知委托</span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
        <span class="hljs-keyword">public</span> Action&lt;MessageBody&gt; OnReceivedCallback { <span class="hljs-keyword">get</span>; <span class="hljs-keyword">set</span>; }
    
        <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-title">MQChannel</span>(<span class="hljs-params"><span class="hljs-keyword">string</span> exchangeType, <span class="hljs-keyword">string</span> exchange, <span class="hljs-keyword">string</span> queue, <span class="hljs-keyword">string</span> routekey</span>)
        </span>{
            <span class="hljs-keyword">this</span>.ExchangeTypeName = exchangeType;
            <span class="hljs-keyword">this</span>.ExchangeName = exchange;
            <span class="hljs-keyword">this</span>.QueueName = queue;
            <span class="hljs-keyword">this</span>.RoutekeyName = routekey;
        }
    
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span>  向当前队列发送消息</span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="content"&gt;</span><span class="hljs-doctag">&lt;/param&gt;</span></span>
        <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">Publish</span>(<span class="hljs-params"><span class="hljs-keyword">string</span> content</span>)
        </span>{
            <span class="hljs-keyword">byte</span>[] body = MQConnection.UTF8.GetBytes(content);
            IBasicProperties prop = <span class="hljs-keyword">new</span> BasicProperties();
            prop.DeliveryMode = <span class="hljs-number">1</span>;
            Consumer.Model.BasicPublish(<span class="hljs-keyword">this</span>.ExchangeName, <span class="hljs-keyword">this</span>.RoutekeyName, <span class="hljs-literal">false</span>, prop, body);
        }
    
        <span class="hljs-function"><span class="hljs-keyword">internal</span> <span class="hljs-keyword">void</span> <span class="hljs-title">Receive</span>(<span class="hljs-params"><span class="hljs-keyword">object</span> sender, BasicDeliverEventArgs e</span>)
        </span>{
            MessageBody body = <span class="hljs-keyword">new</span> MessageBody();
            <span class="hljs-keyword">try</span>
            {
                <span class="hljs-keyword">string</span> content = MQConnection.UTF8.GetString(e.Body);
                body.Content = content;
                body.Consumer = (EventingBasicConsumer)sender;
                body.BasicDeliver = e;
            }
            <span class="hljs-keyword">catch</span> (Exception ex)
            {
                body.ErrorMessage = <span class="hljs-string">$"订阅-出错<span class="hljs-subst">{ex.Message}</span>"</span>;
                body.Exception = ex;
                body.Error = <span class="hljs-literal">true</span>;
                body.Code = <span class="hljs-number">500</span>;
            }
            OnReceivedCallback?.Invoke(body);
        }
    
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span>  设置消息处理完成标志</span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="consumer"&gt;</span><span class="hljs-doctag">&lt;/param&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="deliveryTag"&gt;</span><span class="hljs-doctag">&lt;/param&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="multiple"&gt;</span><span class="hljs-doctag">&lt;/param&gt;</span></span>
        <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">SetBasicAck</span>(<span class="hljs-params">EventingBasicConsumer consumer, <span class="hljs-keyword">ulong</span> deliveryTag, <span class="hljs-keyword">bool</span> multiple</span>)
        </span>{
            consumer.Model.BasicAck(deliveryTag, multiple);
        }
    
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span>  关闭消息队列的连接</span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
        <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">Stop</span>(<span class="hljs-params"></span>)
        </span>{
            <span class="hljs-keyword">if</span> (<span class="hljs-keyword">this</span>.Connection != <span class="hljs-literal">null</span> &amp;&amp; <span class="hljs-keyword">this</span>.Connection.IsOpen)
            {
                <span class="hljs-keyword">this</span>.Connection.Close();
                <span class="hljs-keyword">this</span>.Connection.Dispose();
            }
        }
    }</code></pre>
    

    1.5 在上面的 MQChannel 类中

    首先是在构造函数内对当前通道的属性进行设置,其次提供了 Publish 和 OnReceivedCallback 的委托,当通道接收到消息的时候,会进入方法 Receive 中,在 Receive 中,经过封装成 MessageBody 对象,并调用委托 OnReceivedCallback ,将,解析好的消息传递到外边订阅者的业务中。最终在 MQChannel 中还提供了消息确认的操作方法 SetBasicAck,供业务系统手动调用。

    1.6 接着再创建一个 RabbitMQ 通道管理类,用于创建通道,代码非常简单,只有一个公共方法 CreateReceiveChannel,传入相关参数,创建一个 MQChannel 对象
        public class MQChannelManager
        {
            public MQConnection MQConn { get; set; }
    
        <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-title">MQChannelManager</span>(<span class="hljs-params">MQConnection conn</span>)
        </span>{
            <span class="hljs-keyword">this</span>.MQConn = conn;
        }
    
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span>  创建消息通道</span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="cfg"&gt;</span><span class="hljs-doctag">&lt;/param&gt;</span></span>
        <span class="hljs-function"><span class="hljs-keyword">public</span> MQChannel <span class="hljs-title">CreateReceiveChannel</span>(<span class="hljs-params"><span class="hljs-keyword">string</span> exchangeType, <span class="hljs-keyword">string</span> exchange, <span class="hljs-keyword">string</span> queue, <span class="hljs-keyword">string</span> routekey</span>)
        </span>{
            IModel model = <span class="hljs-keyword">this</span>.CreateModel(exchangeType, exchange, queue, routekey);
            model.BasicQos(<span class="hljs-number">0</span>, <span class="hljs-number">1</span>, <span class="hljs-literal">false</span>);
            EventingBasicConsumer consumer = <span class="hljs-keyword">this</span>.CreateConsumer(model, queue);
            MQChannel channel = <span class="hljs-keyword">new</span> MQChannel(exchangeType, exchange, queue, routekey)
            {
                Connection = <span class="hljs-keyword">this</span>.MQConn.Connection,
                Consumer = consumer
            };
            consumer.Received += channel.Receive;
            <span class="hljs-keyword">return</span> channel;
        }
    
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span>  创建一个通道,包含交换机/队列/路由,并建立绑定关系</span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="type"&gt;</span>交换机类型<span class="hljs-doctag">&lt;/param&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="exchange"&gt;</span>交换机名称<span class="hljs-doctag">&lt;/param&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="queue"&gt;</span>队列名称<span class="hljs-doctag">&lt;/param&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="routeKey"&gt;</span>路由名称<span class="hljs-doctag">&lt;/param&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;returns&gt;</span><span class="hljs-doctag">&lt;/returns&gt;</span></span>
        <span class="hljs-function"><span class="hljs-keyword">private</span> IModel <span class="hljs-title">CreateModel</span>(<span class="hljs-params"><span class="hljs-keyword">string</span> type, <span class="hljs-keyword">string</span> exchange, <span class="hljs-keyword">string</span> queue, <span class="hljs-keyword">string</span> routeKey, IDictionary&lt;<span class="hljs-keyword">string</span>, <span class="hljs-keyword">object</span>&gt; arguments = <span class="hljs-literal">null</span></span>)
        </span>{
            type = <span class="hljs-keyword">string</span>.IsNullOrEmpty(type) ? <span class="hljs-string">"default"</span> : type;
            IModel model = <span class="hljs-keyword">this</span>.MQConn.Connection.CreateModel();
            model.BasicQos(<span class="hljs-number">0</span>, <span class="hljs-number">1</span>, <span class="hljs-literal">false</span>);
            model.QueueDeclare(queue, <span class="hljs-literal">true</span>, <span class="hljs-literal">false</span>, <span class="hljs-literal">false</span>, arguments);
            model.QueueBind(queue, exchange, routeKey);
            <span class="hljs-keyword">return</span> model;
        }
    
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span>  接收消息到队列中</span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="model"&gt;</span>消息通道<span class="hljs-doctag">&lt;/param&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="queue"&gt;</span>队列名称<span class="hljs-doctag">&lt;/param&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="callback"&gt;</span>订阅消息的回调事件<span class="hljs-doctag">&lt;/param&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;returns&gt;</span><span class="hljs-doctag">&lt;/returns&gt;</span></span>
        <span class="hljs-function"><span class="hljs-keyword">private</span> EventingBasicConsumer <span class="hljs-title">CreateConsumer</span>(<span class="hljs-params">IModel model, <span class="hljs-keyword">string</span> queue</span>)
        </span>{
            EventingBasicConsumer consumer = <span class="hljs-keyword">new</span> EventingBasicConsumer(model);
            model.BasicConsume(queue, <span class="hljs-literal">false</span>, consumer);
    
            <span class="hljs-keyword">return</span> consumer;
        }
    }</code></pre>
    

    1.7 通道管理类的构造方法

     public MQChannelManager(MQConnection conn)
            {
                this.MQConn = conn;
            }

    1.8 需要传入一个 MQConnection 对象,仅是一个简单的连接类,代码如下

        public class MQConnection
        {
            private string vhost = string.Empty;
            private IConnection connection = null;
            private MQConfig config = null;
    
            /// <summary>
            ///  构造无 utf8 标记的编码转换器
            /// </summary>
            public static UTF8Encoding UTF8 { get; set; } = new UTF8Encoding(false);
    
            public MQConnection(MQConfig config, string vhost)
            {
                this.config = config;
                this.vhost = vhost;
            }
    
            public IConnection Connection
            {
                get
                {
                    if (connection == null)
                    {
                        ConnectionFactory factory = new ConnectionFactory
                        {
                            AutomaticRecoveryEnabled = true,
                            UserName = this.config.UserName,
                            Password = this.config.Password,
                            HostName = this.config.HostName,
                            VirtualHost = this.vhost,
                            Port = this.config.Port
                        };
                        connection = factory.CreateConnection();
                    }
    
                    return connection;
                }
            }
        }

    1.9 在上面的代码中,还初始化了一个静态对象 UTF8Encoding ,使用无 utf8 标记的编码转换器来解析消息

    2. 定义和实现服务契约

    设想一下,有这样的一个业务场景,通道管理和服务管理都是相同的操作,如果这些基础操作都在一个地方定义,且有一个默认的实现,那么后来者就不需要去关注这些技术细节,直接继承基础类后,传入相应的消息配置即可完成
    消息订阅和发布操作。

    2.1 有了想法,接下来就先定义契约接口 IService,此接口包含创建通道、开启/停止订阅,一个服务可能承载多个通道,所以还需要包含通道列表
    public interface IService
        {
            /// <summary>
            ///  创建通道
            /// </summary>
            /// <param name="queue">队列名称</param>
            /// <param name="routeKey">路由名称</param>
            /// <param name="exchangeType">交换机类型</param>
            /// <returns></returns>
            MQChannel CreateChannel(string queue, string routeKey, string exchangeType);
    
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span>  开启订阅</span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
        <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">Start</span>(<span class="hljs-params"></span>)</span>;
    
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span>  停止订阅</span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
        <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">Stop</span>(<span class="hljs-params"></span>)</span>;
    
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span>  通道列表</span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
        List&lt;MQChannel&gt; Channels { <span class="hljs-keyword">get</span>; <span class="hljs-keyword">set</span>; }
    
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span>  消息队列中定义的虚拟机</span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
        <span class="hljs-keyword">string</span> vHost { <span class="hljs-keyword">get</span>; }
    
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span>  消息队列中定义的交换机</span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
        <span class="hljs-keyword">string</span> Exchange { <span class="hljs-keyword">get</span>; }
    }</code></pre>
    

    2.2 接下来创建一个抽象类来实现该接口,将实现细节进行封装,方便后面的业务服务继承调用

      public abstract class MQServiceBase : IService
        {
            internal bool started = false;
            internal MQServiceBase(MQConfig config)
            {
                this.Config = config;
            }
    
            public MQChannel CreateChannel(string queue, string routeKey, string exchangeType)
            {
                MQConnection conn = new MQConnection(this.Config, this.vHost);
                MQChannelManager cm = new MQChannelManager(conn);
                MQChannel channel = cm.CreateReceiveChannel(exchangeType, this.Exchange, queue, routeKey);
                return channel;
            }
    
            /// <summary>
            ///  启动订阅
            /// </summary>
            public void Start()
            {
                if (started)
                {
                    return;
                }
    
                MQConnection conn = new MQConnection(this.Config, this.vHost);
                MQChannelManager manager = new MQChannelManager(conn);
                foreach (var item in this.Queues)
                {
                    MQChannel channel = manager.CreateReceiveChannel(item.ExchangeType, this.Exchange, item.Queue, item.RouterKey);
                    channel.OnReceivedCallback = item.OnReceived;
                    this.Channels.Add(channel);
                }
                started = true;
            }
    
            /// <summary>
            ///  停止订阅
            /// </summary>
            public void Stop()
            {
                foreach (var c in this.Channels)
                {
                    c.Stop();
                }
                this.Channels.Clear();
                started = false;
            }
    
            /// <summary>
            /// 接收消息
            /// </summary>
            /// <param name="message"></param>
            public abstract void OnReceived(MessageBody message);
    
            public List<MQChannel> Channels { get; set; } = new List<MQChannel>();
    
            /// <summary>
            ///  消息队列配置
            /// </summary>
            public MQConfig Config { get; set; }
    
            /// <summary>
            ///  消息队列中定义的虚拟机
            /// </summary>
            public abstract string vHost { get; }
    
            /// <summary>
            ///  消息队列中定义的交换机
            /// </summary>
            public abstract string Exchange { get; }
    
            /// <summary>
            ///  定义的队列列表
            /// </summary>
            public List<QueueInfo> Queues { get; } = new List<QueueInfo>();
        }

    上面的抽象类,原封不动的实现接口契约,代码非常简单,在 Start 方法中,创建通道和启动消息订阅;同时,将通道加入属性 Channels 中,方便后面的自检服务使用;在 Start 方法中

     /// <summary>
            ///  启动订阅
            /// </summary>
            public void Start()
            {
                if (started)
                {
                    return;
                }
    
            MQConnection conn = <span class="hljs-keyword">new</span> MQConnection(<span class="hljs-keyword">this</span>.Config, <span class="hljs-keyword">this</span>.vHost);
            MQChannelManager manager = <span class="hljs-keyword">new</span> MQChannelManager(conn);
            <span class="hljs-keyword">foreach</span> (<span class="hljs-keyword">var</span> item <span class="hljs-keyword">in</span> <span class="hljs-keyword">this</span>.Queues)
            {
                MQChannel channel = manager.CreateReceiveChannel(item.ExchangeType, <span class="hljs-keyword">this</span>.Exchange, item.Queue, item.RouterKey);
                channel.OnReceivedCallback = item.OnReceived;
                <span class="hljs-keyword">this</span>.Channels.Add(channel);
            }
            started = <span class="hljs-literal">true</span>;
        }</code></pre>
    

    使用 MQChannelManager 创建了一个通道,并将通道的回调委托 OnReceivedCallback 设置为 item.OnReceived 方法,该方法将有子类实现;在将当前订阅服务通道创建完成后,标记服务状态 started 为 true,防止重复启动;同时,在该抽象类中,不实现契约的 OnReceived(MessageBody message);强制基础业务服务类去自我实现,因为各种业务的特殊性,这块对消息的处理不能再基础服务中完成

    接下来要介绍的是服务监控管理类,该类内部定义一个简单的定时器功能,不间断的对 RabbitMQ 的通讯进行侦听,一旦发现有断开的连接,就自动创建一个新的通道,并移除旧的通道;同时,提供 Start/Stop 两个方法,以供程序 启动/停止 的时候对

    2.3 RabbitMQ 的连接和通道进行清理;代码如下
    public class MQServcieManager
        {
            public int Timer_tick { get; set; } = 10 * 1000;
            private Timer timer = null;
    
        <span class="hljs-keyword">public</span> Action&lt;MessageLevel, <span class="hljs-keyword">string</span>, Exception&gt; OnAction = <span class="hljs-literal">null</span>;
        <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-title">MQServcieManager</span>(<span class="hljs-params"></span>)
        </span>{
            timer = <span class="hljs-keyword">new</span> Timer(OnInterval, <span class="hljs-string">""</span>, Timer_tick, Timer_tick);
        }
    
    
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span>  自检,配合 RabbitMQ 内部自动重连机制</span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="sender"&gt;</span><span class="hljs-doctag">&lt;/param&gt;</span></span>
        <span class="hljs-function"><span class="hljs-keyword">private</span> <span class="hljs-keyword">void</span> <span class="hljs-title">OnInterval</span>(<span class="hljs-params"><span class="hljs-keyword">object</span> sender</span>)
        </span>{
            <span class="hljs-keyword">int</span> error = <span class="hljs-number">0</span>, reconnect = <span class="hljs-number">0</span>;
            OnAction?.Invoke(MessageLevel.Information, <span class="hljs-string">$"<span class="hljs-subst">{DateTime.Now}</span> 正在执行自检"</span>, <span class="hljs-literal">null</span>);
            <span class="hljs-keyword">foreach</span> (<span class="hljs-keyword">var</span> item <span class="hljs-keyword">in</span> <span class="hljs-keyword">this</span>.Services)
            {
                <span class="hljs-keyword">for</span> (<span class="hljs-keyword">int</span> i = <span class="hljs-number">0</span>; i &lt; item.Channels.Count; i++)
                {
                    <span class="hljs-keyword">var</span> c = item.Channels[i];
                    <span class="hljs-keyword">if</span> (c.Connection == <span class="hljs-literal">null</span> || !c.Connection.IsOpen)
                    {
                        error++;
                        OnAction?.Invoke(MessageLevel.Information, <span class="hljs-string">$"<span class="hljs-subst">{c.ExchangeName}</span> <span class="hljs-subst">{c.QueueName}</span> <span class="hljs-subst">{c.RoutekeyName}</span> 重新创建订阅"</span>, <span class="hljs-literal">null</span>);
                        <span class="hljs-keyword">try</span>
                        {
                            c.Stop();
                            <span class="hljs-keyword">var</span> channel = item.CreateChannel(c.QueueName, c.RoutekeyName, c.ExchangeTypeName);
                            item.Channels.Remove(c);
                            item.Channels.Add(channel);
    
                            OnAction?.Invoke(MessageLevel.Information, <span class="hljs-string">$"<span class="hljs-subst">{c.ExchangeName}</span> <span class="hljs-subst">{c.QueueName}</span> <span class="hljs-subst">{c.RoutekeyName}</span> 重新创建完成"</span>, <span class="hljs-literal">null</span>);
                            reconnect++;
                        }
                        <span class="hljs-keyword">catch</span> (Exception ex)
                        {
                            OnAction?.Invoke(MessageLevel.Information, ex.Message, ex);
                        }
                    }
                }
            }
            OnAction?.Invoke(MessageLevel.Information, <span class="hljs-string">$"<span class="hljs-subst">{DateTime.Now}</span> 自检完成,错误数:<span class="hljs-subst">{error}</span>,重连成功数:<span class="hljs-subst">{reconnect}</span>"</span>, <span class="hljs-literal">null</span>);
        }
    
        <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">Start</span>(<span class="hljs-params"></span>)
        </span>{
            <span class="hljs-keyword">foreach</span> (<span class="hljs-keyword">var</span> item <span class="hljs-keyword">in</span> <span class="hljs-keyword">this</span>.Services)
            {
                <span class="hljs-keyword">try</span>
                {
                    item.Start();
                }
                <span class="hljs-keyword">catch</span> (Exception e)
                {
                    OnAction?.Invoke(MessageLevel.Error, <span class="hljs-string">$"启动服务出错 | <span class="hljs-subst">{e.Message}</span>"</span>, e);
                }
            }
        }
    
        <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">Stop</span>(<span class="hljs-params"></span>)
        </span>{
            <span class="hljs-keyword">try</span>
            {
                <span class="hljs-keyword">foreach</span> (<span class="hljs-keyword">var</span> item <span class="hljs-keyword">in</span> <span class="hljs-keyword">this</span>.Services)
                {
                    item.Stop();
                }
                Services.Clear();
                timer.Dispose();
            }
            <span class="hljs-keyword">catch</span> (Exception e)
            {
                OnAction?.Invoke(MessageLevel.Error, <span class="hljs-string">$"停止服务出错 | <span class="hljs-subst">{e.Message}</span>"</span>, e);
            }
        }
    
        <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">AddService</span>(<span class="hljs-params">IService service</span>)
        </span>{
            Services.Add(service);
        }
        <span class="hljs-keyword">public</span> List&lt;IService&gt; Services { <span class="hljs-keyword">get</span>; <span class="hljs-keyword">set</span>; } = <span class="hljs-keyword">new</span> List&lt;IService&gt;();
    }</code></pre>
    

    代码比较简单,就不在一一介绍,为了将异常等内部信息传递到外边,方便使用第三方组件进行日志记录等需求,MQServcieManager 还使用了 MessageLevel 这个定义,方便业务根据不同的消息级别对消息进行处理

        public enum MessageLevel
        {
            Trace = 0,
            Debug = 1,
            Information = 2,
            Warning = 3,
            Error = 4,
            Critical = 5,
            None = 6
        }

    3. 开始使用

    终于来到了这一步,我们将要开始使用这个基础服务;首先,创建一个 DemoService 继承自 MQServiceBase ;同时,

    3.1 实现 MQServiceBase 的抽象方法 OnReceived(MessageBody message)
     public class DemoService : MQServiceBase
        {
            public Action<MessageLevel, string, Exception> OnAction = null;
            public DemoService(MQConfig config) : base(config)
            {
                base.Queues.Add(new QueueInfo()
                {
                    ExchangeType = ExchangeType.Direct,
                    Queue = "login-message",
                    RouterKey = "pk",
                    OnReceived = this.OnReceived
                });
            }
    
        <span class="hljs-keyword">public</span> <span class="hljs-keyword">override</span> <span class="hljs-keyword">string</span> vHost { <span class="hljs-keyword">get</span> { <span class="hljs-keyword">return</span> <span class="hljs-string">"gpush"</span>; } }
        <span class="hljs-keyword">public</span> <span class="hljs-keyword">override</span> <span class="hljs-keyword">string</span> Exchange { <span class="hljs-keyword">get</span> { <span class="hljs-keyword">return</span> <span class="hljs-string">"user"</span>; } }
    
    
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> 接收消息</span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
        <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="message"&gt;</span><span class="hljs-doctag">&lt;/param&gt;</span></span>
        <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">override</span> <span class="hljs-keyword">void</span> <span class="hljs-title">OnReceived</span>(<span class="hljs-params">MessageBody message</span>)
        </span>{
            <span class="hljs-keyword">try</span>
            {
                Console.WriteLine(message.Content);
            }
            <span class="hljs-keyword">catch</span> (Exception ex)
            {
                OnAction?.Invoke(MessageLevel.Error, ex.Message, ex);
            }
            message.Consumer.Model.BasicAck(message.BasicDeliver.DeliveryTag, <span class="hljs-literal">true</span>);
    
        }
    }</code></pre>
    

    以上的代码非常简单,几乎不需要业务开发者做更多的其它工作,开发者只需要在构造方法内部传入一个 QueueInfo 对象,如果有多个,可一并传入

        public partial class QueueInfo
        {
            /// <summary>
            ///  队列名称
            /// </summary>
            public string Queue { get; set; }
            /// <summary>
            ///  路由名称
            /// </summary>
            public string RouterKey { get; set; }
            /// <summary>
            ///  交换机类型
            /// </summary>
            public string ExchangeType { get; set; }
            /// <summary>
            ///  接受消息委托
            /// </summary>
            public Action<MessageBody> OnReceived { get; set; }
            /// <summary>
            ///  输出信息到客户端
            /// </summary>
            public Action<MQChannel, MessageLevel, string> OnAction { get; set; }
        }

    并设置 vHost 和 Exchange 的值,然后剩下的就是在 OnReceived(MessageBody message) 方法中专心的处理自己的业务了;在这里,我们仅输出接收到的消息,并设置 ack 为已成功处理。

    4. 测试代码

    4.1 在 Program,我们执行该测试
       class Program
        {
            static void Main(string[] args)
            {
                Test();
            }
    
        <span class="hljs-function"><span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">Test</span><span class="hljs-params">()</span>
        </span>{
            MQConfig config = <span class="hljs-keyword">new</span> MQConfig()
            {
                HostName = <span class="hljs-string">"127.0.0.1"</span>,
                Password = <span class="hljs-string">"123456"</span>,
                Port = <span class="hljs-number">5672</span>,
                UserName = <span class="hljs-string">"dotnet"</span>
            };
    
            MQServcieManager manager = <span class="hljs-keyword">new</span> MQServcieManager();
            manager.AddService(<span class="hljs-keyword">new</span> DemoService(config));
            manager.OnAction = OnActionOutput;
            manager.Start();
    
            Console.WriteLine(<span class="hljs-string">"服务已启动"</span>);
            Console.ReadKey();
    
            manager.Stop();
            Console.WriteLine(<span class="hljs-string">"服务已停止,按任意键退出..."</span>);
            Console.ReadKey();
        }
    
        <span class="hljs-function"><span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">OnActionOutput</span><span class="hljs-params">(MessageLevel level, <span class="hljs-built_in">string</span> message, Exception ex)</span>
        </span>{
            Console.ForegroundColor = ConsoleColor.Yellow;
            Console.WriteLine(<span class="hljs-string">"{0} | {1} | {2}"</span>, level, message, ex?.StackTrace);
            Console.ForegroundColor = ConsoleColor.Gray;
        }
    }</code></pre>
    

    4.2 利用 MQServcieManager 对象,完成了对所有消息订阅者的管理和监控,

    4.3 首先我们到 RabbitMQ 的 web 控制台发布一条消息到队列 login-message 中

    4.3 然后查看输出结果

    消息已经接收并处理,为了查看监控效果,我还手动将网络进行中断,然后监控服务检测到无法连接,尝试重建通道,并将消息输出

    • 图中步骤说明
    • 0:服务启动
    • 1:自检启动
    • 2:服务报错,尝试重建,重建失败,继续监测
    • 3:RabbitMQ 内部监控自动重连,监控程序检测到已恢复,收到消息并处理
    • 4:后续监控服务继续进行监控

    结语

    在文章中,我们建立了 RabbitMQ 的通道管理、基础服务管理、契约实现等操作,让业务开发人员通过简单的继承实现去快速的处理业务系统的逻辑,后续如果有增加消费者的情况下,只需要通过 MQServcieManager.AddService 进行简单的调用操作即可,无需对底层技术细节进行过多的改动。

    源码下载:
    https://github.com/lianggx/EasyAspNetCoreDemo/tree/master/Ron.MQTest

    微信公众号:DotNet程序园
    欢迎关注收取阅读最新文章
    • 您随手点赞是我不断书写的动力,如有错误,欢迎指正
    • 出处:http://www.cnblogs.com/viter/
    • 推荐一个快速开发脚手架,基于 .netcore+pgsql,GitHub地址: https://github.com/lianggx/mystaging
    • 本文版权归作者和博客园共有,欢迎个人转载,必须保留此段声明;商业转载请联系授权,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
    • 欢迎大家关注我的微信公众号,一起学习一起进步
    2
    0
    « 上一篇:Asp.NetCore轻松学-配置服务 apollo 部署实践
    » 下一篇:Asp.NetCore依赖注入和管道方式的异常处理及日志记录
    	</div>
    	<div class="postDesc">posted @ <span id="post-date">2018-11-23 10:26</span> <a href="https://www.cnblogs.com/viter/">Ron.Liang</a> 阅读(<span id="post_view_count">956</span>) 评论(<span id="post_comment_count">8</span>)  <a href="https://i.cnblogs.com/EditPosts.aspx?postid=10003185" rel="nofollow">编辑</a> <a href="#" onclick="AddToWz(10003185);return false;">收藏</a></div>
    </div>
    <script src="//common.cnblogs.com/highlight/9.12.0/highlight.min.js"></script><script>markdown_highlight();</script><script type="text/javascript">var allowComments=true,cb_blogId=30433,cb_entryId=10003185,cb_blogApp=currentBlogApp,cb_blogUserGuid='e9823d0b-63cf-dd11-9e4d-001cf0cd104b',cb_entryCreatedDate='2018/11/23 10:26:00';loadViewCount(cb_entryId);var cb_postType=1;var isMarkdown=true;</script>
    
  • 相关阅读:
    排序算法之归并排序(Merge Sort)
    排序算法之选择排序
    [BUUCTF]REVERSE——firmware
    [BUUCTF]REVERSE——[WUSTCTF2020]Cr0ssfun
    [BUUCTF]PWN——hitcontraining_magicheap
    [BUUCTF]PWN——ciscn_2019_n_3
    [BUUCTF]PWN——[V&N2020 公开赛]easyTHeap
    [BUUCTF]PWN——babyfengshui_33c3_2016
    [BUUCTF]PWN——babyheap_0ctf_2017
    CTFHub[PWN技能树]——栈溢出
  • 原文地址:https://www.cnblogs.com/owenzh/p/11213864.html
Copyright © 2011-2022 走看看