zoukankan      html  css  js  c++  java
  • RabbitMQ---8、连接断开处理-断线重连

    本文转载于:https://www.itsvse.com/thread-4636-1-1.html;

    参考文献:http://www.likecs.com/show-29874.html;https://stackoverflow.com/questions/41279186/guaranteed-publishing-of-messages-on-rabbitmq-on-network-loss;

    Rabbitmq 官方给的NET consumer示例代码如下,但使用过程,会遇到connection断开的问题,一旦断开,这个代码就会报错,就会导致消费者或者生产者挂掉。

    下图是生产者发送消息,我手动停止了rabbitmq,然后又重新启动了rabbitmq,大概等启动成功以后,为了防止服务没有完全启动,我又等待了10秒钟

    服务完全启动成功以后,我尝试重新发送一些消息,报错,如下:

    ************** 异常文本 **************
    RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=320, text="CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'", classId=0, methodId=0, cause=
       在 RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd)
       在 RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, Byte[] body)
       在 RabbitMQ.Client.Impl.ModelBase.BasicPublish(String exchange, String routingKey, Boolean mandatory, IBasicProperties basicProperties, Byte[] body)
       在 RabbitMQ.Client.Impl.ModelBase.BasicPublish(String exchange, String routingKey, IBasicProperties basicProperties, Byte[] body)
       在 rabbitMQ_Publish.Form1.button1_Click(Object sender, EventArgs e) 位置 C:projectmyRabbitMQ-demo abbitMQ-PublishForm1.cs:行号 37
       在 System.Windows.Forms.Control.OnClick(EventArgs e)
       在 System.Windows.Forms.Button.OnClick(EventArgs e)
       在 System.Windows.Forms.Button.PerformClick()
       在 System.Windows.Forms.Form.ProcessDialogKey(Keys keyData)
       在 System.Windows.Forms.TextBoxBase.ProcessDialogKey(Keys keyData)
       在 System.Windows.Forms.Control.PreProcessMessage(Message& msg)
       在 System.Windows.Forms.Control.PreProcessControlMessageInternal(Control target, Message& msg)
       在 System.Windows.Forms.Application.ThreadContext.PreTranslateMessage(MSG& msg)




    <ignore_js_op> 

    那么如何会异常恢复呢?或者说断线重连呢?

    RabbitMQ NET Client的源码,研究发现一种自动的错误恢复机制 AutomaticRecoveryEnabled = true 使用方式如下


    1. var factory = new ConnectionFactory() { HostName = "localhost", AutomaticRecoveryEnabled = true };
    复制代码

    具体的恢复机制如下

    1.在AutoRecoveringConnection初始化时,在链接关闭事件委托上增加断开处理


    1. public void init()
    2.         {
    3.             m_delegate = new Connection(m_factory, false, m_factory.CreateFrameHandler());
    4.             AutorecoveringConnection self = this;
    5.             EventHandler<ShutdownEventArgs> recoveryListener = (_, args) =>
    6.             {
    7.                 lock (recoveryLockTarget)
    8.                 {
    9.                     if (ShouldTriggerConnectionRecovery(args))
    10.                     {
    11.                         try
    12.                         {
    13.                             self.BeginAutomaticRecovery();
    14.                         }
    15.                         catch (Exception e)
    16.                         {
    17.                             // TODO: logging
    18.                             Console.WriteLine("BeginAutomaticRecovery() failed: {0}", e);
    19.                         }
    20.                     }
    21.                 }
    22.             };
    23.             lock (m_eventLock)
    24.             {
    25.                 ConnectionShutdown += recoveryListener;
    26.                 if (!m_recordedShutdownEventHandlers.Contains(recoveryListener))
    27.                 {
    28.                     m_recordedShutdownEventHandlers.Add(recoveryListener);
    29.                 }
    30.             }
    31.         }
    复制代码

    观察调用的方式BeginAutomaticRecovery,可以看到这个方法内部调用了PerformAutomaticRecovery方法。我们直接看这个方法的内容,其中第一个调用的是方法RecoverConnectionDelegate

    1. protected void PerformAutomaticRecovery()
    2.         {
    3.             lock (recoveryLockTarget)
    4.             {
    5.                 RecoverConnectionDelegate();
    6.                 RecoverConnectionShutdownHandlers();
    7.                 RecoverConnectionBlockedHandlers();
    8.                 RecoverConnectionUnblockedHandlers();
    9.                 RecoverModels();
    10.                 if (m_factory.TopologyRecoveryEnabled)
    11.                 {
    12.                     RecoverEntities();
    13.                     RecoverConsumers();
    14.                 }
    15.                 RunRecoveryEventHandlers();
    16.             }
    17.         }
    复制代码



    这个方法中调用的是

    1. protected void RecoverConnectionDelegate()
    2.         {
    3.             bool recovering = true;
    4.             while (recovering)
    5.             {
    6.                 try
    7.                 {
    8.                     m_delegate = new Connection(m_factory, false, m_factory.CreateFrameHandler());
    9.                     recovering = false;
    10.                 }
    11.                 catch (Exception)
    12.                 {
    13.                     // TODO: exponential back-off
    14.                     Thread.Sleep(m_factory.NetworkRecoveryInterval);
    15.                     // TODO: provide a way to handle these exceptions
    16.                 }
    17.             }
    18.         }
    复制代码

    可以看出,它是执行了死循环,直到连接重新打开,当然,如果遇到异常,它会调用Thread.Sleep来等待一下,然后再次执行连接恢复。

  • 相关阅读:
    Redis数据库概述
    分布式爬虫(一)------------------分布式爬虫概述
    Spark环境搭建(五)-----------Spark生态圈概述与Hadoop对比
    错误解决记录------------rhel安装Mysql软件包依赖 mariadb组件
    rhel 7安装Mysql
    Linux虚拟机搭建本地yum源
    rhel配置网络yum源
    Spark环境搭建(四)-----------数据仓库Hive环境搭建
    冲销会计凭证:FBRP与FB08的区别
    xk01创建供应商保存的时候,提示错误“科目800001已经存在”
  • 原文地址:https://www.cnblogs.com/xiaohua19920/p/9667173.html
Copyright © 2011-2022 走看看