RabbitMQ 默认采用轮询的方式分发消息,当一个消息需要有多个消费者都消费时,需要创建多个队列实现,示例如下:
@Component
public class SimpleConsume {
@RabbitListener(
bindings =
@QueueBinding(
value = @Queue(durable = "true"), // 不指定队列名称,系统会自动生成队列名
exchange = @Exchange(value = "simpleExchange",
type = "topic"),
key = "simple.*"
)
)
public void onMessage(Message message, Channel channel) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消费端Payload: " + message.getPayload());
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
}
@SpringBootTest
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendSimpleMessage() {
String exchange = "simpleExchange";
String routingKey = "simple.message";
for(int i=0;i<5;i++) {
rabbitTemplate.convertAndSend(exchange, routingKey, "hello simpleExchange ===="+ i);
}
}
}
启动两个端口不同的项目:执行 sendSimpleMessage 方法,查看控制台输出:
端口 8080 控制台:
--------------------------------------
消费端Payload: hello simpleExchange ====0
--------------------------------------
消费端Payload: hello simpleExchange ====1
--------------------------------------
消费端Payload: hello simpleExchange ====2
--------------------------------------
消费端Payload: hello simpleExchange ====3
--------------------------------------
消费端Payload: hello simpleExchange ====4
端口 8081 控制台:
--------------------------------------
消费端Payload: hello simpleExchange ====0
--------------------------------------
消费端Payload: hello simpleExchange ====1
--------------------------------------
消费端Payload: hello simpleExchange ====2
--------------------------------------
消费端Payload: hello simpleExchange ====3
--------------------------------------
消费端Payload: hello simpleExchange ====4
可以看到两个消费者都可以消费到所有消息了。