zoukankan      html  css  js  c++  java
  • Rabbitmq的可靠消息投递

    一、背景

    生产端向rabbitmq发送消息时,由于网络等原因可能导致消息发送失败。所以,rabbitmq必须有机制确保消息能准确到达mq,如果不能到达,必须反馈给生产端进行重发。

    RabbitMQ消息的可靠性投递主要两种实现:
    1、通过实现消费的重试机制,通过@Retryable来实现重试,可以设置重试次数和重试频率;
    2、生产端实现消息可靠性投递。

    两种方法消费端都可能收到重复消息,要求消费端必须实现幂等性消费。

    二、消息投递到exchange的确认模式

    rabbitmq的消息投递的过程为:
    producer ——> rabbitmq broker cluster ——> exchange ——> queue ——> consumer

    1、生产端发送消息到rabbitmq broker cluster后,异步接受从rabbitmq返回的ack确认信息。

    2、生产端收到返回的ack确认消息后,根据ack是true还是false,调用confirmCallback接口进行处理。

    在application.yml中开启生产端confirm模式

    spring:
      rabbitmq:
        publisher-confirms: true

    实现ConfirmCallback接口中的confirm方法,ack为true表示消息发送成功,ack为false表示消息发送失败

    @Component
    @Slf4j
    public class RabbitTemplateConfig implements ConfirmCallback{
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        public void init() {        
            rabbitTemplate.setConfirmCallback(this);   // 指定 ConfirmCallback
        }
        
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
           if (!ack) {
              //try to resend msg
           } else {
              //delete msg in db
           }
       }
    }

    注意:在confirmCallback回调接口中是没有消息数据的,所以即使消息发送失败,生产端也无法在这个回调接口中直接重发,confirmCallback只能起到一个通知的作用。

    三、消息投递失败的重发机制

    如果rabbitmq返回ack失败,生产端也无法确认消息是否真的发送成功,也会造成数据丢失。最好的办法是使用rabbitmq的事务机制,但是rabbitmq的事务机制效率极低,每秒钟处理的消息仅几百条,不适合并发量大的场景。 

    另外一种实现思路:

    1、生产端保存每次发送的消息,如果发送成功就删除消息;

    2、如果发送失败就取出消息重新发送;

    3、如果超时还没有收到mq返回的ack,同样取出消息重新发送。

    这样就可以避免消息丢失的风险。

    以使用redis保存消息msg为例,具体实现方案为:

    1、生产端在发送消息之前,生成ack唯一确认的id;

    2、以ackId为键,消息为value,保存进redis缓存,设置超时时间;

    3、redis实现超时触发接口,当key过期时,重发消息并再次执行第2步;

    4、生产端实现ConfirmCallback接口;

    5、ConfirmCallback接口触发时,若ack为true,则直接删除此次ackId对应的msg;若ack为false,则将该ackId对应的msg取出重发;

    网上另外的实现方案:

    不通过设置redis超时时间触发超时事件进行重发,而是取出消息放入一个ackFailList中,然后开启定时任务,扫描ackFailList,重发失败的msg。

    网上的这套方案思路上和上一个方案差不多,但是是采用的额外的List来保存发送失败的消息,由于List保存在内存中,不具备持久化的功能,所以这样并不安全,如果生产端程序异常退出将导致消息丢失。可以考虑保存到数据库中。

    四、消息未投递到queue的退回模式

    生产端通过实现ReturnCallback接口,启动消息失败返回,消息路由不到队列时会触发该回调接口。

    在application.yml中开启return模式

    spring:
      rabbitmq:
        publisher-returns: true

    实现ReturnCallback接口,可以获取消息主体内容,实现消息重发

    @Component
    @Slf4j
    public class RabbitTemplateConfig implements ReturnCallback {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        public void init() {        
            rabbitTemplate.setReturnCallback(this);   //指定 ReturnCallback
        }
    
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            log.info("消息主体 message : {}", message);
            log.info("消息主体 message : {}", replyCode);
            log.info("描述:{}", replyText);
            log.info("消息使用的交换器 exchange : {}", exchange);
            log.info("消息使用的路由键 routing : {}", routingKey);
        }
    }
  • 相关阅读:
    Redis未授权访问攻击过程与防范
    Redis安装
    Connection closing...Socket close. Connection closed by foreign host. Disconnected from remote host(centos6.9) at 14:59:05.
    windows远程xshell文件上传下载:
    Linux重置MySQL密码
    nginx rewrite 实现URL跳转
    Openstack 清除openstack网络与路由 (十七)
    创建 OpenStack云主机 (十五)
    OpenStack 存储服务 Cinder存储节点部署LVM (十四)
    OpenStack 存储服务 Cinder介绍和控制节点部署 (十三)
  • 原文地址:https://www.cnblogs.com/alan6/p/11483419.html
Copyright © 2011-2022 走看看