1. JMS和AMQP
- JMS(Java Message Service):
- ActiveMQ是JMS实现;
- AMQP(Advanced Message Queuing Protocol)
- 兼容JMS
- RabbitMQ是AMQP的实现
2. RabbitMQ 简介
Message
:由消息头和消息体组成,消息体是不透明的,而消息头则由一系列的可选属性组成;Publisher
:一个向交换器发布消息的客户端应用程序;Exchange
:用来接收生产者发送的消息并将这些消息路由给服务器中的队列;- 有四种类型:direct(默认),fanout,topic和headers;
Queue
:用来保存消息直到发送给消费者,是消息的容器;Binding
:用于消息队列和交换器之间的关联;Connection
:网络连接,比如一个TCP连接;Channel
:多路复用连接中的一条独立的双向数据流通道;Consumer
:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序;Virtual Host
:虚拟主机,表示一批交换器,消息队列和相关对象;每个vhost本质上就是一个mini版的RabbitMQ服务器;Broker
:表示消息队列服务器实体;
3. RabbitMQ 整合(SpringBoot)
- 自动配置:
RabbitAutoConfiguration
- 自动配置了连接工厂
ConnectionFactory
; RabbitProperties
封装了RabbitMQ的配置;RabbitTemplate
:给RabbitMQ发送和接收消息;AmpqAdmin
:RabbitMQ系统管理功能组件;@EnableRabbit
:开启基于注解的RabbitMQ模式;@EnableRabbit
和@RabbitListener
用于监听消息队列的内容;
// application.properties 配置文件
spring.rabbitmq.host=localhost
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
// 测试类
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests{
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void contextLoads(){
// 点对点消息
// Message 需要自己构造一个,定义消息体内容和消息头
// rabbitTemplate.send(exchange, routeKey, message);
// rabbitTemplate.convertAndSend(exchange, routeKey, object)
// 只需要传入要发送的对象, 会自动序列化发送给rabbitmq, object 默认当成消息体
Map<String, Object> map = new HashMap<>();
map.put("msg","匆匆的我来了...");
map.put("data",Arrays.asList("777477",232,true));
rabbitTemplate.convertAndSend("exchange.direct", "atnoodles.news",map);
}
// 接收消息
@Test
public void receive(){
Object o = rabbitTemplate.receiveAndConvert("atnoodles.news");
System.out.println(o.getClass()); // Class java.util.HashMap
System.our.println(o);
}
}
// 如果需要将发送的数据自动转换为JSON,发送出去
// com.noodles.springboot.rabbitmq.config.MyAMQPConfig.java
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyAMQPConfig{
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
4. AmpqAdmin
- 创建和删除 Queue, Exchange, Binding
// 测试类
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests{
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void createExchange(){
// 创建 Exchange
amqpAdmin.declareExchange(new DirectExchange("amqpAdmin.exchange"));
System.out.println("创建完成...");
// 创建 Queue
amqpAdmin.declareQueue(new Queue("amqpAdmin.queue", true));
}
}
参考资料: