RabbitMQ学习总结
一、RabbitMQ介绍
MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message
Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开
发中应用非常广泛。RabbitMQ官方地址:http://www.rabbitmq.com/
开发中消息队列通常有如下应用场景:
1、任务异步处理。
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
2、应用程序解耦合
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
市场上还有哪些消息队列?
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis。
为什么使用RabbitMQ呢?
1、使得简单,功能强大。
2、基于AMQP协议。
3、社区活跃,文档完善。
4、高并发性能好,这主要得益于Erlang语言。
5、Spring Boot默认已集成RabbitMQ
二、使用MQ的好处
2.1实现异步处理
同步的通信:发出一个调用请求之后,在没有得到结果之前,就不返回。由调用者主动等待这个调用的结果。
异步通信:调用在发出之后,这个调用就直接返回了,所以没有返回结果。也就是说,当一个异步过程调用发出后,调用者不会马上得到结果。而是在调用发出后,
被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。
2.2实现解耦

// 伪代码 public void returnGoods(){
stockService.updateInventory ();
payService.refund(); noticeService.notice();

2.3实现流量削锋
三、 RabbitMQ 中的概念模型
MQ的本质:消息队列,又叫做消息中间件。是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
通过提供消息传递和消息队列模型, 可以在分布式环境下扩展进程的通信
MQ的特点:
1、 是一个独立运行的服务。生产者发送消息,消费者接收消费,需要先跟服务器建立连接。

