SpringBoot整合RabbitMQ
一、引入相关依赖
<dependencies>
<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
二、配置RabbitMQ
首先应当确保你已安装了RabbitMQ,如果你没有安装,请参考:Docker 安装 RabbitMq
查看RabbitMQ自动配置类RabbitAutoConfiguration:
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
其中@EnableConfigurationProperties(RabbitProperties.class)
是RabbitMQ的相关属性配置。
点进去RabbitProperties.class
:
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {
private static final int DEFAULT_PORT = 5672;
private static final int DEFAULT_PORT_SECURE = 5671;
/**
* RabbitMQ host. Ignored if an address is set.
*/
private String host = "localhost";
/**
* RabbitMQ port. Ignored if an address is set. Default to 5672, or 5671 if SSL is
* enabled.
*/
private Integer port;
/**
* Login user to authenticate to the broker.
*/
private String username = "guest";
/**
* Login to authenticate against the broker.
*/
private String password = "guest";
我们可以通过spring.rabbitmq
,在application.yml文件中配置相关的属性,比如host、port、username、password。
在application.yml配置RabbitMQ:
spring:
#rabbitmq的相关配置
rabbitmq:
host: 192.168.204.131
port: 5672
username: guest
password: guest
继续查看RabbitAutoConfiguration:
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean(RabbitOperations.class)
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate();
configurer.configure(template, connectionFactory);
return template;
}
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
@ConditionalOnMissingBean
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
发现其向容器中注入了两个组件:RabbitTemplate和AmqpAdmin,这两个组件有什么作用呢?
RabbitTemplate:可以发送消息、接收消息。
AmqpAdmin操作Exchange、Queue、Binding等,比如创建、删除、解绑。
1、测试RabbitTemplate
首先在容器中通过自动注入的方式获取RabbitTemplate,然后在测试类中测试:
@SpringBootTest
class SpringBoot02AmqpApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
}
(1)使用RabbitTemplate测试发送消息
- send(String exchange, String routingKey, Message message):需要自己定义一个Message,比较麻烦。
- convertAndSend(String exchange, String routingKey, Object object):只需要传入一个Object,自动序列化发送给rabbitmq,object默认被当成消息体。
//单播(点对点)发送。
@Test
public void testRabbitTemplate() {
HashMap<String, Object> map = new HashMap<>();
map.put("name", "zhangsan");
map.put("age", 22);
rabbitTemplate.convertAndSend("exchange.direct","aiguigu.news",map);
}
这种方式在接收端接收的数据是这样式的:
rO0ABXNyABFqYXZhLnV0aWwuSGFzaE1hcAUH2sHDFmDRAwACRgAKbG9hZEZhY3RvckkACXRocmVzaG9sZHhwP0AAAAAAAAx3CAAAABAAAAADdAAEbmFtZXQA CHpoYW5nc2FudAAEbGlzdHNyABpqYXZhLnV0aWwuQXJyYXlzJEFycmF5TGlzdNmkPL7NiAbSAgABWwABYXQAE1tMamF2YS9sYW5nL09iamVjdDt4cHVyABdb TGphdmEuaW8uU2VyaWFsaXphYmxlO67QCaxT1+1JAgAAeHAAAAADdAAEaGFoYXNyABFqYXZhLmxhbmcuSW50ZWdlchLioKT3gYc4AgABSQAFdmFsdWV4cgAQ amF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHAAAAKac3IAEWphdmEubGFuZy5Cb29sZWFuzSBygNWc+u4CAAFaAAV2YWx1ZXhwAXQAA2FnZXNxAH4ACwAA
ABZ4
这是由于默认使用的是jdk的序列化方式,那么如何将消息转化为json格式的数据发送出去?接下来自定义使用Jackson2JsonMessageConverter
的消息转化器。
自定义MessageConverter配置:
@Configuration
@EnableRabbit //开启基于注解的rabbitmq
public class MyAMQPConfig {
/**
* 设置自定义的 MessageConverter
* 使用Jackson2JsonMessageConverter消息转换器
* @return
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
然后再次测试,在接收端接收的数据如下:
{"name":"zhangsan","list":["haha",666,true],"age":22}
再发送个对象试试:
Book book = new Book("西游记", "吴承恩");
rabbitTemplate.convertAndSend("exchange.direct", "aiguigu.news", book);
使用自定义的消息转化器之后,接收端数据:
{"bookName":"西游记","author":"吴承恩"}
(2)使用RabbitTemplate测试接收消息
- receiveAndConvert(String queueName):接收队列名称为queueName的消息。
//接收数据
@Test
public void testReceive() {
Object o = rabbitTemplate.receiveAndConvert("aiguigu.news");
System.out.println(o.getClass());
System.out.println(o);
// 接收map
// class java.util.HashMap
// {name=zhangsan, list=[haha, 666, true], age=22}
// 接收book对象
// class com.example.bean.Book
// Book{bookName='西游记', author='吴承恩'}
}
2、测试AmqpAdmin
-
removeBinding(Binding binding):解除某个bingding
@Test public void testRemoveBinding() { //解除某个bingding amqpAdmin.removeBinding(new Binding("declaredQueue", Binding.DestinationType.QUEUE,"amqpAdmin_direct.exchange", "amqp.haha", null)); }
-
deleteExchange(String s):删除指定的exchange.
boolean deleteExchange = amqpAdmin.deleteExchange("amqpAdmin_direct.exchange"); System.out.println(deleteExchange); //true
-
deleteQueue(String s):删除指定Queue
boolean deleteQueue = amqpAdmin.deleteQueue("declaredQueue"); System.out.println("deleteQueue:"+deleteQueue); //true
-
getQueueInfo(String s),获取指定队列的信息。
@Test public void getQueueInformation() { QueueInformation queueInformation = amqpAdmin.getQueueInfo("declaredQueue"); int consumerCount = queueInformation.getConsumerCount(); int messageCount = queueInformation.getMessageCount(); String name = queueInformation.getName(); System.out.println("consumerCount:" + consumerCount); //0 System.out.println("messageCount:" + messageCount); //0 System.out.println("name:" + name); //declaredQueue }
-
declareExchange(Exchange exchange):声明一个exchange.
/** * 以declare开头的是创建组件。 * declareExchange(Exchange exchange):声明一个exchange * Exchange是一个接口,其实现类有: * 1.DirectExchange * 2.FanoutExchange * 3.TopicExchange * 4.HeadersExchange * 5.CustomExchange */ @Test public void testCreateExchange() { //创建一个Exchange amqpAdmin.declareExchange(new DirectExchange("amqpAdmin_direct.exchange")); System.out.println("创建完成!"); //创建一个queue String declaredQueue = amqpAdmin.declareQueue(new Queue("declaredQueue")); System.out.println("declaredQueue:" + declaredQueue); //declaredQueue //创建绑定规则 amqpAdmin.declareBinding(new Binding("declaredQueue", Binding.DestinationType.QUEUE, "amqpAdmin_direct.exchange", "amqp.haha", null)); }
三、监听消息队列中的内容
使用@EnableRabbit+@RabbitListener监听消息队列中的内容。
@EnableRabbit
:表示开启基于注解的rabbitmq。
@RabbitListener
:表示监听某个队列的内容。
@Service
public class BookServiceImpl implements BookService {
/**
* 注解:@RabbitListener(queues = "aiguigu.news"),表示监听aigui.news这个队列的内容。
*
* @param book
*/
@RabbitListener(queues = "aiguigu.news")
@Override
public void receive(Book book) {
System.out.println("收到aiguigu.news消息:" + book);
}
/**
* 接收消息的第二种方式:
*
* @param message
*/
@RabbitListener(queues = "aiguigu")
@Override
public void receive(Message message) {
//获取消息体
byte[] body = message.getBody();
System.out.println(body); //[B@fe4bdc2
//获得消息属性
MessageProperties properties = message.getMessageProperties();
System.out.println(properties);
/*
MessageProperties [headers={__TypeId__=com.example.bean.Book}, contentType=application/json,
contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0,
redelivered=false, receivedExchange=exchange.direct, receivedRoutingKey=aiguigu, deliveryTag=1,
consumerTag=amq.ctag-ynkD05MwnffSCo9h7W5DGA, consumerQueue=aiguigu]
*/
}
}
实现的效果,当给某个exchange发送消息的之后,exchange按照binding规则将消息分发给对应的队列,使用 @RabbitListener可以监听到这个队列的消息,就可以获取消息进行相应的操作。