现在聊一下RabbitMQ消息持久化:
问题及方案描述
1.当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间。在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者死掉了。
这种情况要使用消息接收确认机制,可以执行上次宕机的消费者没有完成的事情。
2.在默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的。当RabbitMQ死掉了或者重启了,上次创建的队列、消息都不会保存。
这种情况可以使用RabbitMQ提供的消息队列的持久化机制。
相关理论描述
RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我个人觉得大多数开发人员都会选择持久化。
队列和交换机有一个创建时候指定的标志durable。durable的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列当中的消息会在重启后恢复。
消息队列持久化包括3个部分:
1、exchange持久化,在声明时指定durable => true
2、queue持久化,在声明时指定durable => true
3、消息持久化,在投递时指定delivery_mode=> 2(1是非持久化)
如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。
注意:一旦创建了队列和交换机,就不能修改其标志了。例如,如果创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建。
程序示例
生产者
class Producter { const string ExchangeName = "eric.exchange"; const string QueueName = "eric.queue"; static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的 channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的 channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName); string message = "Eric is very handsome"; var body = Encoding.UTF8.GetBytes(message); //将队列设置为持久化之后,还需要将消息也设为可持久化的 var props = channel.CreateBasicProperties(); props.SetPersistent(true); channel.BasicPublish(ExchangeName, routingKey: QueueName, basicProperties: props, body: body); Console.WriteLine("Producter Sent: {0}", message); Console.ReadKey(); } } }
注:ack是 acknowledgments 的缩写,noAck 是("no manual acks")
因为我前段时间换了笔记本,所以用户的“eric”的操作出踩了个坑,下面进行介绍下:
如果调试运行时报错:None of the specified endpoints were reachable
innerException是:
{"The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=541, text="Unexpected Exception", classId=0, methodId=0, cause=System.IO.IOException: 无法从传输连接中读取数据: 远程主机强迫关闭了一个现有的连接。。 ---> System.Net.Sockets.SocketException: 远程主机强迫关闭了一个现有的连接。 在 System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags) 在 System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size) --- 内部异常堆栈跟踪的结尾 --- 在 RabbitMQ.Client.Impl.Frame.ReadFrom(NetworkBinaryReader reader) 在 RabbitMQ.Client.Impl.SocketFrameHandler.ReadFrame() 在 RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration() 在 RabbitMQ.Client.Framing.Impl.Connection.MainLoop()"}
这说明我们使用的用户 不是 系统默认的 guest 而是我们自己创建的用户,但是没有足够的权限进行操作。
解决办法:
rabbitmqctl set_user_tags username administrator rabbitmqctl set_permissions -p / username ".*" ".*" ".*"
执行结果:
相关其他操作见:windows下 安装 rabbitMQ 及操作常用命令
程序运行结果:
消费者
class Recevice { const string ExchangeName = "eric.exchange"; const string QueueName = "eric.queue"; public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", VirtualHost = "/" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的 channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的 channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName); BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck: true); //NoAck:true 告诉RabbitMQ立即从队列中删除消息,另一个非常受欢迎的方式是从队列中删除已经确认接收的消息,可以通过单独调用BasicAck 进行确认: //BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck:false); var msgContent = Encoding.UTF8.GetString(msgResponse.Body); Console.WriteLine("The received content:"+msgContent); channel.BasicAck(msgResponse.DeliveryTag, multiple: false); //使用BasicAck方式来告之是否从队列中移除该条消息 //需要额外注意,比如从队列中获取消息并用它来操作数据库或日志文件时,如果出现操作失败时,则该条消息应该保留在队列中,只到操作成功时才从队列中移除。 Console.ReadKey(); } } }
接受消息还有一种方法,就是通过基于推送的事件订阅。可以使用内置的 QueueingBasicConsumer 提供简化的编程模型,允许在共享队列上阻塞,直到收到一条消息。
var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(QueueName, noAck: true, consumer: consumer); var msgResponse = consumer.Queue.Dequeue(); var msgContent = Encoding.UTF8.GetString(msgResponse.Body);
程序运行结果: