zoukankan      html  css  js  c++  java
  • RabbitMQ原理与相关操作(三)消息持久化

    现在聊一下RabbitMQ消息持久化:

    问题及方案描述

    1.当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间。在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者死掉了。

    这种情况要使用消息接收确认机制,可以执行上次宕机的消费者没有完成的事情。

    2.在默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的。当RabbitMQ死掉了或者重启了,上次创建的队列、消息都不会保存。

    这种情况可以使用RabbitMQ提供的消息队列的持久化机制。

    相关理论描述

    RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我个人觉得大多数开发人员都会选择持久化。

    队列和交换机有一个创建时候指定的标志durabledurable的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列当中的消息会在重启后恢复。

    消息队列持久化包括3个部分:

    1、exchange持久化,在声明时指定durable => true
    2、queue持久化,在声明时指定durable => true
    3、消息持久化,在投递时指定delivery_mode=> 2(1是非持久化)

    如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。

    注意:一旦创建了队列和交换机,就不能修改其标志了。例如,如果创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建

    程序示例

    生产者

    class Producter
        {
            const string ExchangeName = "eric.exchange";
            const string QueueName = "eric.queue";
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", };
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的
                    channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的
                    channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
    
                    string message = "Eric is very handsome";
                    var body = Encoding.UTF8.GetBytes(message);
    
                    //将队列设置为持久化之后,还需要将消息也设为可持久化的
                    var props = channel.CreateBasicProperties();
                    props.SetPersistent(true);
    
                    channel.BasicPublish(ExchangeName, routingKey: QueueName, basicProperties: props, body: body);
    
                    Console.WriteLine("Producter Sent: {0}", message);
                    Console.ReadKey();
                }
            }
        }
    View Code

    注:ack是 acknowledgments 的缩写,noAck 是("no manual acks")

    因为我前段时间换了笔记本,所以用户的“eric”的操作出踩了个坑,下面进行介绍下:

    如果调试运行时报错:None of the specified endpoints were reachable

    innerException是:

    {"The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=541, text="Unexpected Exception", classId=0, methodId=0, cause=System.IO.IOException: 无法从传输连接中读取数据: 远程主机强迫关闭了一个现有的连接。。 ---> System.Net.Sockets.SocketException: 远程主机强迫关闭了一个现有的连接。
       在 System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags)
       在 System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size)
       --- 内部异常堆栈跟踪的结尾 ---
       在 RabbitMQ.Client.Impl.Frame.ReadFrom(NetworkBinaryReader reader)
       在 RabbitMQ.Client.Impl.SocketFrameHandler.ReadFrame()
       在 RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()
       在 RabbitMQ.Client.Framing.Impl.Connection.MainLoop()"}

    这说明我们使用的用户 不是 系统默认的 guest 而是我们自己创建的用户,但是没有足够的权限进行操作。

    解决办法:

    rabbitmqctl set_user_tags username administrator
    rabbitmqctl set_permissions -p / username ".*" ".*" ".*"

    执行结果:

    相关其他操作见:windows下 安装 rabbitMQ 及操作常用命令

    程序运行结果:

     消费者

    class Recevice
        {
            const string ExchangeName = "eric.exchange";
            const string QueueName = "eric.queue";
            public static void Main()
            {
                var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", VirtualHost = "/" };
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的
                    channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的
                    channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
    
                    BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck: true);
                    //NoAck:true 告诉RabbitMQ立即从队列中删除消息,另一个非常受欢迎的方式是从队列中删除已经确认接收的消息,可以通过单独调用BasicAck 进行确认:
                    //BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck:false);
                    var msgContent = Encoding.UTF8.GetString(msgResponse.Body);
    
                    Console.WriteLine("The received content:"+msgContent);
    
                    channel.BasicAck(msgResponse.DeliveryTag, multiple: false);
                    //使用BasicAck方式来告之是否从队列中移除该条消息
                    //需要额外注意,比如从队列中获取消息并用它来操作数据库或日志文件时,如果出现操作失败时,则该条消息应该保留在队列中,只到操作成功时才从队列中移除。
                    Console.ReadKey();
                }
            }
        }
    View Code

    接受消息还有一种方法,就是通过基于推送事件订阅可以使用内置的 QueueingBasicConsumer 提供简化编程模型允许共享队列阻塞直到收到条消息。

    var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(QueueName, noAck: true, consumer: consumer);
                    var msgResponse = consumer.Queue.Dequeue(); 
                    var msgContent = Encoding.UTF8.GetString(msgResponse.Body);
    View Code

    程序运行结果:

    参考:http://www.cnblogs.com/shanyou/p/4067250.html

  • 相关阅读:
    (转载)Centos7 install Openstack Juno (RDO)
    (转载)vmware esxi 6.0 开启嵌套虚拟化
    Delphi XE5 android toast
    delphi中Message消息的使用方法
    delphi中Time消息的使用方法
    Delphi中Interface接口的使用方法
    SystemParametersinfo 用法
    Delphi XE5 android openurl(转)
    Delphi XE5开发Android程序使用自定义字体文件.
    获取 TUniConnection.SpecificOptions默认值和下拉框列表值
  • 原文地址:https://www.cnblogs.com/ericli-ericli/p/5938106.html
Copyright © 2011-2022 走看看