zoukankan      html  css  js  c++  java
  • Rabbitmq的可靠消息投递

    一、背景

    生产端向rabbitmq发送消息时,由于网络等原因可能导致消息发送失败。所以,rabbitmq必须有机制确保消息能准确到达mq,如果不能到达,必须反馈给生产端进行重发。

    RabbitMQ消息的可靠性投递主要两种实现:
    1、通过实现消费的重试机制,通过@Retryable来实现重试,可以设置重试次数和重试频率;
    2、生产端实现消息可靠性投递。

    两种方法消费端都可能收到重复消息,要求消费端必须实现幂等性消费。

    二、消息投递到exchange的确认模式

    rabbitmq的消息投递的过程为:
    producer ——> rabbitmq broker cluster ——> exchange ——> queue ——> consumer

    1、生产端发送消息到rabbitmq broker cluster后,异步接受从rabbitmq返回的ack确认信息。

    2、生产端收到返回的ack确认消息后,根据ack是true还是false,调用confirmCallback接口进行处理。

    在application.yml中开启生产端confirm模式

    spring:
      rabbitmq:
        publisher-confirms: true

    实现ConfirmCallback接口中的confirm方法,ack为true表示消息发送成功,ack为false表示消息发送失败

    @Component
    @Slf4j
    public class RabbitTemplateConfig implements ConfirmCallback{
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        public void init() {        
            rabbitTemplate.setConfirmCallback(this);   // 指定 ConfirmCallback
        }
        
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
           if (!ack) {
              //try to resend msg
           } else {
              //delete msg in db
           }
       }
    }

    注意:在confirmCallback回调接口中是没有消息数据的,所以即使消息发送失败,生产端也无法在这个回调接口中直接重发,confirmCallback只能起到一个通知的作用。

    三、消息投递失败的重发机制

    如果rabbitmq返回ack失败,生产端也无法确认消息是否真的发送成功,也会造成数据丢失。最好的办法是使用rabbitmq的事务机制,但是rabbitmq的事务机制效率极低,每秒钟处理的消息仅几百条,不适合并发量大的场景。 

    另外一种实现思路:

    1、生产端保存每次发送的消息,如果发送成功就删除消息;

    2、如果发送失败就取出消息重新发送;

    3、如果超时还没有收到mq返回的ack,同样取出消息重新发送。

    这样就可以避免消息丢失的风险。

    以使用redis保存消息msg为例,具体实现方案为:

    1、生产端在发送消息之前,生成ack唯一确认的id;

    2、以ackId为键,消息为value,保存进redis缓存,设置超时时间;

    3、redis实现超时触发接口,当key过期时,重发消息并再次执行第2步;

    4、生产端实现ConfirmCallback接口;

    5、ConfirmCallback接口触发时,若ack为true,则直接删除此次ackId对应的msg;若ack为false,则将该ackId对应的msg取出重发;

    网上另外的实现方案:

    不通过设置redis超时时间触发超时事件进行重发,而是取出消息放入一个ackFailList中,然后开启定时任务,扫描ackFailList,重发失败的msg。

    网上的这套方案思路上和上一个方案差不多,但是是采用的额外的List来保存发送失败的消息,由于List保存在内存中,不具备持久化的功能,所以这样并不安全,如果生产端程序异常退出将导致消息丢失。可以考虑保存到数据库中。

    四、消息未投递到queue的退回模式

    生产端通过实现ReturnCallback接口,启动消息失败返回,消息路由不到队列时会触发该回调接口。

    在application.yml中开启return模式

    spring:
      rabbitmq:
        publisher-returns: true

    实现ReturnCallback接口,可以获取消息主体内容,实现消息重发

    @Component
    @Slf4j
    public class RabbitTemplateConfig implements ReturnCallback {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        public void init() {        
            rabbitTemplate.setReturnCallback(this);   //指定 ReturnCallback
        }
    
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            log.info("消息主体 message : {}", message);
            log.info("消息主体 message : {}", replyCode);
            log.info("描述:{}", replyText);
            log.info("消息使用的交换器 exchange : {}", exchange);
            log.info("消息使用的路由键 routing : {}", routingKey);
        }
    }
  • 相关阅读:
    SQL Server, Timeout expired.all pooled connections were in use and max pool size was reached
    javascript 事件调用顺序
    Best Practices for Speeding Up Your Web Site
    C语言程序设计 使用VC6绿色版
    破解SQL Prompt 3.9的几步操作
    Master page Path (MasterPage 路径)
    几个小型数据库的比较
    CSS+DIV 完美实现垂直居中的方法
    由Response.Redirect引发的"Thread was being aborted. "异常的处理方法
    Adsutil.vbs 在脚本攻击中的妙用
  • 原文地址:https://www.cnblogs.com/alan6/p/11483419.html
Copyright © 2011-2022 走看看