zoukankan      html  css  js  c++  java
  • RabbitMQ消息确认机制—消息发送确认和消息接收确认

    /**
    * RabbitMQ消息确认机制
    * 关于rabbit的生产和消费方的一些实用的操作;
    * producer的confirm和consumer的ack,这两者使用的模式都是用来保证数据完整性,防止数据丢失

    */

    /**
         * producer的confirm模式
         * 业务场景描述:
         * 促销系统在做活动前,需要给用户的手机发送一条活动内容短信希望用户来参加,
         * 因为用户量有点大,所以通过往短信mq中插入数据方式,让短信服务来消费mq发短信;
         * 此时插入mq消息的服务为了保证给所有用户发消息,并且要在短时间内插入完成(因此用到了异步插入方式(快速)),
         * 我们就需要知道每次插入mq是否成功,如果不成功那我们可以收集失败的信息后补发(因此confirm模式排上了用场);
         * 开启confirm模式后,返回send结果(成功或失败)
         */
        
        public RabbitTemplate getRabbitTemplate(RabbitTemplate.ConfirmCallback confirmCallback){
            CachingConnectionFactory connectionFactory = null;
            return getRabbitTemplate(connectionFactory, confirmCallback);
            
        }
        
        //producer生产 - confirm模式
        public RabbitTemplate getRabbitTemplate(CachingConnectionFactory connectionFactory,RabbitTemplate.ConfirmCallback confirmCallback){
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            //product开启confirm模式
            connectionFactory.setPublisherConfirms(true);
            //设置confirm回调处理
            template.setConfirmCallback(confirmCallback);
            return template;
            
        }
    /**
         * consumer的ack模式
         * 场景描述:短信服务去消费mq队列信息时,倘若服务调用的运营商发送短信接口异常了(短信运营商接口欠费),
         * 我们此时的短信是发送失败的,用户也收不到短信,但是在默认(默认开启ack)前提下mq消息已经被消费了rabbit中没有记录了(kafka例外);
         * 想要mq消息在业务逻辑异常时还存在,那么可以使用ack方式;
         * 业务无异常,发送ack标识,mq消息释放
         * 
         * 在springboot中可以使用基于amqp封装的工厂类关闭自动ack模式,改为手动ack方式;
         * 只有当业务代码流程走完后,最后通过代码设置ack标识,来通知rabbit消息可以丢弃了;
         * 如果设置了手动模式后,又没有提交ack标识,那么mq中的消息一直存在无法释放(每次consumer消费后,rabbit会把noack的消息重复放入队列中):
         */
    
        public SimpleRabbitListenerContainerFactory listenerContainerFactory(){
            ConnectionFactory connectionFactory = null;
            return listenerContainerFactory(connectionFactory);
        }
        
        public SimpleRabbitListenerContainerFactory listenerContainerFactory(ConnectionFactory connectionFactory){
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            //代码手动ack
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            //开启消费者数量
            factory.setConcurrentConsumers(2);
            //手动确认模式可以使用 prefetch,限制通道上未完成的(“正在进行中的”)发送的数量
            //每次接受数据量,默认250
            factory.setPrefetchCount(300);
            return factory;
        }
        
        /**
         * 消息确认–ACK
         * 通过连接工厂设置手动ack方式,然后获取mq消息后,走完正常业务逻辑,最后再手动通知ack释放消息,如下:
         */
        private void firstNodeListener(String msg,Channel channel,Message message){
            try {
                long deliveryTag = message.getMessageProperties().getDeliveryTag();
                System.out.println("firstNodeListener - 消费消息 [" + deliveryTag + "] - " + msg);
                //这里ack主要根据mq消息的唯一编号(deliverTag)来通知;
                //如果我们不设置ack确认,RabbitMQ会认为这个消息没有正常消费,会将此消息重新放入队列中
                //忘记通过basicAck返回确认信息,将导致消费者客户端退出或者关闭后,消息会被退回RabbitMQ服务器,这会使RabbitMQ服务器内存爆满,而且RabbitMQ也不会主动删除这些被退回的消息
                channel.basicAck(deliveryTag, true);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

     本文源自:https://www.cnblogs.com/wangrudong003/p/11436990.html

  • 相关阅读:
    Pycharm5注册方式
    五、监听共享目录文件
    三、python webservice
    二、Python安装扩展库
    一、Python安装下载
    test
    拖延
    要乐观对待生活
    乞讨者
    不要总是指责和埋怨
  • 原文地址:https://www.cnblogs.com/wueryuan/p/12299951.html
Copyright © 2011-2022 走看看