这个类非常强大,我们可以对他做很多设置,对于消费者的配置项,这个类都可以满足
监听队列(多个队列)、自动启动、自动声明功能
可以设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等
可以设置消费者数量、最大最小数量、批量消费
设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数
设置消费者标签生成策略、是否独占模式、消费者属性等
设置具体的转换器、消息转换器等
很多基于RabbitMQ的自制定化后端管控台在进行动态配置的时候,也是根据这一特性去实现的。
注意:SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态
修改其消费者数量的大小、接收消息的模式等
SimpleMessageListenerContainer为什么可以进行动态感知设置变更?
package com.dwz.spring; import java.util.UUID; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.amqp.support.ConsumerTagStrategy; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import com.rabbitmq.client.Channel; @Configuration @ComponentScan("com.dwz.spring.*") public class RabbitMQConfig { @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("127.0.0.1:5672"); connectionFactory.setVirtualHost("/vhost_dwz"); connectionFactory.setUsername("root_dwz"); connectionFactory.setPassword("123456"); return connectionFactory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); System.err.println("RabbitAdmin启动了。。。"); //设置启动spring容器时自动加载这个类(这个参数现在默认已经是true,可以不用设置) rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; } /** * 针对消费者的配置 * 1.设置交换机的类型 * 2.将队列绑定到交换机 * FanoutExchange:将消息分发到所有绑定的队列,无routingkey的概念 * TopicExchange:多关键字匹配 * HeadersExchange:通过添加属性key-value匹配 * DirectExchange:按照routingkey分发到指定队列 */ @Bean public TopicExchange exchange001() { return new TopicExchange("topic001", true, false); } @Bean public Queue queue001() { return new Queue("queue001", true);//队列持久化 } @Bean public Binding binding001() { return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*"); } @Bean public TopicExchange exchange002() { return new TopicExchange("topic002", true, false); } @Bean public Queue queue002() { return new Queue("queue002", true);//队列持久化 } @Bean public Binding binding002() { return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*"); } @Bean public TopicExchange exchange003() { return new TopicExchange("topic003", true, false); } @Bean public Queue queue003() { return new Queue("queue003", true);//队列持久化 } @Bean public Binding binding003() { return BindingBuilder.bind(queue003()).to(exchange003()).with("mq.*"); } @Bean public Queue queue_image() { return new Queue("image_queue", true);//队列持久化 } @Bean public Queue queue_pdf() { return new Queue("pdf_queue", true);//队列持久化 } /* * 简单消息监听容器 */ @Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); //同时监听多个队列 container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf()); //设置当前的消费者数量 container.setConcurrentConsumers(1); container.setMaxConcurrentConsumers(5); //设置是否重回队列 container.setDefaultRequeueRejected(false); //设置自动签收 container.setAcknowledgeMode(AcknowledgeMode.AUTO); //设置监听外露 container.setExposeListenerChannel(true); //设置消费端标签策略 container.setConsumerTagStrategy(new ConsumerTagStrategy() { @Override public String createConsumerTag(String queue) { return queue + "_" + UUID.randomUUID().toString(); } }); //设置消息监听 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { String msg = new String(message.getBody(), "utf-8"); System.out.println("-----------消费者:" + msg); } }); return container; } }
自定义消费端标签策略效果图: