zoukankan      html  css  js  c++  java
  • RabbitMQ心跳检测与掉线重连

    1、RabbitMqListener,自定义消息监听器

    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using RabbitMQ.Client.Exceptions;
    
    namespace MQ_Receive
    {
        /// <summary>
        /// RabbitMQ message listener. Declares the exchange and queue, binds them, and forwards every
        /// delivered message (decoded as UTF-8 text) to a caller-supplied handler. When the connection
        /// shuts down, the listener tears it down and retries every few seconds until it is listening again.
        /// </summary>
        public class RabbitMqListener
        {
            /// <summary>Delay between two reconnect attempts, in milliseconds.</summary>
            private const int RetryIntervalMs = 3000;

            private ConnectionFactory _factory;
            private IConnection _con;
            private IModel _channel;
            private EventingBasicConsumer _consumer;

            private readonly string _rabbitMqUri;
            private readonly string _exchangeType;
            private readonly string _exchangeName;
            private readonly string _queueName;
            private readonly string _routeKey;
            private Func<string, bool> _messageHandler;

            /// <summary>
            /// Creates a listener. Nothing is connected here; the exchange and queue are declared (created
            /// if missing) and bound with the given routing key when <see cref="MessageListen"/> runs.
            /// </summary>
            /// <param name="rabbitMqUri">Connection URI, e.g. amqp://guest:guest@localhost:5672/</param>
            /// <param name="exchangeName">Exchange to declare and bind to.</param>
            /// <param name="exchangeType">Exchange type, e.g. ExchangeType.Direct.</param>
            /// <param name="queueName">Queue to listen on.</param>
            /// <param name="routeKey">Routing key used for the queue binding.</param>
            public RabbitMqListener(string rabbitMqUri, string exchangeName, string exchangeType, string queueName, string routeKey = "")
            {
                this._rabbitMqUri = rabbitMqUri;
                this._exchangeName = exchangeName;
                this._exchangeType = exchangeType;
                this._queueName = queueName;
                this._routeKey = routeKey;
            }

            /// <summary>
            /// Opens a new connection and subscribes to its shutdown event so that a dropped
            /// connection triggers <see cref="ReMessageListen"/>.
            /// </summary>
            private void CreateConnection()
            {
                _factory = new ConnectionFactory
                {
                    Uri = new Uri(_rabbitMqUri),
                    RequestedHeartbeat = 20,
                    AutomaticRecoveryEnabled = true,
                    TopologyRecoveryEnabled = true,
                    NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
                };

                _con = _factory.CreateConnection();
                // Named handler (not a lambda) so CleanupResource can unsubscribe it again.
                _con.ConnectionShutdown += OnConnectionShutdown;
            }

            /// <summary>
            /// Connection-shutdown callback: reconnects and resumes listening.
            /// </summary>
            private void OnConnectionShutdown(object sender, ShutdownEventArgs e)
            {
                ReMessageListen();
            }

            /// <summary>
            /// Opens a channel on the current connection, declares the durable exchange and queue,
            /// binds them, and limits the number of unacknowledged deliveries.
            /// </summary>
            private void CreateChannel()
            {
                _channel = _con.CreateModel();
                _channel.ExchangeDeclare(_exchangeName, _exchangeType, true, false, null);
                _channel.QueueDeclare(_queueName, true, false, false, null); // durable, non-exclusive, not auto-deleted
                _channel.QueueBind(_queueName, _exchangeName, _routeKey, null);
                // With manual acks, at most 3 deliveries may be outstanding before the broker stops sending more.
                _channel.BasicQos(0, 3, true);
            }

            /// <summary>
            /// Connects, declares the topology and starts consuming the queue with manual acknowledgement.
            /// </summary>
            /// <param name="messageHandler">
            /// Callback invoked with each message body decoded as UTF-8. The message is acknowledged only
            /// when the callback returns <c>true</c>. If <c>null</c>, messages are received but never handled or acked.
            /// </param>
            /// <returns><c>true</c> when the consumer was registered; <c>false</c> when any step failed (the error is logged).</returns>
            public bool MessageListen(Func<string, bool> messageHandler)
            {
                // Remember the handler up front so a reconnect that happens before the first
                // delivery still re-subscribes with the caller's handler instead of null.
                this._messageHandler = messageHandler;

                try
                {
                    this.CreateConnection();
                    this.CreateChannel();

                    // Capture this attempt's channel: acks must go to the channel that delivered the
                    // message, and the _channel field is replaced on reconnect.
                    var channel = _channel;
                    var consumer = new EventingBasicConsumer(channel); // event-driven push consumer
                    consumer.Received += (sender, e) =>
                    {
                        if (messageHandler == null)
                        {
                            return;
                        }

                        // NOTE(review): when the handler returns false or throws, the delivery is neither
                        // acked nor rejected. Combined with the prefetch limit of 3 set in CreateChannel,
                        // three such deliveries appear to stall consumption until the channel closes.
                        // Consider BasicNack/BasicReject - confirm the intended redelivery policy first.
                        try
                        {
                            string msg = Encoding.UTF8.GetString(e.Body);
                            if (messageHandler(msg))
                            {
                                channel.BasicAck(e.DeliveryTag, false);
                            }
                        }
                        catch (Exception ex)
                        {
                            LoggerManager.ErrorLog.Error("消息处理器执行异常:" + ex.Message, ex);
                        }
                    };
                    _consumer = consumer;

                    channel.BasicConsume(_queueName, false, consumer); // autoAck = false: manual acknowledgement
                    return true;
                }
                catch (Exception ex)
                {
                    LoggerManager.ErrorLog.Error("尝试监听队列消息出现错误:" + ex.Message, ex);
                    // Do not leak a half-initialised connection/channel; callers retry in a loop.
                    CleanupResource();
                }
                return false;
            }

            /// <summary>
            /// Drops the current connection and channel, then retries <see cref="MessageListen"/> with the
            /// last supplied handler every 3 seconds until it succeeds. Blocks the calling thread meanwhile.
            /// </summary>
            public void ReMessageListen()
            {
                try
                {
                    // Release the old connection and channel first.
                    CleanupResource();

                    // Wait, then try; repeat until listening succeeds.
                    do
                    {
                        Thread.Sleep(RetryIntervalMs);
                    }
                    while (!MessageListen(_messageHandler));
                }
                catch (Exception ex)
                {
                    LoggerManager.ErrorLog.Error("尝试连接RabbitMQ服务器出现错误:" + ex.Message, ex);
                }
            }

            /// <summary>
            /// Closes and disposes the current channel and connection, whether or not they are still open,
            /// and clears the corresponding fields. Errors are logged, never thrown.
            /// </summary>
            private void CleanupResource()
            {
                var channel = _channel;
                _channel = null;
                _consumer = null;
                if (channel != null)
                {
                    try
                    {
                        if (channel.IsOpen)
                        {
                            channel.Close();
                        }
                        channel.Dispose();
                    }
                    catch (Exception ex)
                    {
                        LoggerManager.ErrorLog.Error("尝试关闭RabbitMQ信道遇到错误", ex);
                    }
                }

                var con = _con;
                _con = null;
                if (con != null)
                {
                    // Unsubscribe before closing so our own Close() cannot re-enter ReMessageListen.
                    con.ConnectionShutdown -= OnConnectionShutdown;
                    try
                    {
                        if (con.IsOpen)
                        {
                            con.Close();
                        }
                        // Dispose even when already closed: the factory enables automatic recovery, and an
                        // undisposed connection could otherwise recover itself next to the replacement
                        // connection and produce a duplicate consumer (per client docs - verify for your version).
                        con.Dispose();
                    }
                    catch (Exception ex)
                    {
                        LoggerManager.ErrorLog.Error("尝试关闭RabbitMQ连接遇到错误", ex);
                    }
                }
            }
        }
    }

    2、调用代码

    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.ComponentModel;
    using System.Data;
    using System.Drawing;
    using System.IO;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Windows.Forms;
    
    namespace MQ_Receive
    {
        /// <summary>
        /// Demo form: subscribes to the order queue when it loads and appends every received
        /// message to <c>label1</c>, one message per line.
        /// </summary>
        public partial class Form1 : Form
        {
            private delegate void ChangeText(string text);
            private readonly ChangeText _changeText;

            private static readonly string rabbitHostUri = "amqp://guest:guest@localhost:5672/";
            private static readonly string exchangeName = "order-exchange";
            private static readonly string queueName = "order-message-test-queue";
            private static readonly string routeKey = "order-message-routeKey";

            // Lazy<T> provides thread-safe one-time construction, replacing the hand-written
            // double-checked lock on a non-volatile field.
            private static readonly Lazy<RabbitMqListener> _listener = new Lazy<RabbitMqListener>(
                () => new RabbitMqListener(rabbitHostUri, exchangeName, ExchangeType.Direct, queueName, routeKey));

            /// <summary>
            /// Process-wide listener instance, created on first access.
            /// </summary>
            public static RabbitMqListener RabbitMQListener
            {
                get { return _listener.Value; }
            }

            /// <summary>
            /// Handler passed to the listener: marshals the message text to the UI thread and reports
            /// success. Returns <c>false</c> (message not handled) once the form is gone.
            /// </summary>
            private Func<string, bool> MessageHandler
            {
                get
                {
                    return msg =>
                    {
                        // The callback runs on a non-UI thread and may fire after the form was closed;
                        // Invoke would throw then, so report the message as unhandled instead.
                        if (this.IsDisposed || !this.IsHandleCreated)
                        {
                            return false;
                        }

                        this.label1.Invoke(_changeText, new object[] { msg });
                        return true;
                    };
                }
            }

            public Form1()
            {
                InitializeComponent();
                this.label1.Text = "";
                this._changeText = SetText;
            }

            /// <summary>
            /// Starts listening as soon as the form is loaded.
            /// </summary>
            private void Form1_Load(object sender, EventArgs e)
            {
                // NOTE(review): the boolean result is ignored; a failed initial connect is not retried
                // or surfaced to the user here.
                RabbitMQListener.MessageListen(MessageHandler);
            }

            /// <summary>
            /// Appends <paramref name="text"/> followed by a line break to the label. Must run on the UI thread.
            /// </summary>
            private void SetText(string text)
            {
                this.label1.Text += text + Environment.NewLine;
            }
        }
    }
  • 相关阅读:
    PYFLINK 基础 (一):运行相关(一)PYFLINK安装与本地运行(WINDOWS10)(TABLE demo)
    linux 开机进入initramfs无法开机
    VBA删除 语法
    VBA 上传数据与查找数据 while循环 和 for循环
    odoo tree视图 当页不弹窗显示方法
    VBA 连接,提醒 rs AS new adodb.recordset 的变量未定义
    odoo 订单打印 会出现字体. ........... 虚线问题
    270. Closest Binary Search Tree Value
    277. Find the Celebrity
    724. Find Pivot Index
  • 原文地址:https://www.cnblogs.com/huangzelin/p/10908072.html
Copyright © 2011-2022 走看看