知识储备:
关于消息队列的基本概念我已经在上一篇文章介绍过了(传送门),本篇文章主要讲述的是SpringBoot与RabbitMQ的整合以及简单的使用。
一.安装RabbitMQ
1.在linux上使用docker下载RabbitMQ
docker pull registry.docker-cn.com/library/rabbitmq:3-management
2.使用docker启动RabbitMQ
docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq d69a5113ceae
5672端口:客户端与MQ的通信端口
15672端口:管理界面访问web页面的端口
3.访问管理界面
浏览器访问:http://172.16.**.**:15672,默认的管理界面账号密码均为:guest
测试RabbitMQ
1). 登录RabbitMQ管理界面,创建交换器(Exchanges)
2). 创建Queues
3). 分别给交换器绑定queues
4).在direct交换器中给路由器发送消息
5). 队列中接收到的消息
二.环境搭建
1.引入spring-boot-starter-amqp
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.环境配置
#配置主机地址,默认localhost spring.rabbitmq.host=172.16.80.34 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #默认5672 spring.rabbitmq.port=5672 #默认/ spring.rabbitmq.virtual-host=/
三.RabbitMQ自动配置原理
1.RabbitAutoConfiguration
2.自动配置了连接工厂ConnectionFactory
3.RabbitProperties封装了RabbitMQ的配置
4.RabbitTemplate:给RabbitMQ发送和接收消息的模板
5.AmqpAdmin系统管理组件:创建交换器等
四.RabbitTemplate的简单使用
发送消息:
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void contextLoads() {
//Message需要自己构造一个,定义消息体内容和消息头
// rabbitTemplate.send(exchange,routingKey,message);
//object默认当成消息体,只需要传入要发送的对象,自动序列化给mq
Map<String,Object> map = new HashMap<>();
map.put("msg","第一次发送消息");
map.put("data",Arrays.asList("<","0.0",">"));
//对象被默认序列化以后发送出去
rabbitTemplate.convertAndSend("exchange.direct","wang.news",map); //使用点对点方式传播
}
此时查看RabbitMQ管理页面的wang.news队列,已经有消息插入进去了,由于RabbitMQ传递的是序列化的对象,所以接收到的值也是序列化过后的值。
接收消息:
@Test public void receive(){ Object receive = rabbitTemplate.receiveAndConvert("wang.news"); //接收消息。 System.out.println(receive.getClass()); System.out.println(receive); }
使用该方法获取到消息后队列里的消息就会自动清除。
由于序列化的对象保存起来很不直观,那么该如何解决这个问题呢?
由于RabbitTemplate默认采用的是JDK的MessageConvert,使用默认的JDK序列化规则,所以需要更改MessageConvert,更改为JSON的序列化规则
import org.springframework.amqp.support.converter.MessageConverter;//这里要导入amqp包下的MessageConverter
@Configuration
public class MyAMQPConfig {
@Bean
public MessageConverter messageConverter(){ //自动配置里,配置RabbitTemplate的时候会判断是否有自定义的MessageConvert,如果有则采用自定义的
return new Jackson2JsonMessageConverter();
}
}
上面演示的是点对点(direct)的交换器(Exchanges),那么广播模式(fanout)的交换器要如何使用的呢?
@Test public void sendMsgs(){ rabbitTemplate.convertAndSend("exchange.fanout","",new Book("java",2)); //广播模式只需要指定交换器的模式,自动会向该交换器绑定的所有队列发送消息。 }
发布/订阅(模糊匹配模式)也是一样的,只需要指定交换器,修改对应的routingKey就行了
/** * 发布/订阅(模糊匹配)方式 */ @Test public void topicSendMsgs(){ rabbitTemplate.convertAndSend("exchange.topic","*.news",new Book("python",3)); }
五.监听消息
上面简单演示了使用rabbitTemplate发送和接收消息,实际开发中需要一些监听场景。例如订单系统和库存系统的解耦中,两个系统之间都是通过消息队列来通信的,当某一个人下单之后,将订单信息存放在消息队列中,库存系统要实时的监听消息里面的内容一旦有新的订单进来,库存系统就需要有相关的操作。那么该如何实现监听呢,Spring为了简化开发,引入了一些注解来实现消息队列的监听。
1.在SpringBoot主启动类上加上注解@EnableRabbit,开启RabbitMQ的注解模式
@EnableRabbit//开启基于注解的RabbitMQ模式 @SpringBootApplication public class Springboot02AmqpApplication { public static void main(String[] args) { SpringApplication.run(Springboot02AmqpApplication.class, args); } }
2.使用@RabbitListener监听某个队列
@Service public class BookService { @RabbitListener(queues = {"wang.news"}) //监听队列wang.news,只要wang.news收到消息,立刻执行该方法,并清空队列 public void receive(Book book){ System.out.println("收到消息"+book); } @RabbitListener(queues = {"wang"}) public void receive02(Message message){ //org.springframework.amqp.core.Message; System.out.println("消息内容"+message.getBody()); System.out.println("消息头"+message.getMessageProperties()); } }
五.AMQPAdmin的使用
上面代码用到的交换器以及队列都是我们手动在RabbitMQ管理界面添加的,使用AMQPAdmin可以让我们用编码的方式创建这些组件。
1.创建交换器(Exchange)
@Test public void createExchange() { DirectExchange directExchange = new DirectExchange("amqpadmin.exchange"); amqpAdmin.declareExchange(directExchange); //创建一个DirectExchange System.out.println("创建完成"); }
2.创建队列(Queue)
@Test public void createQueue(){ amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true)); System.out.println("队列创建完成"); }
3.创建绑定规则(banding)
@Test public void createBanding(){ amqpAdmin.declareBinding(new Binding("amqpadmin.queue",Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqpadmin.queue",null)); System.out.println("绑定完成"); }