本文转载于:https://www.itsvse.com/thread-4636-1-1.html;
参考文献:http://www.likecs.com/show-29874.html;https://stackoverflow.com/questions/41279186/guaranteed-publishing-of-messages-on-rabbitmq-on-network-loss;
Rabbitmq 官方给的NET consumer示例代码如下,但使用过程,会遇到connection断开的问题,一旦断开,这个代码就会报错,就会导致消费者或者生产者挂掉。
下图是生产者发送消息,我手动停止了rabbitmq,然后又重新启动了rabbitmq,大概等启动成功以后,为了防止服务没有完全启动,我又等待了10秒钟
服务完全启动成功以后,我尝试重新发送一些消息,报错,如下:
************** 异常文本 **************
RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=320, text="CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'", classId=0, methodId=0, cause=
在 RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd)
在 RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, Byte[] body)
在 RabbitMQ.Client.Impl.ModelBase.BasicPublish(String exchange, String routingKey, Boolean mandatory, IBasicProperties basicProperties, Byte[] body)
在 RabbitMQ.Client.Impl.ModelBase.BasicPublish(String exchange, String routingKey, IBasicProperties basicProperties, Byte[] body)
在 rabbitMQ_Publish.Form1.button1_Click(Object sender, EventArgs e) 位置 C:projectmyRabbitMQ-demo abbitMQ-PublishForm1.cs:行号 37
在 System.Windows.Forms.Control.OnClick(EventArgs e)
在 System.Windows.Forms.Button.OnClick(EventArgs e)
在 System.Windows.Forms.Button.PerformClick()
在 System.Windows.Forms.Form.ProcessDialogKey(Keys keyData)
在 System.Windows.Forms.TextBoxBase.ProcessDialogKey(Keys keyData)
在 System.Windows.Forms.Control.PreProcessMessage(Message& msg)
在 System.Windows.Forms.Control.PreProcessControlMessageInternal(Control target, Message& msg)
在 System.Windows.Forms.Application.ThreadContext.PreTranslateMessage(MSG& msg)
<ignore_js_op>
那么如何会异常恢复呢?或者说断线重连呢?
RabbitMQ NET Client的源码,研究发现一种自动的错误恢复机制 AutomaticRecoveryEnabled = true 使用方式如下
- var factory = new ConnectionFactory() { HostName = "localhost", AutomaticRecoveryEnabled = true };
具体的恢复机制如下
1.在AutoRecoveringConnection初始化时,在链接关闭事件委托上增加断开处理
- public void init()
- {
- m_delegate = new Connection(m_factory, false, m_factory.CreateFrameHandler());
- AutorecoveringConnection self = this;
- EventHandler<ShutdownEventArgs> recoveryListener = (_, args) =>
- {
- lock (recoveryLockTarget)
- {
- if (ShouldTriggerConnectionRecovery(args))
- {
- try
- {
- self.BeginAutomaticRecovery();
- }
- catch (Exception e)
- {
- // TODO: logging
- Console.WriteLine("BeginAutomaticRecovery() failed: {0}", e);
- }
- }
- }
- };
- lock (m_eventLock)
- {
- ConnectionShutdown += recoveryListener;
- if (!m_recordedShutdownEventHandlers.Contains(recoveryListener))
- {
- m_recordedShutdownEventHandlers.Add(recoveryListener);
- }
- }
- }
观察调用的方式BeginAutomaticRecovery,可以看到这个方法内部调用了PerformAutomaticRecovery方法。我们直接看这个方法的内容,其中第一个调用的是方法RecoverConnectionDelegate
- protected void PerformAutomaticRecovery()
- {
- lock (recoveryLockTarget)
- {
- RecoverConnectionDelegate();
- RecoverConnectionShutdownHandlers();
- RecoverConnectionBlockedHandlers();
- RecoverConnectionUnblockedHandlers();
- RecoverModels();
- if (m_factory.TopologyRecoveryEnabled)
- {
- RecoverEntities();
- RecoverConsumers();
- }
- RunRecoveryEventHandlers();
- }
- }
这个方法中调用的是
- protected void RecoverConnectionDelegate()
- {
- bool recovering = true;
- while (recovering)
- {
- try
- {
- m_delegate = new Connection(m_factory, false, m_factory.CreateFrameHandler());
- recovering = false;
- }
- catch (Exception)
- {
- // TODO: exponential back-off
- Thread.Sleep(m_factory.NetworkRecoveryInterval);
- // TODO: provide a way to handle these exceptions
- }
- }
- }
可以看出,它是执行了死循环,直到连接重新打开,当然,如果遇到异常,它会调用Thread.Sleep来等待一下,然后再次执行连接恢复。