zoukankan      html  css  js  c++  java
  • RabbitMQ 发布订阅持久化及持久化方式

    RabbitMQ是一种重要的消息队列中间件,在生产环境中,稳定是第一考虑。RabbitMQ厂家也深知开发者的声音,稳定、可靠是第一考虑,为了消息传输的可靠性传输,RabbitMQ提供了多种途径的消息持久化保证:Exchange持久化、Queue持久化及Message的持久化等。以保证RabbitMQ在重启或Crash等异常情况下,消息不会丢失。RabbitMQ提供了简单的参数配置来实现持久化操作。

    简单说明一下各种持久化方式:(描述代码采用的是RabbitMQ.Client  SDK,  C#代码)

    Queue持久化:队列是我们使用RabbitMQ进行数据传输的最多使用的方式,是进行点对点消息传递使用最多的方式。队列的持久化是通过durable=true 来实现。

    var connFactory = new ConnectionFactory();
    Conn = connFactory.CreateConnection();
    Model = Conn.CreateModel();
    Model.QueueDeclare(q, false, false, false, null);  

    其中,QueueDeclare的定义:

    /// <summary>(Spec method) Declare a queue.</summary>
            [AmqpMethodDoNotImplement(null)]
            QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive,
                bool autoDelete, IDictionary<string, object> arguments);  

    参数说明:queue:队列名称。durable:设置是否执行持久化。如果设置为true,即durable=true,持久化实现的重要参数

    exclusive:指示队列是否是排他性。如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。需要注意:1. 排他队列是基于连接可见的,同一连接的不同信道Channel是可以同时访问同一连接创建的排他队列;2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。

    autoDelete:是否自动删除。如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于发布订阅方式创建的临时队列。

    消息的持久化:如果将一个队列设置为持久化,那么会创建一个持久化的队列,但并不意味着队列中的消息也会持久化存储。因此如果要保证消息在RabbitMQ出现异常时不会丢失,需要设定消息的持久化。

    简要说明一下消息持久化和队列持久化的联系:

    队列设置为持久化,那么在RabbitMQ重启之后,持久化的队列也会存在,并会保持和重启前一致的队列参数。

    消息设置为持久化,在RabbitMQ重启之后,持久化的消息也会存在。

    那么就会出现一些矛盾的地方:

    1、因为消息必须依附于队列存在才有意义,那么如果队列设置为非持久化,而消息设置为持久化。在RabbitMQ重启之后,持久化的消息是否还存在呢?因为非持久化的队列可能并不存在。

    2、如果设置消息持久化为true,但队列设置成排他性队列,那么在RabbitMQ重启之后,消息是否仍然存在。请自行查找分析,下次分析该问题。

     1              var sf = new ConnectionFactory();
     2             using (IConnection conn = cf.CreateConnection())
     3             {
     4                 IModel ch = conn.CreateModel();
                       Model = Conn.CreateModel();
                       Model.QueueDeclare(queueName, true, false, false, null); 
    string message = "Hello C# SSL Client World"; 11 byte[] msgBytes = System.Text.Encoding.UTF8.GetBytes(message);
    //发送消息
    12 ch.BasicPublish("", queueName, null, msgBytes); 13 14 bool noAck = false; 15 BasicGetResult result = ch.BasicGet(qName, noAck); 16 byte[] body = result.Body; 17 string resultMessage = System.Text.Encoding.UTF8.GetString(body); 18 19 Assert.AreEqual(message, resultMessage); 20 }

    通过RabbitMQ SDK发送消息至MQ非常简单,通过BasicPublish即可。

     BasicPublish 的定义:

    1   /// <summary>
    2         /// (Spec method) Convenience overload of BasicPublish.
    3         /// </summary>
    4         /// <remarks>
    5         /// The publication occurs with mandatory=false
    6         /// </remarks>
    7         [AmqpMethodDoNotImplement(null)]
    8         void BasicPublish(string exchange, string routingKey, IBasicProperties basicProperties, byte[] body);

    设置消息持久化,需要设置basicProperties的DeliveryMode=2 (Non-persistent (1) or persistent (2)).

    设置了队列和消息持久化后,当服务重启之后,消息仍然存在。只设置队列持久化,不设置消息持久化,重启之后消息会丢失;只设置消息持久化,不设置队列持久化,在服务重启后,队列会消失,从而依附于队列的消息也会丢失。只设置消息持久化而不设置队列的持久化,毫无意义。

    Exchange持久化:

    为了实现一对多的消息发送,我们一般会采用发布订阅模式,通过一个发送端、多个订阅端来实现消息的分发。

    发布订阅模式存在一些问题:

    1、如果消费者由于网络或其他原因,与RabbitMQ的连接断开,那么RabbitMQ会自动将与其对应的队列删除,当消息程序重新连接以后,无法获取断开前未来得及消费的消息。

    2、如果RabbitMQ出现故障或Crash,那么在RabbitMQ  服务重启之后,消费端未及时消费的消息也会丢失,并且如果Exchange 不设置成持久化,那么在MQ服务重启之后,Exchange也不会存在。

    1   /// <summary>(Spec method) Declare an exchange.</summary>
    2         /// <remarks>
    3         /// The exchange is declared non-passive and non-internal.
    4         /// The "nowait" option is not exercised.
    5         /// </remarks>
    6         [AmqpMethodDoNotImplement(null)]
    7         void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete,
    8             IDictionary<string, object> arguments);

    参数说明:exchange:RabbitMQ中定义的Exchange名称,type:类型,包含fanout、topic、direct、headers,durable:持久化设置。设置成true,就可以设定exchange持久化存储,autodelete:是否自动删除。

    exchange是实现发布订阅的基础,其类型包含fanout、headers、direct、、topic。我们本次仅讨论类型为topic。

    发布订阅模式执行消息发送的流程:

    总结:

    RabbitMQ要实现发布订阅持久化,按照消息的传输流程,可以分成三类:

    Exchange 持久化:如果不设定Exchange持久化,那么在RabbitMQ由于某些异常等原因重启之后,Exchange会丢失。Exchange丢失, 会影响发送端发送消息到RabbitMQ。

    Queue持久化:发送端将消息发送至Exchange,Exchange将消息转发至关联的Queue。如果Queue不设置持久化,那么在RabbitMQ重启之后,Queue信息会丢失。导致消息发送至Exchange,但Exchange不知道需要将该消息发送至哪些具体的队列。

    Message持久化:发送端将消息发送至Exchange,Exchange将消息转发至关联的Queue,消息存储于具体的Queue中。如果RabbitMQ重启之后,由于Message未设置持久化,那么消息会在重启之后丢失。

    为了保证发布订阅的持久化,必须设置Exchange、Queue、Message的持久化,才可以保证消息最终不会丢失。

    虽然持久化会造成性能损耗,但为了生产环境的数据一致性,这是我们必须做出的选择。但我们可以通过设置消息过期时间、降低发送消息大小等其他方式来尽可能的降低MQ性能的降低。

    扩展阅读:

    1、Exchange type:topic、fanout、direct、headers的不同。

    2、消息的确认机制。

    3、将Exchange、Queue、Message都设置持久化,能保证消息100%会被成功消费吗?

    答案肯定是否,天下没有绝对的事情,尤其是复杂的MQ。

    原因简单介绍,一、如果消息的自动确认为true,那么在消息被接收以后,RabbitMQ就会删除该消息,假如消费端此时宕机,那么消息就会丢失。因此需要将消息设置为手动确认。

    二、设置手动确认会出现另一个问题,如果消息已被成功处理,但在消息确认过程中出现问题,那么在消费端重启后,消息会重新被消费。

    三、发送端为了保证消息会成功投递,一般会设定重试。如果消息发送至RabbitMQ之后,在RabbitMQ回复已成功接收消息过程中出现异常,那么发送端会重新发送该消息,从而造成消息重复发送。

    四、RabbitMQ的消息磁盘写入,如果出现问题,也会造成消息丢失。

    五、。。。。。

    下期热点问题:

    1、Exchange type的不同

    2、消息的确认与拒绝机制

    3、优先级机制

    RabbitMQ发布订阅持久化方式:Exchange、Queue、Message持久化,队列设定手动确认、AutoDelete=false。可以最大程度的保证消息不丢失。

    附RabbitMQ发布订阅持久化具体实现方式,参考代码:

     1 MQ SDK新增接口:
     2 IMQSession新增方法:
     3 /// <summary>
     4         /// 创建消息消费者
     5         /// </summary>
     6         /// <param name="topicName">主题名称</param> 
     7         /// <param name="customTopicQueueName">自定义Topic关联队列名称</param>
     8         /// <param name="isPersistence">是否持久化</param>
     9         /// <returns>消息消费者</returns>
    10         IMessageConsumer CreateTopicConsumer(string topicName, string customTopicQueueName, bool isPersistence = false);
    11 调用方式:消费端需要明确指定需要消费的发布订阅关联队列。例如配置中心热部署,每个配置中心实例都需要指定唯一的关联队列名。
    12 这样就可以和正常的MAC队列消费一样,消费指定队列消息。
    13 
    14 实现方式,四个步骤:
    15 1.创建持久化Topic(即持久化Exchange):
    16   var service = MQServiceProvider.GetDefaultMQService();
    17             var messageText = "abc";
    18             ///创建Topic
    19             using (var connection = service.CreateConnection())
    20             {
    21                 var session = connection.CreateSession(MessageAckMode.IndividualAcknowledge);
    22                 var messageCreator = service.GetMessageCreator();
    23                 var message = messageCreator.CreateMessage(messageText);
    24                 message.IsPersistent = true;
    25                 var producer = session.CreateProducer();
    26                 var topic = session.DeclareTopic(topicName, true);
    27             }
    28 2.定义消费者Consumer:
    29 List<string> queueList = new List<string>() {
    30                 "guozhiqi1",
    31                 "guozhiqi2",
    32                 "guozhiqi3",
    33                 "guozhiqi4",
    34                 "guozhiqi5",
    35                 "guozhiqi6",
    36                 "guozhiqi7",
    37                 "guozhiqi8",
    38                 "guozhiqi9",
    39             };
    40             //var service = MQServiceProvider.GetDefaultMQService();
    41             //var messageText = "abc" + DateTime.Now.ToShortTimeString();
    42             //定义消费者
    43             using (var connection1 = service.CreateConnection())
    44             {
    45                 var session1 = connection1.CreateSession(MessageAckMode.IndividualAcknowledge);
    46                 foreach (var item in queueList)
    47                 {
    48                     session1.DeclareQueue(item, true);
    49                     var consumer = session1.CreateTopicConsumer(topicName, item, true);
    50                 }
    51             }
    52 3.发送消息到Topic
    53  //发送消息
    54             for (int i = 0; i <= 100; i++)
    55             {
    56                 using (var connection = service.CreateConnection())
    57                 {
    58                     var session = connection.CreateSession(MessageAckMode.IndividualAcknowledge);
    59                     var messageCreator = service.GetMessageCreator();
    60                     var message = messageCreator.CreateMessage(messageText);
    61                     message.IsPersistent = true;//设置持久化
    62                    message.TimeToLive = TimeSpan.FromSeconds(30);//设置过期时间
    63                     var producer = session.CreateProducer();
    64                     var topic = session.DeclareTopic(topicName, true);
    65                     producer.Send(message, topic);
    66                 }
    67             }
    68 4.从队列接收消息
    69 Parallel.ForEach(queueList, (item) =>
    70             {
    71                 while (true)
    72                 {
    73                     //接收消息
    74                     using (var connection1 = service.CreateConnection())
    75                     {
    76                         var session1 = connection1.CreateSession(MessageAckMode.IndividualAcknowledge);
    77 
    78                         session1.DeclareQueue(item, true);
    79                         var consumer = session1.CreateTopicConsumer(topicName, item, true);
    80                         var topic = session1.DeclareTopic(topicName, true);
    81                         var receivedmessage = consumer.Receive(topic);
    82                         var textMessage = receivedmessage as ITextMessage;
    83 
    84                         Assert.AreEqual(messageText, textMessage.Body);
    85                         consumer.Acknowledge(receivedmessage);
    86                     }
    87                 }
    88 
    89             });
    View Code
  • 相关阅读:
    linux 命令
    http 协议
    关于 yaf路由
    yaf学习 从头开始
    插件概念
    框架与设计模式的区别
    一个程序员的创业(爱情)故事
    话说那年微信接口平台创业旧事
    推荐sinaapp谷歌搜索引擎,firefox自定义搜索引擎
    新的开源java反汇编程序Procyon
  • 原文地址:https://www.cnblogs.com/jiagoushi/p/8678871.html
Copyright © 2011-2022 走看看