**消费消息(队列-消费端)** 总结:当消费端发送basicAck给MQ时,说明消息消费成功! 当消费端发送basicNack给MQ时,说明消息消费失败,消息回退至MQ 注意:当消费端未给MQ发送basicAck或basicNack,消息则会一直在mq里面进行堆积。 除非与mq断开或者mq宕机才会自动将消息回退至MQ 消费端信息消费确认、消息预取机制: 前提: 开启手动确认 simpleRabbitListenerContainerFactory 通过@Bean方式手动注入 simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL); 如果成功,我们手动给MQ发送basicAck,表示发送成功! 如果失败,我们手动给MQ发送basicNack,表示发送失败! 设置预取数量 // 最大数量支持2500,建议默认取值500 simpleRabbitListenerContainerFactory.setPrefetchCount(500); 代码如下: /** * 消费端开启手动确认 * 无论是basicAck还是basicNack操作,都必须给mq一个通知。 * 不然消息一直处于未确认状态,就会堆积在mq中。除非与mq断开后,消息会自动回到队列 * 消息预取(能者多劳) * 默认队列会将其所有数据一次性分发到多个消费端,然后消费者再进行消费,这样会存在一个问题, * 有些性能好的消费端很快就消费完了,处于空闲状态,然而有些性能差的消费端还在不停的消费中,处于忙碌状态 * 导致消费资源的分配不合理,采用消息预取来改善 * * 指定消费端一次从队列中拉取数据的个数。而不是一次性分发到多个消费端 * 当消费者消费完了当前拉取的数据时,才会继续向队列中拉取新的数据进行消费 * 这样性能好的消费端就能更多的从队列拉取数据 * 这样性能差的消费端就能更少的从队列拉取数据 * */ public class RabbitmqConsumer { private int count = 0; /** * simpleRabbitListenerContainerFactory 通过@Bean方式手动注入 * 设置消费端手动确认 * 设置消费消息预取 * // 将消息消费确认由自动改为手动确认 * simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL); * // 设置消息预取数量 * simpleRabbitListenerContainerFactory.setPrefetchCount(1); * @param message * @param channel * @throws IOException */ @RabbitListener(queues = QUEUE_NAME_1, ackMode = "MANUAL", containerFactory = "simpleRabbitListenerContainerFactory") public void consumer1(Message message, Channel channel) throws IOException { count++; if (isOrder()) { System.out.println(new String(message.getBody(), "UTF-8")); System.out.println("消费者1 -- 消费成功"); // 传入这条消息的标识, 这个标识由rabbitmq来维护 我们只需要从message中拿出来就可以 // 第二个boolean参数指定是不是批量处理的, true: 批量处理,false: 非批量处理 // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); if (count == 491) { /** 如果前490条消费成功,第491消费失败时的情况: * 若设置了批量处理(true),则会将491条数据都回退到原队列或死信队列 * 若设置了单条处理(false),则只会将第491那条数据回退到原队列或死信队列 * 无论是哪种情况,回退的消息会等待消费者重新进行消费,这就可能会导致消息重复消费 * 可以分别设置true/false进行测试,然后在mq管理台查看队列消费状态 第三个参数: requeue为true时,消息回退到原队列 requeue为false时,消息丢弃或者回退到死信交换机(前提:当前队列配置了死信交换机) 配置代码如下: @Bean public Queue queue() { Map<String, Object> map = new HashMap<>(); // 设置消息的过期时间 单位毫秒 map.put("x-message-ttl", 10000); // 设置附带的死信交换机 map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME); map.put("x-dead-letter-routing-key", DEAD_LETTER_KEY); Queue queue = new Queue(QUEUE_NAME_1, true, false, false, map); return queue; } */ System.out.println("第491条出错了,进行回退"); channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true); } if (count % 500 == 0) { // 当消费数量达到预取数量时,进行批量确认 channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); } } else { //前两个参数 和上面的意义一样,最后一个参数 就是这条消息是返回到原队列 还是这条消息作废不退回到原队列了。 // true: 退回到原队列, false: 消息作废或者退回到死信队列 System.out.println("消费者1 -- 消费失败"); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }
依赖pom:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.0.4.RELEASE</version>
</dependency>