zoukankan      html  css  js  c++  java
  • 【RabbitMQ】RabbitMQ 如何保证消息的可靠传输

    一、可靠传输

    本篇文章主要讲 RabbitMQ 如何保证消息的可靠传输,所以在讲RabbitMQ的实现之前,我们需要先来搞懂一个问题,就是什么是消息的可靠传输。

    在 RabbitMQ 中,一个消息从产生到被消费大致需要经过三个步骤,即生产者生产消息,消息投递到 RabbitMQ,RabbitMQ 再将消息推送消费者(或者是消费者拉取),最终消费者将这条消息成功消费。

    所以消息丢失也可以划分为三种情况——生产者、消息队列、消费者,如下图所示:

    所以消息的可靠传输,就是确保消息能够百分百从生产者发送到服务器,再从服务器发送到消费者。接下来我们就针对这三种情况分别讨论解决方案。

    二、生产者投递消息失败

    1. 事务机制

    使用 RabbitMQ 的事务功能,此时可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务 channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务 channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务 channel.txCommit

    // 开启事务
    channel.txSelect();
    try {
        // 发送消息
    } catch(Exception e) {
        channel.txRollback();
    	// 重发消息
    }
    // 提交事务
    channel.txCommit();
    

    事务机制可以基本确保生产者投递消息成功,但是这种方式有比较大的缺点,基本上 RabbitMQ 事务机制(同步)一搞,基本上吞吐量会下来,因为太耗性能了。

    2. confirm机制

    针对上述问题,如果还要确保 RabbitMQ 生产者的消息正确投递,可以开启 confirm 模式,在生产者端设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ACK 消息,告诉你说这个消息 ok 了。

    如果 RabbitMQ 没能处理这个消息,会回调你一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。

    try {
        channel.confirmSelect(); //将信道置为 publisher confirm 模式
        // 之后正常发送消息
        channel.basicPushlish("exchange", "routingKey", null, 
                              "publisher confirm test".getBytes());
        if (!channel.waitFormConfirms()) {
            System.out.println("Send message failed");
        	// do something else...
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    

    事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收之后会异步回调你一个接口通知你这个消息接收到了。

    所以一般生产者到 RabbitMQ 这块避免数据丢失,会采用 confirm 机制更多些。

    三、消息队列自身丢失

    就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。

    设置持久化有两个步骤:

    • 创建 queue 的时候将其设置为持久化

      这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是不会持久化 queue 里的数据。

    • 发送消息时将消息的 deliveryMode 设置为 2

      就是将消息设置为持久化,此时 RabbitMQ 就会将消息持久化到磁盘上去。

    必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。

    同时持久化也可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ACK,你也是可以自己重发的。

    注意,哪怕是你给 RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存中的数据丢失。

    四、消费者宕机

    对于消费者端所产生的情况就是:消费者成功接收到消息,但是还未将消息处理完毕就宕机了。针对这种情况,可以利用 RabbitMQ 提供的 消息确认机制

    1. 消息确认机制

    为了保证消息从队列可靠地到达消费者,RabbitMQ 提供了消息确认机制(message acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数:

    • 当 autoAck 等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息;
    • 当 autoAck 等于 false 时,RabbitMQ 会等待消费者显示地回复确认信号后才从内存(后者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。

    所以对于消费者可能发生宕机地情况,我们可以将 autoAck 参数置为 false,消费者就有足够的时间处理这条消息,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为 RabbitMQ 会一直等待并且持有这条消息,直到消费者显示调用 Basic.Ack 命令为止。

    当 autoAck 参数设置为 false,对于 RabbitMQ 服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费者的消息,另一部分是已经投递给消费者,但是还没有收到消费者确认信号的消息。如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也可能还是原来的那个消费者。

  • 相关阅读:
    电子商务标准体系RosettaNet与Microsoft BizTalk Accelerator for RosettaNet 无为而为
    sharepoint进行Unit Test的一个方案 无为而为
    微软VSTS的新功能:WebTest要点 无为而为
    IT部门的几种生存模式分析 无为而为
    一个人出现问题可能是道德问题,但是很多人都有相同的问题就一定不是道德问题。 无为而为
    意大利人有过关了,不过要面对东道主德国队未必会有这么容易 无为而为
    记一次阿里云OSS的STS授权访问
    得到应用程序所在路径
    反射 运用
    GetEnumerator用法
  • 原文地址:https://www.cnblogs.com/jojop/p/14101960.html
Copyright © 2011-2022 走看看