之前我们使用 RabbitMQ 原生的 API 方法来实现MQ的使用,Spring 也提供了 RabbitMQ 的集成,让我们更方便的使用MQ,让我们来学习下吧。
Spring AMQP 是基于 Spring 框架的 AMQP 消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的 POJO 的消息监听等,很大方便我们使用 RabbitMQ 程序的相关开发。
一、RabbitAdmin 管理组件
1.1 准备工作:
- 添加 Spring AMQP 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.1.RELEASE</version>
</dependency>
- 声明 Bean 对象
@Configuration
public class RabbitMQConfig {
/**
* 注入连接工厂对象
* @return
*/
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("111.231.83.100");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 必须显式设置为 True ,否则 Spring 容器不会加载
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
1.2 Exchange 操作
相关方法:
方法定义 | 作用 |
---|---|
void declareExchange(Exchange exchange) | 声明交换机 |
boolean deleteExchange(String exchange) | 删除交换机 |
添加交换机
@SpringBootTest
public class ExchangeAddTest {
@Autowired
private RabbitAdmin rabbitAdmin;
@Test
public void shouldAddExchangeSuccess() {
rabbitAdmin.declareExchange(new DirectExchange("admin.direct", true, false));
rabbitAdmin.declareExchange(new TopicExchange("admin.topic", false, true));
rabbitAdmin.declareExchange(new FanoutExchange("admin.fanout", false, false));
}
}
删除交换机
@SpringBootTest
public class ExchangeAddTest {
@Autowired
private RabbitAdmin rabbitAdmin;
@Test
public void shouldDeleteExchangeSuccess() {
boolean result = rabbitAdmin.deleteExchange("admin.direct");
Assert.assertTrue(result);
}
}
1.3 Queue 操作
方法定义 | 作用 |
---|---|
Queue declareQueue() | 声明默认队列 |
String declareQueue(Queue queue) | 申明给定的队列 |
boolean deleteQueue(String queueName) | 删除队列 |
void deleteQueue(String queueName, boolean unused, boolean empty) | 删除队列 |
void purgeQueue(String queueName, boolean noWait) | 清除队列信息,noWait = true 时异步执行 |
int purgeQueue(String queueName) | 清除队列信息 |
Properties getQueueProperties(String queueName) | 获取指定队列的属性 |
声明队列
@SpringBootTest
public class QueueTest {
@Autowired
private RabbitAdmin rabbitAdmin;
@Test
public void shouldAddQueueSuccess() {
// 创建默认队列
Queue defaultQueue = rabbitAdmin.declareQueue();
Assert.assertNotNull(defaultQueue);
Assert.assertEquals(false,defaultQueue.isDurable());
// 创建指定名称和是否持久化属性的队列
String queueName = rabbitAdmin.declareQueue(new Queue("orderQueue",true));
Assert.assertNotNull(queueName);
Assert.assertEquals("orderQueue",queueName);
}
}
注: 默认的队列因为设置 exclusive = true ,导致在其连接断开的时候自动删除,所以图中看不到。
删除队列
@SpringBootTest
public class QueueTest {
@Autowired
private RabbitAdmin rabbitAdmin;
@Test
public void shouldDeleteQueueSuccess() {
boolean result = rabbitAdmin.deleteQueue("orderQueue");
Assert.assertTrue(result);
}
}
1.4 Binding 绑定
方法定义 | 作用 |
---|---|
void declareBinding(Binding binding) | 声明队列与交换机的绑定 |
void removeBinding(Binding binding) | 删除队列与交换机的绑定 |
声明队列与交换机的绑定
@SpringBootTest
public class BindingTest {
@Autowired
private RabbitAdmin rabbitAdmin;
@Test
public void shouldBindingSuccess() {
// 交换机名称
String exchange = "admin.topic";
// 队列名称
String queueName = "orderQueue";
// 1.创建绑定关系对象
Binding binding =
BindingBuilder
// 创建队列
.bind(new Queue(queueName, true))
// 创建交换机
.to(new TopicExchange(exchange, true, false))
// 指定路由 Key
.with("order#");
// 2.进行绑定
rabbitAdmin.declareBinding(binding);
}
}
删除队列与交换机的绑定
@SpringBootTest
public class BindingTest {
@Autowired
private RabbitAdmin rabbitAdmin;
@Test
public void shouldUnBindingSuccess() {
// 交换机名称
String exchange = "admin.topic";
// 队列名称
String queueName = "orderQueue";
Binding binding =
new Binding(queueName, Binding.DestinationType.QUEUE, exchange, "order#", null);
rabbitAdmin.removeBinding(binding);
}
}
1.5 bean 注入
除了上面的通过代码显式申明交换机、队列、路由 之外,还可以通过 Bena 注入的形式申明。
@Configuration
public class RabbitMQConfig {
/**
* 注入连接工厂对象
*
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("111.231.83.100");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 必须显式设置为 True ,否则 Spring 容器不会加载
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
@Bean
public TopicExchange beanExchange() {
return new TopicExchange("beanExchange", true, false);
}
@Bean
public Queue beanQueue() {
return new Queue("beanQueue", true);
}
@Bean
public Binding beanBinding(TopicExchange beanExchange, Queue beanQueue) {
return BindingBuilder
// 创建队列
.bind(beanQueue)
// 创建交换机
.to(beanExchange)
// 指定路由 Key
.with("bean#");
}
}
@SpringBootTest
public class BeanInjectionBindingTest {
@Autowired
private RabbitAdmin rabbitAdmin;
@Autowired
private Binding beanBinding;
@Test
public void shouldBindingSuccess() {
rabbitAdmin.declareBinding(beanBinding);
}
}
二、RabbitTemplate 模板组件
如果你看过 RabbitAdmin 的源码,可以看到里面使用到了一个叫做 RabbitTemplate 的对象,它就是 Spring 提供的消息模板,封装了 RabbitMQ 核心 API 的一系列方法,而 RabbitAdmin 是在它之上的另一层封装。
2.1 常用方法
方法定义 | 作用 |
---|---|
void send(Message message) | 发送消息 |
void convertAndSend(Object object) | 将 Java 对象包装成 Message 对象并发送 ,Java 对象需要实现 Serializable 序列化接口 |
Message receive(String queueName) | 接收消息 |
receiveAndConvert(String queueName) | 接收消息并将 Message 转换成 Java 对 |
2.2 发送消息
@SpringBootTest
public class MessageTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void shouldSendMessageSuccess(){
// 创建消息属性对象
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("desc", "信息描述..");
messageProperties.getHeaders().put("type", "自定义消息类型..");
// 创建消息对象
Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
// 发送消息
rabbitTemplate.send("beanExchange","bean#",message);
// 发送消息时额外增加属性
Message newMessage = new Message("newMessage".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("beanExchange", "bean#", newMessage, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
return message;
}
});
}
}
2.3 手动接收消息
@SpringBootTest
public class MessageTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void shouldConsumeMessageSuccess() {
Message msg = rabbitTemplate.receive("beanQueue", 2000l);
System.out.println("消息内容:" + new String(msg.getBody()));
final Map<String, Object> headers = msg.getMessageProperties().getHeaders();
System.out.println("=======消息头属性=======");
for (String key : headers.keySet()) {
System.out.println("key =" + key + " ; value =" + headers.get(key));
}
}
}
执行方法,观察控制台输出:
消息内容:Hello RabbitMQ
=======消息头属性=======
key =type ; value =自定义消息类型..
key =desc ; value =信息描述..
再次执行方法,观察控制台输出:
消息内容:newMessage
=======消息头属性=======
key =type ; value =自定义消息类型..
key =attr ; value =额外新加的属性
key =desc ; value =额外修改的信息描述
2.4 消息监听容器
在实际项目中我们不可能采用手动接收消息的形式来消费消息,这个时候 Spring 就为我们提供了一个消息监听容器 SimpleMessageListenerContainer。
它的功能如下:
* 监听多个队列
* 设置消费者消费数量
* 设置消息确认和自动确认模式
* 是否重回队列
* 异常捕获 handel 函数
* 设置消费者属性
* 设置具体的监听器和消息转换器
SimpleMessageListenerContainer 可以在运行过程中动态修改属性,如修改消费者消费数量大小、接收消息的模式等
@Configuration
public class RabbitMQConfig {
......
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory,Queue beanQueue) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
// 设置监听队列,可以有多个
container.setQueues(beanQueue);
// 设置并发消费者数量
container.setConcurrentConsumers(1);
// 设置最大并发消费者数量
container.setMaxConcurrentConsumers(5);
// 设置是否重回队列
container.setDefaultRequeueRejected(false);
// 设置签收模式,这里为了演示使用自动签收,实际项目中需要使用手动签收 AcknowledgeMode.MANUAL
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
// 设置消费端标签策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
// 设置消息监听
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.err.println("消费端监听:" + msg);
}
});
return container;
}
}
启动应用后,执行发送消息测试方法 shouldSendMessageSuccess ,观察控制台输出:
消费端监听:Hello RabbitMQ
证明消费端监听并消费成功。
2.5 消息监听适配器
除了直接使用 ChannelAwareMessageListener 实现消息事件监听外,还可以通过消息监听适配器(MessageListenerAdapter),通过反射将消息处理委托给目标监听器的处理方法,并进行灵活的消息类型转换。允许监听器方法对消息内容类型进行操作,完全独立于 Rabbit API。
实际上就是相当于自己实现 ChannelAwareMessageListener 功能。
- 新建 MessageDelegate
public class MessageDelegate {
public void consumeMessage(byte[] messageBody) {
System.err.println("默认方法, 消息内容:" + new String(messageBody));
}
}
- 替换原 ChannelAwareMessageListener 事件
// container.setMessageListener(new ChannelAwareMessageListener() {
// @Override
// public void onMessage(Message message, Channel channel) throws Exception {
// String msg = new String(message.getBody());
// System.err.println("消费端监听:" + msg);
// }
// });
// 设置消息监听
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
// adapter 默认执行方法是 handleMessage,这里我们设置自定义方法名
adapter.setDefaultListenerMethod("consumeMessage");
container.setMessageListener(adapter);
- 启动应用后,执行发送消息测试方法 shouldSendMessageSuccess ,观察控制台输出:
默认方法, 消息内容:Hello RabbitMQ
证明消费端监听并消费成功。
我们还可以将队列名和方法做绑定,实现转发功能:
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
// adapter 默认执行方法是 handleMessage,这里我们设置自定义方法名
// adapter.setDefaultListenerMethod("consumeMessage");
Map<String, String> queueOrTagToMethodName = new HashMap<>();
queueOrTagToMethodName.put(beanQueue.getName(), "consumeMessage");
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
container.setMessageListener(adapter);
2.6 消息转换器
我们现在发送和接受消息的类型都是二进制形式传输,我们可以通过 MessageConverter 进行转换。
- 新建 TextMessageConverter 类
public class TextMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
return new Message(object.toString().getBytes(), messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String contentType = message.getMessageProperties().getContentType();
if (null != contentType && contentType.contains("text")) {
return new String(message.getBody());
}
return message.getBody();
}
}
- adapter 设置转换类
adapter.setMessageConverter(new TextMessageConverter());
- MessageDelegate 新增 字符串参数的方法
public class MessageDelegate {
......
public void consumeMessage(String messageBody){
System.err.println("字符串类型, 消息内容:" + new String(messageBody));
}
}
- 新增发送文本消息测试方法
@Test
public void shouldSendTextMessageSuccess() {
// 创建消息属性对象
MessageProperties messageProperties = new MessageProperties();
// 通过设置属性,让消费端知道要将消息内容转换成文本类型
messageProperties.setContentType("text");
// 创建消息对象
Message message = new Message("字符串消息".getBytes(), messageProperties);
// 发送消息
rabbitTemplate.send("beanExchange", "bean#", message);
}
启动应用后,执行发送消息测试方法,观察控制台输出:
字符串类型, 消息内容:字符串消息