官网:https://www.rabbitmq.com/
RabbitMQ is the most widely deployed open source message broker.
RabbitMQ是最广泛部署开源的消息中间件。
Spring-Boot项目引入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml配置MQ信息:
#RabbitMQ
spring.rabbitmq:
host: localhost
port: 5672
#username: admin
#password: 123456
# 开启发送确认
publisher-confirms: true
# 开启发送失败退回
publisher-returns: true
# 开启ACK
listener.direct.acknowledge-mode: manual
listener.simple.acknowledge-mode: manual
配置类RabbitConfig声明队列queue1:
/**
* RabbitMQ配置类
*/
@Configuration
public class RabbitConfig {
public static final String QUEUE_FIRST = "queue1"; //队列名
/**
* 声明队列
* @return
*/
@Bean
public Queue queueFirst() {
return new Queue(QUEUE_FIRST);
}
}
Work Queues(工作模式)
生产者把消息直接发送到队列中,多个消费者绑定一个队列进行竞争消费。谁抢到谁执行.实用场景:秒杀业务 抢红包等
生产者发送消息:
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 测试发送消息到队列
*/
@Test
public void sendToQueue(){
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_FIRST, "你好,这是消息hello"+ i);
}
}
}
在另外一个项目里,2个消费者来接受消息:@RabbitListener注解,监听队列
@Component
public class Receiver {
/**
* 处理消息
* @param content
* @throws IOException
*/
@RabbitListener(queues = RabbitConfig.QUEUE_FIRST) //监听队列
public void processMessage1(String content, Channel channel, Message message) throws IOException {
try {
System.out.println("消费者1收到消息:" + content);
//手动确认消息,消费者发送一个消息应答rabbitMQ才会删除消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
//抛弃此条消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} catch (Exception e) {
e.printStackTrace();
//重新放入队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
@RabbitListener(queues = RabbitConfig.QUEUE_FIRST)
public void processMessage2(String content, Channel channel, Message message) throws IOException {
System.out.println("消费者2收到消息:" + content);
try {
//手动确认消息,消费者发送一个消息应答rabbitMQ才会删除消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
//抛弃此条消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} catch (Exception e) {
e.printStackTrace();
//重新放入队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
运行结果:多个消费者监听同一个队列,消息会均匀地发送给消费者
Publish/Subscribe(发布订阅模式)
生产者把消息发布到交换机,交换机将消息发给N个队列,消费者绑定响应队列取消息即可,此功能比较适合将某单一系统的简单业务数据消息广播给所有接口
应用场景:邮件群发,群聊天,广告
配置类RabbitConfig新增代码:声明队列queue2,fanout交换机,绑定交换机和队列:
public static final String QUEUE_SECOND = "queue2"; //队列名
@Bean
public Queue queueSecond() {
return new Queue(QUEUE_SECOND);
}
/**
* 声明fanout交换机
* @return
*/
@Bean
public Exchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
/**
* 队列绑定fanout交换机,不需要路由键(路由键会忽略)
* @param queueFirst
* @param fanoutExchange
* @return
*/
@Bean
Binding bindingExchangeMessage(Queue queueFirst, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueFirst).to(fanoutExchange);
}
@Bean
Binding bindingExchangeMessage1(Queue queueSecond, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueSecond).to(fanoutExchange);
}
生产者发送消息:
/**
* 测试发送消息到faout交换机
*/
@Test
public void sendFaout(){
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("fanoutExchange","", "发布订阅模式发消息:" + i);
}
}
把消费者2改为监听队列queue2,消费者1不用改,还是监听队列queue1
@RabbitListener(queues = RabbitConfig.QUEUE_SECOND)
public void processMessage2(String content, Channel channel, Message message) throws IOException {
System.out.println("消费者2收到消息:" + content);
try {
//手动确认消息,消费者发送一个消息应答rabbitMQ才会删除消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
//抛弃此条消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} catch (Exception e) {
e.printStackTrace();
//重新放入队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
运行结果:绑定此交换机的队列都收到了一样的消息
Routing(路由模式)
如果路由键完全匹配的话,消息才会被投放到相应的队列.amq.direct是rabbitMQ默认的持久化的交换机.
由于主题模式包含了路由模式,而且工作中基本用主题模式,就是交换机类型不一样,主题模式的路由规则更灵活。路由模式例子就不写了
Topics(主题模式)
模糊匹配,设置模糊的绑定方式,"*"操作符将"."视为分隔符,匹配单个单词;"#"操作符没有分块的概念,它将任意"."均视为关键字的匹配部分,能够匹配多个字符.
配置类RabbitConfig新增代码:声明topic交换机,绑定交换机和队列,队列queue1路由规则为topic.*,queue2路由规则为topic.#:
/**
* 声明topic交换机
* @return
*/
@Bean
public Exchange topicExchange() {
return new TopicExchange("topicExchange");
}
/**
* 队列绑定topic交换机
* @param queueFirst 队列Bean
* @param topicExchange 交换机Bean
* @return
*/
@Bean
Binding bindingExchangeMessage(Queue queueFirst, TopicExchange topicExchange) {
return BindingBuilder.bind(queueFirst).to(topicExchange).with("topic.*");
}
@Bean
Binding bindingExchangeMessage1(Queue queueSecond, TopicExchange topicExchange) {
return BindingBuilder.bind(queueSecond).to(topicExchange).with("topic.#");
}
和原来一样,消费者1监听队列queue1,消费者2监听queue2
发送消息:
/**
* 测试发送消息到topic交换机
*/
@Test
public void sendTopic(){
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("topicExchange","topic.msg.消息", "主题消息:你好" + i);
}
}
运行结果:发送消息的路由键为topuc.msg.消息,只能匹配上队列queue2,路由规则topic.#
发送消息的路由键改为:topic.msg
/**
* 测试发送消息到topic交换机
*/
@Test
public void sendTopic(){
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("topicExchange","topic.msg", "主题消息:你好" + i);
}
}
运行结果:发送消息的路由键为topic.msg,两个队列的路由都匹配上了。