zoukankan      html  css  js  c++  java
  • RabbitMQ---8、连接断开处理-断线重连

    本文转载于:https://www.itsvse.com/thread-4636-1-1.html;

    参考文献:http://www.likecs.com/show-29874.html;https://stackoverflow.com/questions/41279186/guaranteed-publishing-of-messages-on-rabbitmq-on-network-loss;

    Rabbitmq 官方给的NET consumer示例代码如下,但使用过程,会遇到connection断开的问题,一旦断开,这个代码就会报错,就会导致消费者或者生产者挂掉。

    下图是生产者发送消息,我手动停止了rabbitmq,然后又重新启动了rabbitmq,大概等启动成功以后,为了防止服务没有完全启动,我又等待了10秒钟

    服务完全启动成功以后,我尝试重新发送一些消息,报错,如下:

    ************** 异常文本 **************
    RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=320, text="CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'", classId=0, methodId=0, cause=
       在 RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd)
       在 RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, Byte[] body)
       在 RabbitMQ.Client.Impl.ModelBase.BasicPublish(String exchange, String routingKey, Boolean mandatory, IBasicProperties basicProperties, Byte[] body)
       在 RabbitMQ.Client.Impl.ModelBase.BasicPublish(String exchange, String routingKey, IBasicProperties basicProperties, Byte[] body)
       在 rabbitMQ_Publish.Form1.button1_Click(Object sender, EventArgs e) 位置 C:projectmyRabbitMQ-demo abbitMQ-PublishForm1.cs:行号 37
       在 System.Windows.Forms.Control.OnClick(EventArgs e)
       在 System.Windows.Forms.Button.OnClick(EventArgs e)
       在 System.Windows.Forms.Button.PerformClick()
       在 System.Windows.Forms.Form.ProcessDialogKey(Keys keyData)
       在 System.Windows.Forms.TextBoxBase.ProcessDialogKey(Keys keyData)
       在 System.Windows.Forms.Control.PreProcessMessage(Message& msg)
       在 System.Windows.Forms.Control.PreProcessControlMessageInternal(Control target, Message& msg)
       在 System.Windows.Forms.Application.ThreadContext.PreTranslateMessage(MSG& msg)




    <ignore_js_op> 

    那么如何会异常恢复呢?或者说断线重连呢?

    RabbitMQ NET Client的源码,研究发现一种自动的错误恢复机制 AutomaticRecoveryEnabled = true 使用方式如下


    1. var factory = new ConnectionFactory() { HostName = "localhost", AutomaticRecoveryEnabled = true };
    复制代码

    具体的恢复机制如下

    1.在AutoRecoveringConnection初始化时,在链接关闭事件委托上增加断开处理


    1. public void init()
    2.         {
    3.             m_delegate = new Connection(m_factory, false, m_factory.CreateFrameHandler());
    4.             AutorecoveringConnection self = this;
    5.             EventHandler<ShutdownEventArgs> recoveryListener = (_, args) =>
    6.             {
    7.                 lock (recoveryLockTarget)
    8.                 {
    9.                     if (ShouldTriggerConnectionRecovery(args))
    10.                     {
    11.                         try
    12.                         {
    13.                             self.BeginAutomaticRecovery();
    14.                         }
    15.                         catch (Exception e)
    16.                         {
    17.                             // TODO: logging
    18.                             Console.WriteLine("BeginAutomaticRecovery() failed: {0}", e);
    19.                         }
    20.                     }
    21.                 }
    22.             };
    23.             lock (m_eventLock)
    24.             {
    25.                 ConnectionShutdown += recoveryListener;
    26.                 if (!m_recordedShutdownEventHandlers.Contains(recoveryListener))
    27.                 {
    28.                     m_recordedShutdownEventHandlers.Add(recoveryListener);
    29.                 }
    30.             }
    31.         }
    复制代码

    观察调用的方式BeginAutomaticRecovery,可以看到这个方法内部调用了PerformAutomaticRecovery方法。我们直接看这个方法的内容,其中第一个调用的是方法RecoverConnectionDelegate

    1. protected void PerformAutomaticRecovery()
    2.         {
    3.             lock (recoveryLockTarget)
    4.             {
    5.                 RecoverConnectionDelegate();
    6.                 RecoverConnectionShutdownHandlers();
    7.                 RecoverConnectionBlockedHandlers();
    8.                 RecoverConnectionUnblockedHandlers();
    9.                 RecoverModels();
    10.                 if (m_factory.TopologyRecoveryEnabled)
    11.                 {
    12.                     RecoverEntities();
    13.                     RecoverConsumers();
    14.                 }
    15.                 RunRecoveryEventHandlers();
    16.             }
    17.         }
    复制代码



    这个方法中调用的是

    1. protected void RecoverConnectionDelegate()
    2.         {
    3.             bool recovering = true;
    4.             while (recovering)
    5.             {
    6.                 try
    7.                 {
    8.                     m_delegate = new Connection(m_factory, false, m_factory.CreateFrameHandler());
    9.                     recovering = false;
    10.                 }
    11.                 catch (Exception)
    12.                 {
    13.                     // TODO: exponential back-off
    14.                     Thread.Sleep(m_factory.NetworkRecoveryInterval);
    15.                     // TODO: provide a way to handle these exceptions
    16.                 }
    17.             }
    18.         }
    复制代码

    可以看出,它是执行了死循环,直到连接重新打开,当然,如果遇到异常,它会调用Thread.Sleep来等待一下,然后再次执行连接恢复。

  • 相关阅读:
    106. Construct Binary Tree from Inorder and Postorder Traversal
    105. Construct Binary Tree from Preorder and Inorder Traversal
    449. Serialize and Deserialize BST
    114. Flatten Binary Tree to Linked List
    199. Binary Tree Right Side View
    173. Binary Search Tree Iterator
    98. Validate Binary Search Tree
    965. Univalued Binary Tree
    589. N-ary Tree Preorder Traversal
    eclipse设置总结
  • 原文地址:https://www.cnblogs.com/xiaohua19920/p/9667173.html
Copyright © 2011-2022 走看看