本章导航
- RabbitMQ整合Spring AMQP实战
- RabbitMQ整合Spring Boot实战
- RabbitMQ整合Spring Cloud实战
RabbitMQ整合Spring AMQP实战
- RabbitAdmin
- SpringAMQP声明
- SimpleMessageListenerContainer简单消息监听容器
- MessageListenerAdapter消息监听适配器
- MessageConverter 消息转换器,序列化和反序列化等操作
- 注意:autoStartup必须要设置为true,否则Spring容器不会加载RabbitAdmin类
- RabbitAdmin底层实现就是从Spring容器中获取Exchange、Binding、RoutingKey以及Queue的@Bean声明
- 然后使用RabbitTemplate的execute方法执行对应的声明、修改、删除等一系列RabbitMQ基础功能操作
- 例如:添加一个交换机、删除一个绑定、清空一个队列里的消息等等
MessageListenerAdapter
MessageListenerAdapter即消息监听适配器
- 通过MessageListenerAdapter的代码我们可以看出如下核心属性
- defaultListenerMethod默认监听方法名称:用于设置监听方法名称
- Delegate委托对象:实际真实的委托对象,用于处理消息
- queueOrTagToMethodName队列标识与方法名称组成的集合
- 可以一一进行队列与方法名称的匹配
- 队列和方法名称绑定,即指定队列里的消息会被绑定的方法所接收处理
代码实现
package com.cx.temp.common.rabbitmq.spring; import com.cx.temp.common.rabbitmq.spring.adapter.MessageDelegate; import com.cx.temp.common.rabbitmq.spring.convert.TextMessageConverter; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.amqp.support.ConsumerTagStrategy; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import java.util.UUID; /** * */ @Configuration @ComponentScan({"com.cx.temp.*"}) public class RabbitMQConfig { @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("127.0.0.1:5672"); connectionFactory.setUsername("root"); connectionFactory.setPassword("123456"); connectionFactory.setVirtualHost("/test001"); return connectionFactory; } public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } /** * 针对消费者配置 * 1.设置交换机类型 * 2.将队列绑定到交换机 * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 * HeadersExchange: 通过添加属性key-value匹配 * DirectExchange: 按照routingkey分发到指定队列 * TopicExchange: 多关键字匹配 * @return */ @Bean public TopicExchange exchange001(){ return new TopicExchange("topic001", true, false); } @Bean public Queue queue001() { return new Queue("queue001", true); //队列持久 } @Bean 
public Binding binding001(){ return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*"); } @Bean public TopicExchange exchange002(){ return new TopicExchange("topic002", true, false); } @Bean public Queue queue002() { return new Queue("queue002", true); //队列持久 } @Bean public Queue queue003() { return new Queue("queue003", true); //队列持久 } @Bean public Binding binding002(){ return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*"); } @Bean public Binding binding003(){ return BindingBuilder.bind(queue003()).to(exchange002()).with("spring.*"); } @Bean public Queue queue_image() { return new Queue("image_queue", true); //队列持久 } @Bean public Queue queue_pdf() { return new Queue("pdf_queue", true); //队列持久 } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; } @Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf()); container.setConcurrentConsumers(1); container.setMaxConcurrentConsumers(5); container.setDefaultRequeueRejected(false); //不进行重回队列 container.setAcknowledgeMode(AcknowledgeMode.AUTO); //自动签收机制 container.setConsumerTagStrategy(new ConsumerTagStrategy() { @Override public String createConsumerTag(String queue) { return queue + "_" + UUID.randomUUID().toString(); } }); // container.setMessageListener(new ChannelAwareMessageListener() { // @Override // public void onMessage(Message message, Channel channel) throws Exception { // String msg = new String(message.getBody()); // System.err.println("-----------消费者:" + msg); // } // }); //第一种 // MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); // container.setMessageListener(adapter); //第二种 // 
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); // adapter.setDefaultListenerMethod("consumeMessage"); //也可以通过这个方法设置默认查找适配器对象的方法,默认的方法名是handleMessage //第三种 MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); adapter.setMessageConverter(new TextMessageConverter()); container.setMessageListener(adapter); return container; } }
字节转换器
package com.cx.temp.common.rabbitmq.spring.convert; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.MessageConverter; public class TextMessageConverter implements MessageConverter { @Override public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException { return new Message(o.toString().getBytes(), messageProperties); } @Override public Object fromMessage(Message message) throws MessageConversionException { String contentType = message.getMessageProperties().getContentType(); if(null != contentType && contentType.contains("text")) { return new String(message.getBody()); } return message.getBody(); } }
自定义输出设置
package com.cx.temp.common.rabbitmq.spring.adapter; /** */ public class MessageDelegate { //方法名是固定的,通过查看MessageListenerAdapter的源码,里面有句【public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage";】 //表示他是通过反射查找handleMessage这个方法名进行处理的 //第一种 旧版的RabbitMQ默认是字节数据 // public void handleMessage(byte[] messageBody) { // System.out.println("默认方法,消息内容:" + new String(messageBody)); // } //第一种 现演示的版本默认走字符串 // public void handleMessage(String messageBody) { // System.out.println("默认方法,消息内容:" + messageBody); // } //第二种 public void consumeMessage(byte[] messageBody) { System.out.println("字节数组方法,消息内容:" + new String(messageBody)); } // // public void consumeMessage(String messageBody) { // System.out.println("字符串方法,消息内容:" + messageBody); // } // // public void method1(String messageBody) { // // } }
测试类
package com.cx.temp.rabbitmq; import com.cx.temp.admin.AdminApplication; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import java.util.HashMap; @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest(classes = AdminApplication.class) public class RabbitMQTest { @Autowired private RabbitAdmin rabbitAdmin; @Test public void testAdmin() throws Exception { //第一种声明与绑定方式 rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false)); rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false)); rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false)); rabbitAdmin.declareQueue(new Queue("test.direct.queue", false)); rabbitAdmin.declareQueue(new Queue("test.topic.queue", false)); rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false)); rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, "test.direct", "direct", new HashMap<>())); //第二种 支持链式声明与绑定 rabbitAdmin.declareBinding(BindingBuilder .bind(new Queue("test.topic.queue", false)) .to(new TopicExchange("test.topic", false, false)) .with("user.#")); rabbitAdmin.declareBinding(BindingBuilder .bind(new Queue("test.fanout.queue", false)) .to(new FanoutExchange("test.fanout", false, false))); //清空队列数据 rabbitAdmin.purgeQueue("test.topic.queue", false); } @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.getHeaders().put("desc", "信息描述.."); messageProperties.getHeaders().put("type", 
"自定义消息类型.."); Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties); rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() { //消息发送之后在对这个message进行设置 @Override public Message postProcessMessage(Message message) throws AmqpException { System.err.println("----------添加额外的设置-----------"); message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述"); message.getMessageProperties().getHeaders().put("attr", "额外新加的属性"); return message; } }); } @Test public void testSendMessage2() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("text/plain"); Message message = new Message("mq 消息1234".getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.abc",message); rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!"); rabbitTemplate.convertAndSend("topic002", "rabbit.amqp", "hello object message send!"); } @Test public void testSendMessage4Text() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("text/plain"); Message message = new Message("mq 消息1234".getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.abc",message); } }
执行测试类
testSendMessage4Text方法
- queueOrTagToMethodName队列标识与方法名称组成的集合
package com.cx.temp.common.rabbitmq.spring; import com.cx.temp.common.rabbitmq.spring.adapter.MessageDelegate; import com.cx.temp.common.rabbitmq.spring.convert.TextMessageConverter; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.amqp.support.ConsumerTagStrategy; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; import java.util.UUID; /** * */ @Configuration @ComponentScan({"com.cx.temp.*"}) public class RabbitMQConfig { @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("127.0.0.1:5672"); connectionFactory.setUsername("root"); connectionFactory.setPassword("123456"); connectionFactory.setVirtualHost("/test001"); return connectionFactory; } public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } /** * 针对消费者配置 * 1.设置交换机类型 * 2.将队列绑定到交换机 * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 * HeadersExchange: 通过添加属性key-value匹配 * DirectExchange: 按照routingkey分发到指定队列 * TopicExchange: 多关键字匹配 * @return */ @Bean public TopicExchange exchange001(){ return new TopicExchange("topic001", true, false); } @Bean public Queue queue001() { 
return new Queue("queue001", true); //队列持久 } @Bean public Binding binding001(){ return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*"); } @Bean public TopicExchange exchange002(){ return new TopicExchange("topic002", true, false); } @Bean public Queue queue002() { return new Queue("queue002", true); //队列持久 } @Bean public Queue queue003() { return new Queue("queue003", true); //队列持久 } @Bean public Binding binding002(){ return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*"); } @Bean public Binding binding003(){ return BindingBuilder.bind(queue003()).to(exchange001()).with("mq.*"); } @Bean public Queue queue_image() { return new Queue("image_queue", true); //队列持久 } @Bean public Queue queue_pdf() { return new Queue("pdf_queue", true); //队列持久 } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; } @Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf()); container.setConcurrentConsumers(1); container.setMaxConcurrentConsumers(5); container.setDefaultRequeueRejected(false); //不进行重回队列 container.setAcknowledgeMode(AcknowledgeMode.AUTO); //自动签收机制 container.setConsumerTagStrategy(new ConsumerTagStrategy() { @Override public String createConsumerTag(String queue) { return queue + "_" + UUID.randomUUID().toString(); } }); // container.setMessageListener(new ChannelAwareMessageListener() { // @Override // public void onMessage(Message message, Channel channel) throws Exception { // String msg = new String(message.getBody()); // System.err.println("-----------消费者:" + msg); // } // }); //第一种 // MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); // 
container.setMessageListener(adapter); //第二种 // MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); // adapter.setDefaultListenerMethod("consumeMessage"); //也可以通过这个方法设置默认查找适配器对象的方法,默认的方法名是handleMessage //第三种 /** 1.适配器方式,默认是有自己的方法名称的:handleMessage //可以自己指定一个方法的名字:consumeMessage //也可以添加一个转换器:从字节数组转换String MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); adapter.setMessageConverter(new TextMessageConverter()); container.setMessageListener(adapter); */ /** * 2.适配器方式:我们的队列名称和方法名称也可以也进行一一匹配 */ Map<String, String> queueOrTagToMethodName = new HashMap<>(); queueOrTagToMethodName.put("queue001", "method1"); queueOrTagToMethodName.put("queue002", "method2"); MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setQueueOrTagToMethodName(queueOrTagToMethodName); adapter.setMessageConverter(new TextMessageConverter()); container.setMessageListener(adapter); return container; } }
package com.cx.temp.common.rabbitmq.spring.convert; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.MessageConverter; public class TextMessageConverter implements MessageConverter { @Override public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException { return new Message(o.toString().getBytes(), messageProperties); } @Override public Object fromMessage(Message message) throws MessageConversionException { String contentType = message.getMessageProperties().getContentType(); if(null != contentType && contentType.contains("text")) { return new String(message.getBody()); } return message.getBody(); } }
package com.cx.temp.common.rabbitmq.spring.adapter; /** */ public class MessageDelegate { //方法名是固定的,通过查看MessageListenerAdapter的源码,里面有句【public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage";】 //表示他是通过反射查找handleMessage这个方法名进行处理的 //第一种 旧版的RabbitMQ默认是字节数据 // public void handleMessage(byte[] messageBody) { // System.out.println("默认方法,消息内容:" + new String(messageBody)); // } //第一种 现演示的版本默认走字符串 // public void handleMessage(String messageBody) { // System.out.println("默认方法,消息内容:" + messageBody); // } //第二种 旧版 // public void consumeMessage(byte[] messageBody) { // System.out.println("字节数组方法,消息内容:" + new String(messageBody)); // } //第二种 新版 public void consumeMessage(String messageBody) { System.out.println("字符串方法,消息内容:" + messageBody); } public void method1(String messageBody) { System.out.println("method1收到消息内容:" + messageBody); } public void method2(String messageBody) { System.out.println("method2收到消息内容:" + messageBody); } }
package com.cx.temp.rabbitmq; import com.cx.temp.admin.AdminApplication; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import java.util.HashMap; @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest(classes = AdminApplication.class) public class RabbitMQTest { @Autowired private RabbitAdmin rabbitAdmin; @Test public void testAdmin() throws Exception { //第一种声明与绑定方式 rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false)); rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false)); rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false)); rabbitAdmin.declareQueue(new Queue("test.direct.queue", false)); rabbitAdmin.declareQueue(new Queue("test.topic.queue", false)); rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false)); rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, "test.direct", "direct", new HashMap<>())); //第二种 支持链式声明与绑定 rabbitAdmin.declareBinding(BindingBuilder .bind(new Queue("test.topic.queue", false)) .to(new TopicExchange("test.topic", false, false)) .with("user.#")); rabbitAdmin.declareBinding(BindingBuilder .bind(new Queue("test.fanout.queue", false)) .to(new FanoutExchange("test.fanout", false, false))); //清空队列数据 rabbitAdmin.purgeQueue("test.topic.queue", false); } @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.getHeaders().put("desc", "信息描述.."); messageProperties.getHeaders().put("type", 
"自定义消息类型.."); Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties); rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() { //消息发送之后在对这个message进行设置 @Override public Message postProcessMessage(Message message) throws AmqpException { System.err.println("----------添加额外的设置-----------"); message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述"); message.getMessageProperties().getHeaders().put("attr", "额外新加的属性"); return message; } }); } @Test public void testSendMessage2() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("text/plain"); Message message = new Message("mq 消息1234".getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.abc",message); rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!"); rabbitTemplate.convertAndSend("topic002", "rabbit.amqp", "hello object message send!"); } @Test public void testSendMessage4Text() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("text/plain"); Message message = new Message("mq 消息1234".getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.abc",message); rabbitTemplate.send("topic002", "rabbit.abc",message); } }
启动Application后执行
testSendMessage4Text控制台输出