消息模型:
所有 MQ 产品从模型抽象上来说都是一样的过程:消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,
最后将消息发送到监听的消费者。
RabbitMQ的基本概念
下图是RabbitMQ的基本结构:
组成部分说明如下:
- Broker :消息队列服务,此进程包括两个部分:Exchange和Queue。
- Exchange :消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。队列使用绑定键(Binding Key)跟交换机建立绑定关系。
- Queue :消息队列,存储消息的队列,消息到达队列并转发给指定的消费方,它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
- Producer :消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
- Consumer :消息消费者,即消费方客户端,接收MQ转发的消息。
- Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
- Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
- Connection:无论是生产者发送消息,还是消费者接收消息,都必须跟Broker之间建立一个连接,这个是TCP长连接
- Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
- Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
Virtual Host理解如下图:
相关名词:
包括:ConnectionFactory(连接管理器)、Channel(信道)、Exchange(交换器)、Queue(队列)、RoutingKey(路由键)、BindingKey(绑定键)。
ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用;
Channel(信道):消息推送使用的通道;
Exchange(交换器):用于接受、分配消息;
Queue(队列):用于存储生产者的消息;
RoutingKey(路由键):用于把生成者的数据分配到交换器上;
BindingKey(绑定键):用于把交换器的消息绑定到队列上;
看到上面的解释,最难理解的路由键和绑定键了,那么他们具体怎么发挥作用的,请看下图:
消息发布接收流程:
-----发送消息-----
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
----接收消息-----
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
四、 下载安装
RabbitMQ由Erlang语言开发,Erlang语言用于并发及分布式系统的开发,在电信领域应用广泛,OTP(Open
Telecom Platform)作为Erlang语言的一部分,包含了很多基于Erlang开发的中间件及工具库,安装RabbitMQ需
要安装Erlang/OTP,并保持版本匹配,如下图:
RabbitMQ的下载地址:http://www.rabbitmq.com/download.html
1)下载erlang
地址如下:
http://erlang.org/download/otp_win64_20.3.exe,以管理员方式运行此文件,安装。
erlang安装完成需要配置erlang环境变量: ERLANG_HOME=D:Program Fileserl9.3 在path中添
加%ERLANG_HOME%in;
2)安装RabbitMQhttps://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.3,以管理员方式运行此文件,安装
3)启动
- 安装成功后会自动创建RabbitMQ服务并且启动。
从开始菜单启动RabbitMQ,完成在开始菜单找到RabbitMQ的菜单:
RabbitMQ Service-install :安装服务
RabbitMQ Service-remove 删除服务
RabbitMQ Service-start 启动
RabbitMQ Service-stop 启动
2.如果没有开始菜单则进入安装目录下sbin目录手动启动:
1)安装并运行服务
rabbitmq-service.bat install 安装服务 rabbitmq-service.bat stop 停止服务 rabbitmq-service.bat start 启动服务
2)安装管理插件
安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ
管理员身份运行 rabbitmq-plugins.bat enable rabbitmq_management
启动成功 登录RabbitMQ
进入浏览器,输入:http://localhost:15672
初始账号和密码:guest/guest
3) 注意事项:
1、安装erlang和rabbitMQ以管理员身份运行。
2、当卸载重新安装时会出现RabbitMQ服务注册失败,此时需要进入注册表清理erlang
搜索RabbitMQ、ErlSrv,将对应的项全部删除。
五、java操作队列
1、消息队列RabbitMQ的五种形式队列
1).点对点(简单)的队列
2).工作(公平性)队列模式
3.发布订阅模式
4.路由模式Routing
5.通配符模式Topics
2、简单队列
1)功能:一个生产者P发送消息到队列Q,一个消费者C接收
P表示为生产者 、C表示为消费者 红色表示队列。
点对点模式分析:
Maven依赖:
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency> </dependencies>
封装Connection:
/** * 封装Connection */ public class MQConnectionUtils { public static Connection getConnection(){ //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务器地址 factory.setHost("localhost"); //设置端口号 factory.setPort(5672); //设置用户名 factory.setUsername("guest"); //设置密码 factory.setPassword("guest"); //设置vhost factory.setVirtualHost("/admin_yehui"); try { //创建连接 return factory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return null; } }
参数详解:


生产者:
public class Producer { private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { //得到连接 Connection connection = MQConnectionUtils.getConnection(); //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 Channel channel = connection.createChannel(); //声明队列 如果Rabbit中没有此队列将自动创建 /** * 参数1:队列的名称 * 参数2:是否持久化 * 参数3:是否独占此链接,是否排他性队列。排他性队列只能在声明它的 Connection中使用(可以在同一个 Connection 的不同的 channel 中使用),
连接断开时自动删除。
* 参数4:队列不在使用时是否自动删除 * 参数5:队列参数 * */ channel.queueDeclare(QUEUE_NAME, false,false, false, null); String msg = "test_yehui_rabbitmq"; /** * 发送消息 * 参数1: Exchange的名称,如果没有指定,则使用Default Exchange * 参数2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列 * 参数3:消息包含的属性 * 参数4:消息体 * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显 * 示绑定或解除绑定认的交换机,routingKey等于队列名称 */ channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); System.out.println("消息发送体:"+msg); channel.close(); connection.close(); } }
消费者:
public class Consumer01 { private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException { //得到连接 Connection connection = MQConnectionUtils.getConnection(); //创建一个通道 Channel channel = connection.createChannel(); //定义消费方法 DefaultConsumer consumer = new DefaultConsumer (channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException { //得到交换机 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); //消息id long deliveryTag = envelope.getDeliveryTag(); //消息内容 String message = new String(body, "utf-8"); System.out.println("消费者消费:"+message); } }; //监听队列 /** * 参数1:队列名称 * 参数2: 设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 * 为false则需要手动回复 * 参数3:消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(QUEUE_NAME,true,consumer); } }
3、消息队列RabbitMQ应答模式
为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。 如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会丢失任何消息了。 没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会重新投递。即使处理一条消息会花费很长的时间。 消息应答是默认打开的。
我们通过显示的设置autoAsk=true关闭这种机制。现即自动应答开,一旦我们完成任务,消费者会自动发送应答。通知RabbitMQ消息已被处理,可以从内存删除。
如果消费者因宕机或链接失败等原因没有发送ACK(不同于ActiveMQ,在RabbitMQ里,消息没有过期的概念),则RabbitMQ会将消息重新发送给其他监听在队列的下一个消费者。
4、工作队列
work queues与简单队列相比,多了一个消费端,两个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
P表示为生产者 、C表示为消费者 红色表示队列。
工作队列分析
均摊消费
发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收
到消息
测试:
1、使用简单队列,启动多个消费者。
2、生产者发送多个消息。
结果:
1、一条消息只会被一个消费者接收;
2、rabbit采用轮询的方式将消息是平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息。
RabbitMQ的公平转发
目前消息转发机制是平均分配,这样就会出现俩个消费者,奇数的任务很耗时,偶数的任何工作量很小,造成的原因就是近当消息到达队列进行转发消息。
并不在乎有多少任务消费者并未传递一个应答给RabbitMQ。仅仅盲目转发所有的奇数给一个消费者,偶数给另一个消费者。 为了解决这样的问题,我们可以使用basicQos方法,传递参数为prefetchCount= 1。这样告诉RabbitMQ不要在同一时间给一个消费者超过一条消息。 换句话说,只有在消费者空闲的时候会发送下一条信息。调度分发消息的方式,也就是告诉RabbitMQ每次只给消费者处理一条消息,
也就是等待消费者处理完毕并自己对刚刚处理的消息进行确认之后,才发送下一条消息,防止消费者太过于忙碌,也防止它太过去清闲。 通过 设置channel.basicQos(1);
生产者
public class Producer { private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { //得到连接 Connection connection = MQConnectionUtils.getConnection(); //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 Channel channel = connection.createChannel(); //声明队列 如果Rabbit中没有此队列将自动创建 /** * 参数1:队列的名称 * 参数2:是否持久化 * 参数3:是否独占此链接 * 参数4:队列不在使用时是否自动删除 * 参数5:队列参数 * */ channel.queueDeclare(QUEUE_NAME, false,false, false, null); /** * 发送消息 * 参数1: Exchange的名称,如果没有指定,则使用Default Exchange * 参数2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列 * 参数3:消息包含的属性 * 参数4:消息体 * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显 * 示绑定或解除绑定认的交换机,routingKey等于队列名称 */ channel.basicQos(1);// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 for(int i=0;i<10;i++){ String msg = "test_yehui_rabbitmq"+i; channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); } System.out.println("消息发送完毕"); channel.close(); connection.close(); } }
消费者1:
public class Consumer01 { private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException { //得到连接 Connection connection = MQConnectionUtils.getConnection(); //创建一个通道 Channel channel = connection.createChannel(); //声明队列 channel.basicQos(1);// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 //定义消费方法 DefaultConsumer consumer = new DefaultConsumer (channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException { //得到交换机 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); //消息id long deliveryTag = envelope.getDeliveryTag(); //消息内容 String message = new String(body, "utf-8"); System.out.println("消费者消费:"+message); try { //睡眠1s Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 手动回执消息 channel.basicAck(envelope.getDeliveryTag(), false); } } }; //监听队列 /** * 参数1:队列名称 * 参数2: 设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 * 为false则需要手动回复 * 参数3:消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(QUEUE_NAME,false,consumer); } }
消费者2
public class Consumer02 { //队列名称 private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException { //得到连接 Connection connection = MQConnectionUtils.getConnection(); //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 Channel channel = connection.createChannel(); //声明队列 如果Rabbit中没有此队列将自动创建 channel.basicQos(1);// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException { //得到交换机 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); //消息id long deliveryTag = envelope.getDeliveryTag(); //消息内容 String message = new String(body, "utf-8"); System.out.println("消费者消费:"+message); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 手动回执消息 channel.basicAck(envelope.getDeliveryTag(), false); } } }; //监听队列 /** * 参数1:队列名称 * 参数2: 设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 * 为false则需要手动回复 * 参数3:消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(QUEUE_NAME,false,consumer); } }
结果;
消费者1比消费者2消费得少
5、RabbitMQ交换机的作用
生产者发送消息不会向传统方式直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,队列在将消息以推送或者拉取方式给消费者进行消费,
这和我们之前学习Nginx有点类似。 交换机的作用根据具体的路由策略分发到不同的队列中,交换机有四种类型。
Direct exchange(直连交换机)是根据消息携带的路由键(routing key)将消息投递给对应队列的,队列与直连类型的交换机绑定,需指定一个精确的绑定键,

Fanout exchange(广播交换机)主题类型的交换机与队列绑定时,不需要指定绑定键。因此生产者发送消息到广播类型的交换机上,也不需要携带路由键。消息达到交换机时,所有与之绑定了的队列,都会收到相同的消息的副本。
Topic exchange(主题交换机)队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路由给一个或多个绑定队列,

p是生产者 X是交换机 C1 、C2 是消费者
6、发布/订阅模式Publish/Subscribe
基本概念:
这个可能是消息队列中最重要的队列了,其他的都是在它的基础上进行了扩展。
功能实现:一个生产者发送消息,多个消费者获取消息(同样的消息),包括一个生产者,一个交换机,多个队列,多个消费者。
思路解读(重点理解):
(1)一个生产者,多个消费者
(2)每一个消费者都有自己的一个队列
(3)生产者没有直接发消息到队列中,而是发送到交换机
(4)每个消费者的队列都绑定到交换机上
(5)消息通过交换机到达每个消费者的队列 该模式就是Fanout Exchange(广播交换机)将消息路由给绑定到它身上的所有队列 以用户发邮件案例讲解
注意:交换机没有存储消息功能,如果消息发送到没有绑定消费队列的交换机,消息则丢失。
工作原理图:
生产者:
public class ProducerFanout { //交换机 private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { //得到连接 Connection connection = MQConnectionUtils.getConnection(); //创建一个通道 Channel channel = connection.createChannel();
//声明交换机
/** *
参数1:交换机名称
* 参数2:交换机类型 */
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//发送消息 /** * 参数1:交换机名称 * 参数2:路由key * 参数3:消息属性参数 * 参数4:消息实体 */
channel.basicPublish(EXCHANGE_NAME,"",null,"fanout_exchange_msg".getBytes());
channel.close();
connection.close();
}
}
邮件消费者
public class ConsumerEmailFanout { //邮件队列 private static final String EMAIL_QUEUE = "email_queue"; //交换机 private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException { System.out.println("邮件消费者"); //得到连接 Connection connection = MQConnectionUtils.getConnection(); //创建一个通道 Channel channel = connection.createChannel();
//声明一个队列 channel.queueDeclare(EMAIL_QUEUE,false,false,false,null); //绑定交换机 // 4.消费者绑定交换机 参数1 队列 名称 参数2交换机名称 参数3 routingKey channel.queueBind(EMAIL_QUEUE,EXCHANGE_NAME,"");
DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag,
Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消费者获取生产者消息:" + msg); } }; //消费者监听队列消息 channel.basicConsume(EMAIL_QUEUE,true,consumer); } }
短信消费者
public class ConsumerSMSFanout { //短信队列 private static final String SMS_QUEUE = "sms_queue"; //交换机 private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException { System.out.println("短信消费者"); //得到连接 Connection connection = MQConnectionUtils.getConnection(); //创建一个通道 Channel channel = connection.createChannel(); //声明一个队列 channel.queueDeclare(SMS_QUEUE,false,false,false,null); //绑定交换机 // 4.消费者绑定交换机 参数1 队列 名称 参数2交换机名称 参数3 routingKey channel.queueBind(SMS_QUEUE,EXCHANGE_NAME,""); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消费者获取生产者消息:" + msg); } }; //消费者监听队列消息 channel.basicConsume(SMS_QUEUE,true,consumer); } }
7、路由模式RoutingKey
路由模式:
1、每个消费者监听自己的队列,并且设置routingkey。
2、生产者发送消息到交换机并指定一个路由key,消费者队列绑定到交换机时要制定路由key(key匹配就能接受消息,key不匹配就不能接受消息)
生产者:
/** * 生产者 */ public class DirctProduct { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //声明交换机 //3.绑定的交换机 参数1交互机名称 参数2 exchange类型 channel.exchangeDeclare("dirctExchange","direct"); //声明队列 channel.queueDeclare("emailDirectQueue",true,false,false,null); channel.queueDeclare("smsDirectQueue",true,false,false,null); //绑定交换机 //交换机和队列进行绑定 /** * 参数1:队列的名称 * 参数2:交换机的名称 * 参数3:路由key */ channel.queueBind("emailDirectQueue","dirctExchange","emailRoutKey"); channel.queueBind("smsDirectQueue","dirctExchange","smsRoutKey"); //发送消息 channel.basicPublish("dirctExchange","emailRoutKey",null,"Email邮件发送".getBytes()); channel.basicPublish("dirctExchange","smsRoutKey",null,"Sms发送发送".getBytes()); channel.close(); connection.close(); } }
邮件消费者
/** * 邮件消费者 */ public class EamilConsomer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException { String msg = new String(body); System.out.println("消息:" + msg); } }; channel.basicConsume("emailDirectQueue",true,consumer); } }
短信消费者
/** * 短信消费者 */ public class SmsConsumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException { String msg = new String(body); System.out.println("消息:" + msg); } }; channel.basicConsume("smsDirectQueue",true,consumer); } }
8、通配符模式Topics
路由模式:
1、每个消费者监听自己的队列,并且设置带统配符的routingkey。
2、生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配,由交换机根据routingkey来转发消息到指定的队列。
符号#:匹配一个或者多个词lazy.# 可以匹配lazy.irs或者lazy.irs.cor
符号*:只能匹配一个词lazy.* 可以匹配lazy.irs或者lazy.cor
生产者:
public class ProducerTopic { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare("topicExchange","topic"); //声明队列 channel.queueDeclare("emailQueueTopic",true,false,false,null); channel.queueDeclare("smsQueueTopic",true,false,false,null); //绑定 channel.queueBind("emailQueueTopic","topicExchange","email.*"); channel.queueBind("smsQueueTopic","topicExchange","sms.*"); //发送消息 channel.basicPublish("topicExchange","email.log",null,"email邮件".getBytes()); channel.basicPublish("topicExchange","sms.log",null,"sms邮件".getBytes()); channel.close(); connection.close(); } }
邮件消费者:
public class EmailTopic { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException { System.out.println("邮件队列:"+new String(body)); } }; channel.basicConsume("emailQueueTopic",true,consumer); } }
短信消费者:
public class SmsTopic { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException { System.out.println("短信队列:"+new String(body)); } }; channel.basicConsume("smsQueueTopic",true,consumer); } }
9、SpringBoot整合RabbitMQ
生产者:
maven依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-parent</artifactId> <version>2.0.6.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- fastjson 依赖 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.29</version> </dependency> <!-- 添加springboot对amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> </dependencies>
yml文件
spring: rabbitmq: ####连接地址 host: 127.0.0.1 ####端口号 port: 5672 ####账号 username: guest ####密码 password: guest ### 地址 virtual-host: /admin_yehui
定义RabbitConfig类,配置Exchange、Queue、及绑定交换机。
案例是用的是fanout交换机类型
@Configuration public class RabbitMQConfig { // 邮件队列 public static String FANOUT_EMAIL_QUEUE = "fanout_eamil_queue"; // 短信队列 public static String FANOUT_SMS_QUEUE = "fanout_sms_queue"; //交换机 public static String EXCHANGE_NAME = "fanoutExchange"; //定义邮件队列 @Bean("fanoutEamilQueue") public Queue fanoutEamilQueue(){ return new Queue(FANOUT_EMAIL_QUEUE); } //定义短信队列 @Bean("fanoutSmsQueue") public Queue fanoutSmsQueue(){ return new Queue(FANOUT_SMS_QUEUE); } //定义交换机 @Bean("fanoutExchange") public FanoutExchange fanoutExchange(){ return new FanoutExchange(EXCHANGE_NAME); } //将邮件队列绑定交换机 @Bean("bindingEmailExchange") public Binding bindingEmailExchange(@Qualifier("fanoutEamilQueue")Queue queue, @Qualifier("fanoutExchange")FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } //将邮件队列绑定交换机 @Bean("bindingSmsExchange") public Binding bindingSmsExchange(@Qualifier("fanoutSmsQueue")Queue queue, @Qualifier("fanoutExchange")FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } }
生产者投递消息
/** * 发送消息类 */ @Component public class FanoutProducer { @Autowired private RabbitTemplate rabbitTemplate; public void send(String msg){ rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EMAIL_QUEUE,msg); } }
控制层调用代码
@RestController public class RabbitController { @Autowired private FanoutProducer fanoutProducer; @RequestMapping("/index") public String index(){ fanoutProducer.send("邮件消息"); fanoutProducer.send("短信消息"); return "index"; } }
消费者
maven依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-parent</artifactId> <version>2.0.6.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- fastjson 依赖 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency> <!-- 添加springboot对amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> </dependencies>
application.yml文件
spring: rabbitmq: ####连接地址 host: 127.0.0.1 ####端口号 port: 5672 ####账号 username: guest ####密码 password: guest ### 地址 virtual-host: /admin_yehui server: port: 8081
监听消息
/** * 监听队列 */ @Component public class ReceiveHandler { /** * 邮箱 * @param msg */ @RabbitListener(queues={"fanout_eamil_queue"}) public void receive_email(String msg){ System.out.println("邮箱消费者获取生产者消息msg:" + msg); } /** * 短信 * @param msg */ @RabbitListener(queues={"fanout_sms_queue"}) public void receive_sms(String msg){ System.out.println("短信消费者获取生产者消息msg:" + msg); } }
10、消息确认机制
问题产生背景: 生产者发送消息出去之后,不知道到底有没有发送到RabbitMQ服务器, 默认是不知道的。而且有的时候我们在发送消息之后,
后面的逻辑出问题了,我们不想要发送之前的消息了,需要撤回该怎么做。
解决方案: 1.AMQP 事务机制
2.Confirm 模式
1.事务模式:
RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback()
txSelect用于将当前channel设置成transaction模式
txCommit用于提交事务
txRollback用于回滚事务
在事务模式里面,只有收到了服务端的 Commit-OK 的指令,才能提交成功。所以
可以解决生产者和服务端确认的问题。但是事务模式有一个缺点,它是阻塞的,一条消
息没有发送完毕,不能发送下一条消息,它会榨干 RabbitMQ 服务器的性能。所以不建
议大家在生产环境使用。
rabbitMq api实现
springboot 整合RabbitMq 实现
/** * 发送消息类 */ @Component public class FanoutProducer { @Autowired private RabbitTemplate rabbitTemplate; public String send(String msg){ for (int i = 0; i < 10; i++) { //在springboot设置事务 rabbitTemplate.setChannelTransacted(true); rabbitTemplate.convertAndSend("faoutEmailQueue1",i); } return "success"; } } @Component public class ReceiveHandler { /** * 邮箱 * @param */ @RabbitListener(queuesToDeclare = @Queue(value ="faoutEmailQueue1")) public void receive_email(String i) throws IOException { System.out.println(i); } }
使用抓包工具查看没有使用事务时:
使用抓包工具查看使用事务时:
2.生产者确认Confirm 模式
RabbitTemplate模板类
@Bean("rabbitTemplate") public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); //spring.rabbitmq.publisher-confirms=true 开启确认模式, // 可以在application.properties,也可以根据下面的代码配置 connectionFactory.setPublisherConfirms(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) {// 如果发送交换机成功,但是没有匹配路由到指定的队列, 这个时候ack返回是true(这是一个坑) System.out.println("生产者ACK成功:" + correlationData.getId()); } else { // 失败 启动重试 // 强制转换 CorrelationDataDo errorCorrelationData = (CorrelationDataDo) correlationData; System.out.println(errorCorrelationData);// 注意: 不能调用rabbitTemplate发送,会导致线程死锁 //rabbitTemplate.convertAndSend(); // 解决办法 errorCorrelationData放入缓存. 让定时任务轮询发送. Map errorMap = new HashMap(); errorMap.put("status", "-2");// ack失败 errorMap.put("errorMsg", cause); errorMap.put("errorCorrelationData", errorCorrelationData); redisTemplate.boundHashOps("orderMessageStatus").put(errorCorrelationData.getId(), errorMap); } } }); return rabbitTemplate; }
注意事项: 但是在主线程发送消息的过程中,rabbitMQ服务器关闭,这时候主程序和 ConfirmCallback 线程都
会等待Connection恢复,然后重新启动 rabbitmq ,当应用程序重新建立 connection 之后,两个线程都会死锁
扩展CorrelationDataDo类
@Data public class CorrelationDataDo extends CorrelationData { // 消息 private Object message; // 交换机 private String exchange; // 路由键 private String routingKey; // 重试次数 private int retryCount = 0; }
/** * 发送消息类 */ @Component public class FanoutProducer { @Autowired private RabbitTemplate rabbitTemplate; public String send(String msg){ for (int i = 0; i < 10; i++) { Map<String,Object> map = new HashMap<String,Object>(); map.put("id",i); String uuid = UUID.randomUUID().toString(); Message message = MessageBuilder.withBody((JSONObject.toJSONString(map)).getBytes())//设置发送消息 .setCorrelationId(uuid)//设置全局ID .setDeliveryMode(MessageDeliveryMode.PERSISTENT)//消息持久化 .setContentType("application/json")//设置格式application/json .build(); //CorrelationData 用于消息确认 CorrelationDataDo继承了CorrelationData用于扩展属性 CorrelationDataDo correlationData = new CorrelationDataDo(); correlationData.setId(uuid); correlationData.setMessage(message); correlationData.setExchange("faoutEmailQueue1"); rabbitTemplate.convertAndSend("faoutEmailQueue1",message,correlationData); } return "success"; } } /** * 接收消息 */ @Component public class ReceiveHandler { /** * 邮箱 * @param */ @RabbitListener(queuesToDeclare = @Queue(value ="faoutEmailQueue1")) public void receive_email(Message message) { // 创建json消息转换器 Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter(); //获取消息 Map<String,Object> map = (Map<String, Object>) jsonMessageConverter.fromMessage(message); System.out.println(map); } }
3.消费者消息确认
3.1消费json格式数据
// 创建json消息转换器 Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); // 获取消息 Map map = (Map) jackson2JsonMessageConverter.fromMessage(message); //1.获取消息 Long seckillId = ((Integer) map.get("seckillId")).longValue(); //2.扣减库存 int result = 0; result = seckillGoodsMapper.updateStockCountByPrimaryKey(seckillId); if (result > 0) { System.out.println("修改库存成功"); // 清空redis缓存数据 redisTemplate.boundHashOps("orderMessage").delete(message.getMessageProperties().getC orrelationId()); } else { System.out.println("修改库存失败,人工处理"); }
3.2消息确认方式
基于配置:
在消费者application.properties文件中设置消息确认方式
# 表示消息确认方式,none manual 手动ack 和auto 自动ack ;默认auto spring.rabbitmq.listener.simple.acknowledge-mode=auto
none: autoAck=true 自动ack,不管监听是否发生错误都返回ack
manual: 手动ack, 用户手动设置ack或者nack
auto: 根据监听容器反会ack或者nack,如果容器抛出异常则自动启动重试机制.
auto模式:
# 表示消息确认方式,none manual 手动ack 和auto 自动ack ;默认auto spring.rabbitmq.listener.simple.acknowledge-mode=auto
manual 模式
# 表示消息确认方式,none manual 手动ack 和auto 自动ack ;默认auto spring.rabbitmq.listener.simple.acknowledge-mode=manual
配置在config类里面
@Bean(name = "mqConsumerlistenerContainer") public SimpleRabbitListenerContainerFactory mqConsumerlistenerContainer(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); //签收模式 AcknowledgeMode.MANUAL手动 AcknowledgeMode.AUTO自动,AcknowledgeMode.NONE // none: autoAck=true 自动ack,不管监听是否发生错误都返回ack factory.setAcknowledgeMode(AcknowledgeMode.NONE); return factory; }
消费者代码
@Component public class SeckillOrder_Consumer { @Autowired TbSeckillGoodsMapper seckillGoodsMapper; @Autowired RedisTemplate redisTemplate; //containerFactory 这个就是不配置在配置文件里面,而是写在config配置类里面的引入的config类 @RabbitListener(queues = {"seckillOrder_queue"},containerFactory = "mqConsumerlistenerContainer") public void receive(Message message, Channel channel) throws IOException { // 创建json消息转换器 Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); Map map = (Map) jackson2JsonMessageConverter.fromMessage(message); // 解决幂等性问题 Object orderMessage = redisTemplate.boundHashOps("orderMessage").get(message.getMessageProperties().getCorrelationId()); if (orderMessage == null) { System.out.println("已经消费了, 不在重复消费"); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); return; } //1.获取消息 Long seckillId = ((Integer) map.get("seckillId")).longValue(); //2.扣减库存 int result = 0; try { result = seckillGoodsMapper.updateStockCountByPrimaryKey(seckillId); if (result > 0) { System.out.println("修改库存成功"); //清空redis缓存数据 redisTemplate.boundHashOps("orderMessage").delete(message.getMessageProperties().getCorrelationId()); // 手动确认ack channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } else { System.out.println("修改库存失败,人工处理"); // 将错误放入缓存 Map errorMap = new HashMap(); errorMap.put("status", "-4");//SQL执行异常 errorMap.put("errorMsg", "SQL执行的结果:" + result); redisTemplate.boundHashOps("orderMessageStatus").put(message.getMessageProperties().getCorrelationId(), errorMap); // 手动确认nack channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } catch (Exception e) { System.out.println("修改库存失败,人工处理"); // 手动确认nack 区分问题原因: 数据库网络连接(重回队列) 其他原因(不重回队列) // 将错误放入缓存 Map errorMap = new HashMap(); if (e instanceof MyBatisSystemException) { MyBatisSystemException myBatisSystemException = (MyBatisSystemException) e; // 获取根本原因 Throwable rootCause = myBatisSystemException.getRootCause(); if (rootCause instanceof ConnectException) { // 重试-重回队列 设置重试3次 errorMap = (Map) redisTemplate.boundHashOps("orderMessageStatus").get(message.getMessageProperties().getCorrelationId()); if (errorMap == null) { errorMap = new HashMap(); errorMap.put("retryCount", 0);//初始重试次数 } int retryCount = (int) errorMap.get("retryCount"); if (++retryCount <= 3) { // 重试 errorMap.put("status", "-5");//数据库网络原因 errorMap.put("errorMsg", e.getMessage()); errorMap.put("retryCount", retryCount); redisTemplate.boundHashOps("orderMessageStatus").put(message.getMessageProperties().getCorrelationId(), errorMap); // 手动确认nack channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); System.out.println("网络原因.开启重试:" + retryCount); } else { errorMap.put("status", "-5");//数据库网络原因 errorMap.put("errorMsg", e.getMessage()); errorMap.put("retryCount", retryCount); redisTemplate.boundHashOps("orderMessageStatus").put(message.getMessageProperties().getCorrelationId(), errorMap); // 手动确认nack channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); System.out.println("重试次数已用完. 不在重试..."); } } else { errorMap.put("status", "-6");//其他原因 errorMap.put("errorMsg", e.getMessage()); redisTemplate.boundHashOps("orderMessageStatus").put(message.getMessageProperties().getCorrelationId(), errorMap); // 手动确认nack channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); System.out.println("其他原因,不需要重回队列"); } } else { errorMap.put("status", "-6");//其他原因 errorMap.put("errorMsg", e.getMessage()); redisTemplate.boundHashOps("orderMessageStatus").put(message.getMessageProperties().getCorrelationId(), errorMap); // 手动确认nack channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); System.out.println("其他原因,不需要重回队列"); } } } }
config类相关的解释请参考:https://www.jianshu.com/p/090ed51006d5,https://blog.csdn.net/qq_42606051/article/details/82869148
11、限流
基于配置文件
#限流2条 spring.rabbitmq.listener.simple.prefetch=2
/** * 发送消息类 */ @Component public class FanoutProducer { @Autowired private RabbitTemplate rabbitTemplate; public String send(String msg){ for (int i = 0; i < 10; i++) { Map<String,Object> map = new HashMap<String,Object>(); map.put("id",i); String uuid = UUID.randomUUID().toString(); Message message = MessageBuilder.withBody((JSONObject.toJSONString(map)).getBytes())//设置发送消息 .setCorrelationId(uuid)//设置全局ID .setDeliveryMode(MessageDeliveryMode.PERSISTENT)//消息持久化 .setContentType("application/json")//设置格式application/json .build(); //CorrelationData 用于消息确认 CorrelationDataDo继承了CorrelationData用于扩展属性 CorrelationDataDo correlationData = new CorrelationDataDo(); correlationData.setId(uuid); correlationData.setMessage(message); correlationData.setExchange("faoutEmailQueue1"); rabbitTemplate.convertAndSend("faoutEmailQueue1",message,correlationData); } return "success"; } } /** * 接收消息 */ @Component public class ReceiveHandler { /** * 邮箱 * @param */ @RabbitListener(queuesToDeclare = @Queue(value ="faoutEmailQueue1")) public void receive_email(Message message) { // 创建json消息转换器 Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter(); //获取消息 Map<String,Object> map = (Map<String, Object>) jsonMessageConverter.fromMessage(message); System.out.println(map); } }
在消费者端debug运行结果:
运行生产者可以看到Ready=3 Unacked=2, Total=5, Total代表队列中的消息总条数,Ready代表消费者还可以读到的条数,Unacked:代表还有多少条没有被应答
基于config配置类
@Bean(name = "mqConsumerlistenerContainer") public SimpleRabbitListenerContainerFactory mqConsumerlistenerContainer(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); //限流配置 factory.setPrefetchCount(2); return factory; }
消费者端代码
/** * 接收消息 */ @Component public class ReceiveHandler { /** * 邮箱 * @param */ @RabbitListener(queuesToDeclare = @Queue(value ="faoutEmailQueue1"), containerFactory = "mqConsumerlistenerContainer") public void receive_email(Message message) { // 创建json消息转换器 Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter(); //获取消息 Map<String,Object> map = (Map<String, Object>) jsonMessageConverter.fromMessage(message); System.out.println(map); } }
参考博客代码:https://blog.csdn.net/linsongbin1/article/details/100658415,https://blog.csdn.net/vbirdbest/article/details/78699913
11、RabbitMQ消息重试机制
消费者在消费消息的时候,如果消费者业务逻辑出现程序异常,这时候应该如何处理?
答案:使用消息重试机制。(演示重试机制)
如何合适选择重试机制:
情况1: 消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试? (需要重试机制)
情况2: 消费者获取到消息后,抛出数据转换异常,是否需要重试?(不需要重试机制)需要发布进行解决。
如何实现重试机制 总结:
对于情况2,如果消费者代码抛出异常是需要发布新版本才能解决的问题,那么不需要重试,重试也无济于事。应该采用日志记录+定时任务job健康检查+人工进行补偿
重试机制案例:
生产者代码就按照上面的案例就可以了,
消费者:
yml文件
spring: rabbitmq: ####连接地址 host: 127.0.0.1 ####端口号 port: 5672 ####账号 username: guest ####密码 password: guest ### 地址 virtual-host: /admin_yehui listener: simple: retry: ####开启消费者重试 enabled: true ####最大重试次数 max-attempts: 5 ####重试间隔次数 initial-interval: 3000 server: port: 8081
/** * 监听队列 */ @Component public class ReceiveHandler { /** * 邮箱 * @param msg */ //rabbitmq 默认情况下 如果消费者程序出现异常的情况下,会自动实现补偿机制
//重试机制都是间隔性的 // 补偿(重试机制) 队列服务器 发送补偿请求 // 如果消费端 程序业务逻辑出现异常消息会消费成功吗? 是不能消费者成功的 //@RabbitListener 底层 使用Aop进行拦截,如果程序没有抛出异常,自动提交事务 // 如果Aop使用异常通知拦截 获取异常信息的话,自动实现补偿机制 ,该消息会缓存到rabbitmq服务器端进行存放,一直重试到不抛异常为准。 // 修改重试机制策略 一般默认情况下 间隔5秒重试一次 @RabbitListener(queues={"fanout_eamil_queue"}) public void receive_email(String msg){ System.out.println("出现异常"); int i = 1/0; System.out.println("邮箱消费者获取生产者消息msg:" + msg); } /** * 短信 * @param msg */ @RabbitListener(queues={"fanout_sms_queue"}) public void receive_sms(String msg){ System.out.println("短信消费者获取生产者消息msg:" + msg); } }
调用第三方接口重试机制分析图:
重试机制调用第三方接口 @RabbitListener(queues = "fanout_email_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); System.out.println("邮件消费者获取生产者消息msg:" + msg + ",消息id:" + messageId); // 重试机制都是间隔性 JSONObject jsonObject = JSONObject.parseObject(msg); String email = jsonObject.getString("email"); String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email; System.out.println("邮件消费者开始调用第三方邮件服务器,emailUrl:" + emailUrl); JSONObject result = HttpClientUtils.httpGet(emailUrl); // 如果调用第三方邮件接口无法访问,如何实现自动重试. if (result == null) { throw new Exception("调用第三方邮件服务器接口失败!"); } System.out.println("邮件消费者结束调用第三方邮件服务器成功,result:" + result + "程序执行结束"); // 手动ack Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 手动签收 channel.basicAck(deliveryTag, false); } // 默认是自动应答模式 }
12、消费者如果保证消息幂等性,不被重复消费
产生原因:网络延迟传输中,消费出现异常或者是消费延迟消费,会造成MQ进行重试补偿,
在重试过程中,可能会造成重复消费。 消费者如何保证消息幂等性,
不被重复消费 解决办法:
①使用全局MessageID判断消费方使用同一个,解决幂等性。
②或者使用业务逻辑保证唯一(比如订单号码)
基于全局消息id区分消息,解决幂等性(重复消费)
生产者:
@RequestMapping("/send") public String send(){ String msg = "my_fanout_msg:" + System.currentTimeMillis(); //设置全局ID Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON) .setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build(); System.out.println(msg + ":" + msg); fanoutProducer.send(message); return null; }
消费者
/** * 邮箱 使用全局ID * @param msg */ @RabbitListener(queues={"fanout_eamil_queue"}) public void receive_email(Message message){ System.out.println("出现异常"); String messageId = message.getMessageProperties().getMessageId(); int i = 1/0; System.out.println("邮箱消费者获取生产者消息msg:" + messageId); }
yml文件
spring: rabbitmq: ####连接地址 host: 127.0.0.1 ####端口号 port: 5672 ####账号 username: guest ####密码 password: guest ### 地址 virtual-host: /admin_yehui listener: simple: retry: ####开启消费者重试 enabled: true ####最大重试次数 max-attempts: 5 ####重试间隔次数 initial-interval: 3000 server: port: 8081
启动测试,重试的时候没有发生变化
13、SpringBoot整合RabbitMQ签收模式
//邮件队列 @Component public class FanoutEamilConsumer { @RabbitListener(queues = "fanout_email_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { System.out .println(Thread.currentThread().getName() + ",邮件消费者获取生产者消息msg:" + new String(message.getBody(), "UTF-8") + ",messageId:" + message.getMessageProperties().getMessageId()); // 手动ack Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 手动签收 channel.basicAck(deliveryTag, false); } }
开启手动应答
pring: rabbitmq: ####连接地址 host: 127.0.0.1 ####端口号 port: 5672 ####账号 username: guest ####密码 password: guest ### 地址 virtual-host: /admin_host listener: simple: retry: ####开启消费者异常重试 enabled: true ####最大重试次数 max-attempts: 5 ####重试间隔次数 initial-interval: 2000 ####开启手动ack acknowledge-mode: manual
14属性设置
1.TTL(Time To Live)
1.1消息的过期时间
/** * 设置过期时间 * @return */ @Bean("ttlQueue") public Queue ttlQueue(){ Map<String,Object> map = new HashMap<>(); map.put("x-message-ttl", 11000);//队列中的消息未被消费11s后过期 return new Queue("ttlQueue",true,false,false,map); }
2)设置单条消息的过期时间
在发送消息的时候指定消息属性。
MessageProperties messageProperties = new MessageProperties(); messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); messageProperties.setExpiration("4000"); // 消息的过期属性,单位ms messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); Message message = new Message("这条消息4秒后过期".getBytes(), messageProperties); rabbitTemplate.send(RabbitMQConfig.FANOUT_EMAIL_QUEUE,message); // 随队列的过期属性过期,单位ms rabbitTemplate.convertSendAndReceive(RabbitMQConfig.FANOUT_EMAIL_QUEUE, "消息发送");
2.死信队列
2.1概述
死信队列 听上去像 消息“死”了 其实也有点这个意思,死信队列 是 当消息在一个队列 因为下列原因:
消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false消息超期 (rabbitmq Time-To-Live -> messageProperties.setExpiration())
队列超载变成了 “死信” 后 被重新投递(publish)到另一个Exchange 该Exchange 就是DLX 然后该Exchange 根据绑定规则 转发到对应的 队列上
监听该队列 就可以重新消费,说白了就是 没有被消费的消息 换个地方重新被消费
生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 --> DLX交换机 -->队列 --> 消费者

2.2.应用场景
在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上
定义业务(普通)队列的时候指定参数
x-dead-letter-exchange: 用来设置死信后发送的交换机
x-dead-letter-routing-key:用来设置死信的routingKey
死信队列配置
生产者
@Component public class FanoutConfig { /** * 定义死信队列相关信息 */ public final static String deadQueueName = "dead_queue"; public final static String deadRoutingKey = "dead_routing_key"; public final static String deadExchangeName = "dead_exchange"; /** * 死信队列 交换机标识符 */ public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange"; /** * 死信队列交换机绑定键标识符 */ public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key"; // 邮件队列 private String FANOUT_EMAIL_QUEUE = "fanout_email_queue"; // 短信队列 private String FANOUT_SMS_QUEUE = "fanout_sms_queue"; // fanout 交换机 private String EXCHANGE_NAME = "fanoutExchange"; // 1.定义邮件队列 @Bean public Queue fanOutEamilQueue() { // 将普通队列绑定到死信队列交换机上 Map<String, Object> args = new HashMap<>(2); args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName); args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey); Queue queue = new Queue(FANOUT_EMAIL_QUEUE, true, false, false, args); return queue; } // 2.定义短信队列 @Bean public Queue fanOutSmsQueue() { return new Queue(FANOUT_SMS_QUEUE); } // 2.定义交换机 @Bean FanoutExchange fanoutExchange() { return new FanoutExchange(EXCHANGE_NAME); } // 3.队列与交换机绑定邮件队列 @Bean Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange); } // 4.队列与交换机绑定短信队列 @Bean Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange); } /** * 配置死信队列 * * @return */ @Bean public Queue deadQueue() { Queue queue = new Queue(deadQueueName, true); return queue; } @Bean public DirectExchange deadExchange() { return new DirectExchange(deadExchangeName); } @Bean public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) { return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey); } }
@RabbitListener(queues = "fanout_email_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); System.out.println("邮件消费者获取生产者消息msg:" + msg + ",消息id:" + messageId); JSONObject jsonObject = JSONObject.parseObject(msg); Integer timestamp = jsonObject.getInteger("timestamp"); try { int result = 1 / timestamp; System.out.println("result:" + result); // 通知mq服务器删除该消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); // // 丢弃该消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } @Component public class DeadConsumer { @RabbitListener(queues = "dead_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); System.out.println("死信邮件消费者获取生产者消息msg:" + msg + ",消息id:" + messageId); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
消息留转图
六、消息可靠性
在 RabbitMQ 里面提供了很多保证消息可靠投递的机制,这个也是 RabbitMQ 的一个特性。
在理解消息可靠性投递的时候,必须明确一个问题,因为效率与可靠性是无法兼得的,如果要保证每一个环节都成功,势必会对消息的收发效率造成影响。所以如果是一
6.1RabbitMq的工作模型
使用 RabbitMQ 收发消息的时候,有几个主要环节:
6.2消息发送到 RabbitMQ 服务器
Transaction(事务)模式

public class TransactionProducer { private final static String QUEUE_NAME = "ORIGIN_QUEUE"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUri(ResourceUtil.getKey("rabbitmq.uri")); // 建立连接 Connection conn = factory.newConnection(); // 创建消息通道 Channel channel = conn.createChannel(); String msg = "Hello world, Rabbit MQ"; // 声明队列(默认交换机AMQP default,Direct) // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments channel.queueDeclare(QUEUE_NAME, false, false, false, null); try { channel.txSelect(); // 发送消息 // String exchange, String routingKey, BasicProperties props, byte[] body channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes()); // int i =1/0; channel.txCommit(); System.out.println("消息发送成功"); } catch (Exception e) { channel.txRollback(); System.out.println("消息已经回滚"); } channel.close(); conn.close(); } }
rabbitTemplate.setChannelTransacted(true);
Confirm(确认)模式
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { System.out.println("发送消息失败:" + cause); throw new RuntimeException("发送异常:" + cause); } } });
6.3消息在队列中的存储
@Bean("durableQueue") public Queue queue(){ return new Queue("durableQueue",true,false,false,new HashMap<>()); }
2.持久化交换机
@Bean("fanoutDurableExchange") public FanoutExchange fanoutDurableExchange(){ // exchangeName, durable, exclusive, autoDelete, return new FanoutExchange("fanoutDurableExchange",true,false,new HashMap<>()); }
3.消息持久化
MessageProperties messageProperties = new MessageProperties(); //消息持久化 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); messageProperties.setExpiration("4000"); // 消息的过期属性,单位ms messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 随队列的过期属性过期,单位ms rabbitTemplate.convertAndSend("faoutEmailQueue","324243232",messageProperties);
6.4消息投递到消费者
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
public class SecondConsumer { @RabbitHandler public void process(String msgContent, Channel channel, Message message) throws IOException { System.out.println("Second Queue received msg : " + msgContent); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
如果消息无法处理或者消费失败,也有两种拒绝的方式,Basic.Reject()拒绝单条,Basic.Nack()批量拒绝。如果 requeue 参数设置为 true,可以把这条消息重新存入队列,
6.5消费者回调

6.6补偿机制
6.7消息幂等性
# 表示消息确认方式,none manual 手动ack 和auto 自动ack ;默认auto
spring.rabbitmq.listener.simple.acknowledge-mode=auto