zoukankan      html  css  js  c++  java
  • Rabbitmq接收方消息确认

    ​ 所谓的消费方消息确认就是签收模式ack,Rabbitmq默认开启的是自动签收模式,也就是消费者监听到消息到达,就会自动发送ack给队列,告诉队列这条消息可以删除了,这种自动签收的模式存在消息丢失的可能,出现异常的话这条消息就丢了,要保证消息不会丢失,还是建议开启手动签收的模式。

    一、三种签收模式

    public enum AcknowledgeMode {
        //自动确认
        NONE,
        //手动确认
        MANUAL,
        //根据情况确认
        AUTO;
    
        private AcknowledgeMode() {
        }
    
        public boolean isTransactionAllowed() {
            return this == AUTO || this == MANUAL;
        }
    
        public boolean isAutoAck() {
            return this == NONE;
        }
    
        public boolean isManual() {
            return this == MANUAL;
        }
    }
    

    二、配置文件开启手动签收模式

    spring:
        rabbitmq:
            host: 192.168.31.70
            port: 5672
            username: guest
            password: guest
            # 发送确认
            publisher-confirms: true
            # 路由失败回调
            publisher-returns: true
            template:
                # 必须设置成true 消息路由失败通知监听者,false 将消息丢弃
                mandatory: true
            #消费端
            listener:
                simple:
                    # 每次从RabbitMQ获取的消息数量
                    prefetch: 1
                    default-requeue-rejected: false
                    # 每个队列启动的消费者数量
                    concurrency: 1
                    # 每个队列最大的消费者数量
                    max-concurrency: 1
                    # 签收模式为手动签收-那么需要在代码中手动ACK
                    acknowledge-mode: manual
    

    三、消费方手动签收

    @Component
    @Slf4j
    public class MessageHandler {
    
    
        /**
         * 邮件发送
         * @param message
         * @param channel
         * @param headers
         * @throws IOException
         */
        @RabbitListener(queues ="demo.email")
        @RabbitHandler
        public void handleEmailMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {
    
            try {
    
                String msg=new String(message.getBody(), CharEncoding.UTF_8);
                JSONObject jsonObject = JSON.parseObject(msg);
                jsonObject.put("messageId",headers.get("spring_returned_message_correlation"));
                log.info("---接受到消息---{}",jsonObject);
    			//主动异常
    			int m=1/0;
                //手动签收
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            }
            catch (Exception e) {
                //异常,ture 重新入队,或者false,进入死信队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
    
            }
        }
       }
    

    四、channel接口的实现类

    里面有三个手动签收的方法

    public class PublisherCallbackChannelImpl implements PublisherCallbackChannel, ConfirmListener, ReturnListener, ShutdownListener {
    
        private static final MessagePropertiesConverter converter = new DefaultMessagePropertiesConverter();
        private final Log logger;
        private final Channel delegate;
        private final ConcurrentMap<String, Listener> listeners;
        private final Map<Listener, SortedMap<Long, PendingConfirm>> pendingConfirms;
        private final Map<String, PendingConfirm> pendingReturns;
        private final SortedMap<Long, Listener> listenerForSeq;
        private final ExecutorService executor;
        private volatile Consumer<Channel> afterAckCallback;
        
        //......省略
        
        public void basicAck(long deliveryTag, boolean multiple) throws IOException {
            this.delegate.basicAck(deliveryTag, multiple);
        }
    
        public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {
            this.delegate.basicNack(deliveryTag, multiple, requeue);
        }
    
        public void basicReject(long deliveryTag, boolean requeue) throws IOException {
            this.delegate.basicReject(deliveryTag, requeue);
        }
        
        //.......省略
        
        }
    

    4.1、三个方法区别

    • basicAck 同意签收 支持批量,设置入参mutiple为true
    • basicReject 拒绝签收,不支持批量,支持是否重新入队,设置入参requeue为true
    • basicNack 拒绝签收,支持批量,支持是否重新入队,设置入参requeue为true
  • 相关阅读:
    golang学习 ---并发获取多个URL
    MySQL的my.cnf文件(解决5.7.18下没有my-default.cnf)
    Python ElasticSearch API
    linux 输出重定向 何时会写文件
    Linux top命令的用法详细详解
    mysql 5.7.13 安装配置方法(linux)-后期部分运维
    linux下各目录的作用
    MySQL 获得当前日期时间 函数
    mysql导入大批量数据出现MySQL server has gone away的解决方法
    python之MySQL学习——防止SQL注入
  • 原文地址:https://www.cnblogs.com/geekdc/p/13604883.html
Copyright © 2011-2022 走看看