zoukankan      html  css  js  c++  java
  • RabbitMQ 消息可靠性

    一、RabbitMQ消息可靠性投递

    1、什么是消息的可靠性投递

    保证消息百分百发送到消息队列中去

    • 保证mq节点成功接受消息
    • 消息发送端需要接受到mq服务端接受到消息的确认应答
    • 完善的消息补偿机制,发送失败的消息可以再感知并⼆次处理

    2、RabbitMQ消息投递路径

    生产者-->交换机-->队列-->消费者

    通过两个的点控制消息的可靠性投递

    • 生产者到交换机
      • 通过confirmCallback
    • 交换机到队列
      • 通过returnCallback

    3、建议

    开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互, rabbitmq 整体效率变低,吞吐量下降严重,不是⾮常重要的消息真心不建议用消息确认机制

    二、confirmCallback实战

    1、生产者到交换机

    通过confirmCallback

    生产者投递消息后,如果Broker收到消息后,会给生产者⼀个ACK。生产者通过ACK,可以确认这条消息是否正常发送到Broker,这种方式是消息可靠性投递的核心

    2、开启confirmCallback

    #旧版,确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调
    spring.rabbitmq.publisher-confirms=true
    #新版, NONE值是禁⽤发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法
    spring.rabbitmq.publisher-confirm-type=correlated

    3、开发实战

    本文示例承接上文:https://www.cnblogs.com/jwen1994/p/14367946.html

    @Test
    void testConfirmCallback() {
        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * @param correlationData 配置
             * @param ack 交换机是否收到消息, true是成功, false是失败
             * @param cause 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm====correlationData=" + correlationData);
                System.out.println("confirm====ack=" + ack);
                System.out.println("confirm=====cause=" + cause);
                //根据ACK状态做对应的消息更新操作 TODO
            }
        });
        template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "新订单来啦1");
    }

    正常情况下的输出:

    confirm====correlationData=null
    confirm====ack=true
    confirm=====cause=null

    模拟异常:修改投递的交换机名称

    template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME + "11111", "order.new", "新订单来啦1");

    confirm====correlationData=null
    confirm====ack=false
    confirm=====cause=channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'order_exchange111' in vhost '/dev', class-id=60, method-id=40)

    三、returnCallback实战

    1、交换机到队列

    通过returnCallback,消息从交换器发送到对应队列失败时触发

    两种模式

    • 交换机到队列不成功,则丢弃消息(默认)
    • 交换机到队列不成功,返回给消息生产者,触发returnCallback

    2、开启returnCallback配置

    #新版
    spring.rabbitmq.publisher-returns=true

    3、修改交换机投递到队列失败的策略

    #为true,则交换机处理消息到路由失败,则会返回给生产者
    spring.rabbitmq.template.mandatory=true

    4、开发实战

    /**
     * 交换机到队列可靠性投递测试
     */
    @Test
    void testReturnCallback() {
        template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                int code = returned.getReplyCode();
                System.out.println("code=" + code);
                System.out.println("returned=" + returned.toString());
            }
        });
    
        //template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new","新订单ReturnsCallback");
        //模拟异常,投递一个没有绑定关系的路由key
        template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "xdclass.order.new", "新订单ReturnsCallback");
    }

    模拟异常后,控制台输出如下:

    code=312
    returned=ReturnedMessage [message=(Body:'新订单ReturnsCallback' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=order_exchange, routingKey=xdclass.order.new]

    四、RabbitMQ消息确认机制ACK

    消费者从broker中监听消息,需要确保消息被合理处理

    1、RabbitMQACK介绍

    • 消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQRabbitMQ收到反馈后才将此消息从队列中删除
    • 消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈, RabbitMQ会认为这个消息没有正常消费,会将消息重新放⼊队列中
    • 只有当消费者正确发送ACK反馈, RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
    • 消息的ACK确认机制默认是打开的,消息如未被进⾏ACK的消息确认机制,这条消息被锁定Unacked

    2、确认方式

    • 自动确认(默认)
    • 手动确认 manual
    #开启⼿动确认消息,如果消息重新入队,进⾏重试
    spring.rabbitmq.listener.simple.acknowledge-mode=manual

    3、代码实战

    package net.xdclass.xdclasssp.mq;
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "order_queue")
    public class OrderMQListener {
    
        @RabbitHandler
        public void messageHandler(String body, Message message, Channel channel) throws Exception {
            long msgTag = message.getMessageProperties().getDeliveryTag();
            System.out.println("msgTag=" + msgTag);
            System.out.println("message=" + message.toString());
            System.out.println("body=" + body);
    
            //复杂业务逻辑
    
            //告诉broker,消息已经被确认
            channel.basicAck(msgTag, false);
    
            //告诉broker,消息拒绝确认
            //channel.basicNack(msgTag,false,true);
    
            //channel.basicReject(msgTag,true);
        }
    }

    deliveryTag介绍

    • 表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加

    basicNack介绍

    • basicReject⼀次只能拒绝接收⼀个消息,可以设置是否requeue

    basicReject介绍

    • basicNack方法可以⽀持⼀次0个或多个消息的拒收,可以设置是否requeue

    人工审核异常消息

    • 设置重试阈值,超过后确认消费成功,记录消息,人工处理
  • 相关阅读:
    day04用户交互和运算符
    day04垃圾回收机制
    day4
    B2. K for the Price of One (Hard Version)
    C. New Year and Permutation
    Rational Ratio
    C. Two Arrays
    D. MEX maximizing
    B. Infinite Prefixes
    C. Obtain The String
  • 原文地址:https://www.cnblogs.com/jwen1994/p/14371571.html
Copyright © 2011-2022 走看看