zoukankan      html  css  js  c++  java
  • springboot集成rabbitmq并手动注册容器实现单个queue的ack模式

    进入正题,本文会介绍两种实现rabbitmq的ack模式的方法,分别为:

    一、通过配置文件配置。

    二、通过手动注册 SimpleMessageListenerContainer容器实现。

    先介绍方法一:
    通过配置文件配置。
    此类实现起来较为方便,通过springboot的配置文件以及注解的形式即可完成。

    1.首先引入依赖:

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    2.编写配置文件

    # rabbitmq基本配置
    spring.rabbitmq.host=***
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=***
    spring.rabbitmq.password=***
    spring.rabbitmq.virtual-host=/
    
    # 开启发送确认
    spring.rabbitmq.publisher-confirms=true
    # 开启发送失败退回
    spring.rabbitmq.publisher-returns=true
    # 全局开启ACK
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    在配置文件中使用
    
    spring.rabbitmq.listener.simple.acknowledge-mode
    来配置ack模式,这个配置有三种配置方式,分别为NONE、MANUAL、AUTO。

    I:NONE:默认为NONE,也就是自动ack模式,在消费者接受到消息后无需手动ack,消费者会自动将消息ack掉。

    II:MANUAL:即为手动ack模式,消费者在接收到消息后需要手动ack消息,不然消息将一直处于uncheck状态,在应用下次启动的时候会再次对消息进行消费。使用该配置需要注意的是,配置开启后即项目全局开启手动ack模式,所有的消费者都需要在消费信息后手动ack消息,否则在重启应用的时候将会有大量的消息无法被消费掉而重复消费。

    III:AUTO:自动确认ack 如果此时消费者抛出异常,不同的异常会有不同的处理方式。

    3.编写MQConfig的代码,实现相应的queue和exchange的注册以及绑定。

    /**
    * ACK 测试
    */
    public static final String ACK_QUEUE_A = "ack.test.queue.A";
    public static final String ACK_QUEUE_B = "ack.test.queue.B";
    public static final String ACK_EXCHANGE = "ack.test.exchange";
    
    /**
    * ACK TEST
    */
    @Bean
    public Queue ackQueueA() {
    return new Queue(ACK_QUEUE_A);
    }
    
    @Bean
    public Queue ackQueueB() {
    return new Queue(ACK_QUEUE_B);
    }
    
    @Bean
    public FanoutExchange ackFanoutExchange() {
    return new FanoutExchange(ACK_EXCHANGE);
    }
    
    @Bean
    public Binding ackBindingA() {
    return BindingBuilder.bind(ackQueueA()).to(ackFanoutExchange());
    }
    
    @Bean
    public Binding ackBindingB() {
    return BindingBuilder.bind(ackQueueB()).to(ackFanoutExchange());
    }

    上方代码中做了三件事:

    I.注册了两个queue,分别为ackQueueA以及ackQueueB。

    II.注册了一个fanout类型的exchange。

    III.将两个queue和其绑定。

    4. 生产者代码编写

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    /**
    * @author hsw
    * @since 9:26 2019/4/2
    */
    @Slf4j
    @Service
    public class MQAckSender {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void ackMQSender(String msg) {
    log.info("send ack message :" + msg);
    // 生产者发送消息到exchange后没有绑定的queue时将消息退回
    rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
    log.info("ackMQSender 发送消息被退回" + exchange + routingKey);
    });
    // 生产者发送消息confirm检测
    this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (!ack) {
    log.info("ackMQSender 消息发送失败" + cause + correlationData.toString());
    } else {
    log.info("ackMQSender 消息发送成功 ");
    }
    });
    this.rabbitTemplate.convertAndSend(MQConfig.ACK_EXCHANGE, "", msg);
    }
    
    }

    这里使用了RabbitTemplate而没有使用AmqpTemplate,可以将RabbitTemplate看作一个实现了AmqpTemplate的工具类,其中定义了更多方法供开发者使用。

    在第一步的配置文件中定义了MANUAL的ack模式的同时,也配置了发送确认以及发送失败退回,所以在上述生产者代码中,分别配置了这两项。具体回调时间见注释。

    5.消费者代码编写

    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    
    import java.io.IOException;
    
    /**
    * @author hsw
    * @since 9:39 2019/4/2
    */
    @Slf4j
    @Service
    public class MQAckReceive {
    
    
    @RabbitListener(queues = MQConfig.ACK_QUEUE_A)
    public void process(String msg, Channel channel, Message message) throws IOException {
    log.info("ACK_QUEUE_A 收到 : " + msg);
    try {
    // 框架容器,是否开启手动ack按照框架配置
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    log.info("ACK_QUEUE_A 接受信息成功");
    } catch (Exception e) {
    e.printStackTrace();
    //丢弃这条消息
    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    log.info("ACK_QUEUE_A 接受信息异常");
    }
    
    }
    
    @RabbitListener(queues = MQConfig.ACK_QUEUE_B)
    public void process2(String msg, Channel channel, Message message) throws IOException {
    log.info("ACK_QUEUE_B 收到 : " + msg);
    try {
    //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 重启应用后还会在发
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    log.info("ACK_QUEUE_B 接受信息成功");
    } catch (Exception e) {
    e.printStackTrace();
    //丢弃这条消息
    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    log.info("ACK_QUEUE_B 接受信息异常");
    }
    
    }
    }

    上述代码定义了两个消费者,即为之前定义的ackQueueA以及ackQueueB的消费者。

    与默认ack模式的消费者不同的是,在消费者消费信息的时候,需要手动ack掉信息,即为上述代码中的:

    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    该方法有两个参数,分别为long类型和boolean类型:

    /**
    * Acknowledge one or several received
    * messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
    * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
    * containing the received message being acknowledged.
    * @see com.rabbitmq.client.AMQP.Basic.Ack
    * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
    * @param multiple true to acknowledge all messages up to and
    * including the supplied delivery tag; false to acknowledge just
    * the supplied delivery tag.
    * @throws java.io.IOException if an error is encountered
    */
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
    第一个deliveryTag参数为每条信息带有的tag值,第二个multiple参数为布尔类型,为true时会将小于等于此次tag的所有消息都确认掉,如果为false则只确认当前tag的信息,可根据实际情况进行选择。
    
    再看下另外两个拒绝消息的函数:
    
    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
    第一个方法 basicNack 有三个参数,分别为long类型、boolean类型和boolean类型:
    
    /**
    * Reject one or several received messages.
    *
    * Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
    * or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
    * @see com.rabbitmq.client.AMQP.Basic.Nack
    * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
    * @param multiple true to reject all messages up to and including
    * the supplied delivery tag; false to reject just the supplied
    * delivery tag.
    * @param requeue true if the rejected message(s) should be requeued rather
    * than discarded/dead-lettered
    * @throws java.io.IOException if an error is encountered
    */
    void basicNack(long deliveryTag, boolean multiple, boolean requeue)
    throws IOException;

    前两个参数和接受方法 basicAck 的参数相似,第一个deliveryTag参数为每条信息带有的tag值,第二个multiple参数为true时会将小于等于此次tag的所有消息都拒绝掉,如果为false则只拒绝当前tag的信息,可根据实际情况进行选择。

    第三个参数为requeue,为true的时候会将消息重新发送到当前队列。可根据具体业务需求中不同的异常捕捉实现不同的拒绝方式。

    第二个方法 basicReject 和 basicAck 方法类似,但是只能拒绝/重发当前tag的信息。

    6.项目测试

    @GetMapping("/ack")
    public void springAck() {
    try {
    mqAckSender.ackMQSender("this is a ack msg");
    } catch (Exception e) {
    e.printStackTrace();
    }
    }

    调用接口后返回:

    2019-04-03 10:18:07.018 INFO 7352 --- [nio-8081-exec-3] c.h.a.rabbitmq.amqp.MQAckSender : send ack message :this is a ack msg
    2019-04-03 10:18:07.028 INFO 7352 --- [cTaskExecutor-9] c.h.a.rabbitmq.amqp.MQAckReceive : ACK_QUEUE_B 收到 : this is a ack msg
    2019-04-03 10:18:07.028 INFO 7352 --- [cTaskExecutor-9] c.h.a.rabbitmq.amqp.MQAckReceive : ACK_QUEUE_B 接受信息成功
    2019-04-03 10:18:07.028 INFO 7352 --- [cTaskExecutor-1] c.h.a.rabbitmq.amqp.MQAckReceive : ACK_QUEUE_A 收到 : this is a ack msg
    2019-04-03 10:18:07.028 INFO 7352 --- [cTaskExecutor-1] c.h.a.rabbitmq.amqp.MQAckReceive : ACK_QUEUE_A 接受信息成功
    2019-04-03 10:18:07.035 INFO 7352 --- [2.20.4.100:5672] c.h.a.rabbitmq.amqp.MQAckSender : ackMQSender 消息发送成功 
    若在queueA消费者ack消息前打上断点,可在rabbitmq管理后台看到:

    第一种方式的手动ack模式开启成功!

    接下来介绍方法二:
    通过手动注册 SimpleMessageListenerContainer容器实现。
    方法一通过注解方式开启ack模式固然方便,但通过注解方式开启后,项目全局的ack模式都将被修改,那怎么样做到只修改单个消费者的ack模式呢?这里就需要手动注册相应容器来修改ack模式。话不多说,先上代码。

    MQConfig和MQSender端代码不变。

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(ACK_QUEUE_A);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
    log.info(ACK_QUEUE_A + "get msg:" +new String(message.getBody()));
    if(message.getMessageProperties().getHeaders().get("error") == null){
    // 消息手动ack
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    log.info("消息确认");
    }else {
    // 消息重新回到队列
    //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
    // 拒绝消息(删除)
    channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
    log.info("消息拒绝");
    }
    
    });
    return container;
    }

    与第一种方法的不同点:

    1、配置文件配置的ack模式不会影响。

    2、消费者需要配置在setMessageListener中。

    上述代码中,手动注册了一个SimpleMessageListenerContainer容器,并将对应的queueName、需要修改的ack模式以及消费者收到消息后的处理一并注入到spring中。

    由于是手动注册容器,不受到配置文件的影响,所以可以实现对单个queue的ack模式修改。

    需要注意的是,如果消费者依旧使用@RabbitListener注解进行消费信息,手动注册容器中修改的ack模式是无效的。

  • 相关阅读:
    对 【Sequence to Sequence Learning with Neural Networks】的理解
    对 【Bidirectional LSTM-CRF Models for Sequence Tagging】的理解
    对 【Evaluation methods for unsupervised word embeddings 】 的理解
    对【XGBoost】的理解
    可变卷积Deforable ConvNet 迁移训练自己的数据集 MXNet框架 GPU版
    Ubuntu16 编译源码安装MXNet 可变卷积Deformable-ConvNets GPU版
    深度学习-超参数调整总结
    迁移学习介绍
    对 【BERT- Pre-training of Deep Bidirectional Transformers for Language Understanding】 的理解
    【图文】pycharm 修改自动导入包快捷键
  • 原文地址:https://www.cnblogs.com/qingmuchuanqi48/p/13383103.html
Copyright © 2011-2022 走看看