zoukankan      html  css  js  c++  java
  • rabbitmq学习(八) —— 可靠机制上的“可靠”

    接着上一篇,既然已经有了手动ack、confirm机制、return机制,还不够吗?

    以下博文转自https://www.jianshu.com/p/6579e48d18aehttps://my.oschina.net/u/3523423/blog/1620885

    本以为这样的实现基本是没有问题的。但是前段时间做了一个性能压力测试,但是发现在使用rabbitTemplate时,会有一定的丢数据问题。

    当时的场景是用30个线程,无间隔的向rabbitmq发送数据,但是当运行一段时间后发现,会出现一些connection closed错误,rabbitTemplate虽然进行了自动重连,但是在重连的过程中,丢失了一部分数据。当时发送了300万条数据,丢失在2000条左右。
    这种丢失率,对于一些对一致性要求很高的应用(比如扣款,转账)来说,是不可接受的。

    在google了很久之后,在stackoverflow上找到rabbitTemplate作者对于这种问题的解决方案,他给的方案很简单,单纯的增加channel数:

    connectionFactory.setChannelCacheSize(100);
    或者yml中配置
    cache:
      channel:
        size: 100

    修改之后,确实不再出现connection closed这种错误了,在发送了3000万条数据后,一条都没有丢失。
    似乎问题已经完美的解决了,但是我又想到一个问题:当我们的网络在发生抖动时,这种方式还是不是安全的?
    换句话说,如果我强制切断客户端和rabbitmq服务端的连接,数据还会丢失吗?

    如上图,生产者把消息发送到 RabbitMQ,然后 RabbitMQ 再把消息投递到消费者。

    生产者和 RabbitMQ,以及 RabbitMQ 和消费者都是通过 TCP 连接,但是他们之间是通过信道(Channel)传递数据的。多个线程共享一个连接,但是每个线程拥有独自的信道。

    消费者 ack

    • 问题:怎么保证 RabbitMQ 投递的消息被成功投递到了消费者?

      RabbitMQ 投递的消息,刚投递一半,产生了网络抖动,就有可能到不了消费者。

    • 解决办法:

      RabbitMQ 对消费者说:“如果你成功接收到了消息,给我说确认收到了,不然我就当你没有收到,我还会重新投递”

    在 RabbitMQ 中,有两种 acknowledgement 模式。

    自动 acknowledgement 模式

    这也称作发后即忘模式

    在这种模式下,RabbitMQ 投递了消息,在投递成功之前,如果消费者的 TCP 连接 或者 channel 关闭了,这条消息就会丢失。

    会有丢失消息问题。

    手动 acknowledgement 模式

    在这种模式下,RabbitMQ 投递了消息,在投递成功之前,如果消费者的 TCP 连接 或者 channel 关闭了,导致这条消息没有被 acked,RabbitMQ 会自动把当前消息重新入队,再次投递。

    会有重复投递消息的问题,所以消费者得准备好处理重复消息的问题,就是所谓的:幂等性。

    注意

    如果开启了消费者手动 ack 模式,但是又没有调用手动确认方法(比如:channel.basicAck),那问题就大了,RabbitMQ 会在当前 channel 上一直阻塞,等待消费者 ack。

    生产者 confirms

    • 问题:怎么保证生产者发送的消息被 RabbitMQ 成功接收?

      生产者发送的消息,刚发送一半,产生了网络抖动,就有可能到不了 RabbitMQ。

    • 解决办法:

      生产者对 RabbitMQ 说:“如果你成功接收到了消息,给我说确认收到了,不然我就当你没有收到”

    • 即生产者投递到交换机和交换机匹配不到队列都会导致消息丢失,confirm和return机制并不能恢复消息

    下面是参考实现

    思路:使用redis将所有消息缓存,如果confirm回调时ack为true并且没有return回调,说明消息投递成功,可以从redis中删除该消息

    至于消费者确认可以交由服务器去管理,rabbitmq服务器未收到消费者ack时消息会重新入队

    需要注意的是可能会有重复数据(比如消费者处理了数据确认时宕机了,这时服务器又会重新投递一次),因此消费者接口必须保证幂等性

    yml配置同上一篇

    自定义消息元数据

    /**
     * 自定义消息元数据
     */
    @NoArgsConstructor
    @Data
    public class RabbitMetaMessage implements Serializable{
        /**
         * 是否是 returnCallback
         */
        private boolean returnCallback;
        /**
         * 承载原始消息数据数据
         */
        private Object payload;
        public RabbitMetaMessage(Object payload) {
            this.payload = payload;
        }
    }
    • returnCallback 标记当前消息是否触发了 returnCallback(后面会解释)
    • payload 保存原始消息数据

    生产者

    先把消息存储到 redis(也可以使用其他缓存框架,redis可以持久化能保证即使服务器宕机也能恢复消息),再发送到 rabbitmq

    @RestController
    public class ProducerController {
        @Autowired
        private RabbitTemplate rabbitTemplate;
        @Autowired
        private RedisTemplate redisTemplate;
        @Autowired
        private DefaultKeyGenerator keyGenerator;
    
        @GetMapping("/sendMessage")
        public Object sendMessage() {
            new Thread(() -> {
                HashOperations hashOperations = redisTemplate.opsForHash();
                for (int i = 0; i < 1; i++) {
                    String id = keyGenerator.generateKey() + "";
                    String value = "message " + i;
                    RabbitMetaMessage rabbitMetaMessage = new RabbitMetaMessage(value);
                    // 先把消息存储到 redis
                    hashOperations.put(RedisConfig.RETRY_KEY, id, rabbitMetaMessage);
                    Console.log("send message = {}", value);
                    // 再发送到 rabbitmq
                    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, value, (message) -> {
                        message.getMessageProperties().setMessageId(id);
                        return message;
                    }, new CorrelationData(id));
                }
            }).start();
            return "ok";
        }
    }

    配置 RabbitTemplate

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 必须设置为 true,不然当 发送到交换器成功,但是没有匹配的队列,不会触发 ReturnCallback 回调
        // 而且 ReturnCallback 比 ConfirmCallback 先回调,意思就是 ReturnCallback 执行完了才会执行 ConfirmCallback
        rabbitTemplate.setMandatory(true);
        // 设置 ConfirmCallback 回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            Console.log("ConfirmCallback , correlationData = {} , ack = {} , cause = {} ", correlationData, ack, cause);
            // 如果发送到交换器都没有成功(比如说删除了交换器),ack 返回值为 false
            // 如果发送到交换器成功,但是没有匹配的队列(比如说取消了绑定),ack 返回值为还是 true (这是一个坑,需要注意)
            if (ack) {
                String messageId = correlationData.getId();
                RabbitMetaMessage rabbitMetaMessage = (RabbitMetaMessage) redisTemplate.opsForHash().get(RedisConfig.RETRY_KEY, messageId);
                Console.log("rabbitMetaMessage = {}", rabbitMetaMessage);
                if (!rabbitMetaMessage.isReturnCallback()) {
                    // 到这一步才能完全保证消息成功发送到了 rabbitmq
                    // 删除 redis 里面的消息
                    redisTemplate.opsForHash().delete(RedisConfig.RETRY_KEY, messageId);
                }
            }
        });
        // 设置 ReturnCallback 回调
        // 如果发送到交换器成功,但是没有匹配的队列,就会触发这个回调
        rabbitTemplate.setReturnCallback((message, replyCode, replyText,
                                          exchange, routingKey) -> {
            Console.log("ReturnCallback unroutable messages, message = {} , replyCode = {} , replyText = {} , exchange = {} , routingKey = {} ", message, replyCode, replyText, exchange, routingKey);
            // 从 redis 取出消息,设置 returnCallback 设置为 true
            String messageId = message.getMessageProperties().getMessageId();
            RabbitMetaMessage rabbitMetaMessage = (RabbitMetaMessage) redisTemplate.opsForHash().get(RedisConfig.RETRY_KEY, messageId);
            rabbitMetaMessage.setReturnCallback(true);
            redisTemplate.opsForHash().put(RedisConfig.RETRY_KEY, messageId, rabbitMetaMessage);
        });
        return rabbitTemplate;
    }

    ReturnCallback 回调

    必须 rabbitTemplate.setMandatory(true),不然当 发送到交换器成功,但是没有匹配的队列,不会触发 ReturnCallback 回调。而且 ReturnCallback 比 ConfirmCallback 先回调。

    如何模拟 发送到交换器成功,但是没有匹配的队列,先把项目启动,然后再把队列解绑,再发送消息,就会触发 ReturnCallback 回调,而且发现消息也丢失了,没有到任何队列。

    这样就解绑了。

    运行项目,然后打开浏览器,输入 http://localhost:9999/sendMessage

    控制台打出如下日志

    这样就触发了 ReturnCallback 回调 ,从 redis 取出消息,设置 returnCallback 设置为 true。你会发现 ConfirmCallback 的 ack 返回值还是 true。

    ConfirmCallback 回调

    这里有个需要注意的地方,如果发送到交换器成功,但是没有匹配的队列(比如说取消了绑定),ack 返回值为还是 true (这是一个坑,需要注意,就像上面那种情况!!!)。所以不能单靠这个来判断消息真的发送成功了。这个时候会先触发 ReturnCallback 回调,我们把 returnCallback 设置为 true,所以还得判断 returnCallback 是否为 true,如果为 ture,表示消息发送不成功,false 才能完全保证消息成功发送到了 rabbitmq。

    如何模拟 ack 返回值为 false,先把项目启动,然后再把交换器删除,就会发现 ConfirmCallback 的 ack 为 false。

    运行项目,然后打开浏览器,输入 http://localhost:9999/sendMessage

    控制台打出如下日志

    你会发现 ConfirmCallback 的 ack 返回值才是 false。

    注意

    不能单单依靠 ConfirmCallback 的 ack 返回值为 true,就断定当前消息发送成功了。

    源码地址

     
  • 相关阅读:
    [转]让搜索跨越语言的鸿沟—谈跨语言信息检索技术
    【PRML读书笔记-Chapter1-Introduction】1.6 Information Theory
    【PRML读书笔记-Chapter1-Introduction】1.5 Decision Theory
    [科研小记]
    【PRML读书笔记-Chapter1-Introduction】1.4 The Curse of Dimensionality
    【Machine Learning】wekaの特征选择简介
    【PRML读书笔记-Chapter1-Introduction】1.3 Model Selection
    【迁移学习】2010-A Survey on Transfer Learning
    【Machine Learning】机器学习の特征
    【PRML读书笔记-Chapter1-Introduction】1.2 Probability Theory
  • 原文地址:https://www.cnblogs.com/pokid/p/10527765.html
Copyright © 2011-2022 走看看