- 应用场景:
- 代码示例:
- pom依赖和rabbitmq.properties与spring集成rabbitmq相同
- 在spring-rabbitmq-consumer.xml 需要做如下配置:(设置消费者监听器为手动确认模式,并且配置参数prefetch表示消费端每次从mq拉取多少条消息。直到手动确认消费完毕,才会继续拉取)
- 限流消费端-没有手动确认代码:
/** * Consumer 限流机制 * 1. 确保消息被确认,不确认是不会处理其他消息。 * 2. listener-container配置属性 * prefetch = 1,表示消费端每次从mq拉去一条消息来消费,直到确认消费完毕后,才会继续拉取下一条消息。 * 3. 手动自动都可以实现完全消费,但是只有消息确认消费完毕,才可以拉取下一条。 * 4. 但是在限流策略中,必须设置为手动确认。因为自动确认其实就跟平常的消费没有区别了。 */ public class QosListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { //1.获取消息 System.out.println(new String(message.getBody())); } }
- producer生产消息代码:
@RunWith(SpringRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml") public class ProducerTest { @Test public void testSend() { for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message confirm"); } } }
- 因为上面消费端没有手动确认,所以会出现 1条消息待确认,9条消息未消费的情况。
- 消费端限流--手动确认代码:
- 消费端限流小结:
- 在 rabbit:listener-container中配置 prefetch 属性设置消费端一次拉取多少条消息
- 消费端的确认模式一定为手动确认。acknowledge="manual",并且手动确认一定要调用channel.basicAck();方法。
- 实现接口一定是实现ChannelAwareMessageListener接口。