关于rabbitmq的基本概念和相关的理论这里就不做过多介绍了,在之前的篇幅中有过相应的介绍,也可以查询一些资料详细了解一下rabbitmq的基础知识,下面要介绍的点主要包括两个方面,
1、rabbitmq的消息确认机制;
2、rabbitmq的延时队列,也称作为死信队列的一些研究心得分享
为什么会产生消息确认这个概念呢?其实rabbitmq的模式是我们熟悉的典型的观察者模式的具体实现,或者说是监听器模式可以,生产者往队列投递了一条消息,消费者从队列取出消息消费,这是很好理解的;
但是在rabbitmq中引入了exchange,即交换机这个概念,我们可以理解为一个消息的中转站或者是消息的分发集散中心,在这里,rabbitmq相比kafka或者activemq提供了更为高级的功能,就是支持消息的精确路由,消息的模糊匹配等功能,这样一来,对于整个消息从生产者到消费者最终消费到这条消息,中间的链路比起单纯的链路,生产者 —>队列—>消费者,中间多了一些环节,这也就造成了消息能否最终发送并被消费成功的不确定性,正是这种不确定性使得我们在使用的时候会关注消息到每一步的时候的状态,也就产生了消息的确认机制;
下面,我们先看一张关于rabbitmq从生产者发送消息到exchange然后到指定队列的整个流程示意图,如下所示,
通过这张示意图,相信大家可以大致了解了上述解释的意思所在,也可以看出来,消息需要确认的地方无非有3处,
1、消息是否能找到对应的exchange,即生产者的消息是否能够准确投递到指定的exchange中,如果找不到,则会被退回;
2、消息投递到exhange成功,但是没有找到合适的队列,即消息无法被路由到指定的queue中去,导致消息无法被投递和消费,也会被退回;
3、最后,消息被某个消费者消费,但是没有确认
退回这个词可以认为是程序中处理未被确认的消息的一致机制,或者说一种处理方式,在rabbitmq中可以是退回这条未被确认的消息,或者是丢弃掉可以根据业务场景具体使用;
既然我们清楚了消息需要确认的地方,下面我们通过代码来模拟一下这个场景,加深理解一下其内涵,
为演示和使用方便,这里我们使用springboot整合rabbitmq做项目演示,项目结构大家可以自己指定,网上关于springboot整合rabbitmq的demo也很多,这里我主要贴上关键代码,大家也可以参考我之前的关于springboot整合rabbitmq的案例,
首先贴上配置文件,配置文件里的参数都有注释,
server.port=8082
rabbitmq的相关配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/test
spring.rabbitmq.connection-timeout=2000ms
生产者确认消息 confirmListener
spring.rabbitmq.publisher-confirms=true
消息未被消费则原封不动返回,不被处理 returnListener 和 mandatory 配合使用
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-mandatory=true
定义消费者最多同时消费10个消息
spring.rabbitmq.listener.simple.prefetch=10
spring.rabbitmq.listener.simple.concurrency=1
spring.rabbitmq.listener.simple.max-concurrency=5
设置手动确认消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
支持消息重试
spring.rabbitmq.listener.simple.retry.enabled=true
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
1、从controller层开始,模拟发送一条对象消息,
/**
* 发送对象消息
* @return
*/
@GetMapping("/sendEmployeeMessage")
@ResponseBody
public String sendEmployeeMessage(){
Employee employee = new Employee();
employee.setAge(23);
employee.setEmpno("007");
employee.setName("jike");
messageProducer.sendMessage(employee);
return "success";
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
2、我们来看 上面的sendMessage 这个方法,在这个方法里面主要做了两件事,一个是发送对象消息,然后就是在发送过程中添加了消息确认的回调函数,要注意的是这里的回调函数目前跟消费者的通道是没有任何关系的,即消息最终能否成功发送到exchange上以及exchange能否将消息路由到指定的队列,
@Component
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
//消息确认机制,如果消息已经发出,但是rabbitmq并没有回应或者是拒绝接收消息了呢?就可以通过回调函数的方式将原因打印出来
RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
public void confirm(CorrelationData correlationData, boolean isack, String cause) {
System.out.println("本次消息的唯一标识是:" + correlationData);
System.out.println("是否存在消息拒绝接收?" + isack);
if(isack == false){
System.out.println("消息拒绝接收的原因是:" + cause);
}else{
System.out.println("消息发送成功");
}
}
};
//有关消息被退回来的具体描述消息
RabbitTemplate.ReturnCallback returnCallback = new ReturnCallback() {
@Override
public void returnedMessage(Message message,
int replyCode,
String desc,
String exchangeName,
String routeKey) {
System.out.println("err code :" + replyCode);
System.out.println("错误消息的描述 :" + desc);
System.out.println("错误的交换机是 :" + exchangeName);
System.out.println("错误的路右键是 :" + routeKey);
}
};
//发送对象消息时
/**
* CorrelationData 标识消息唯一性的主体对象,可以自己设定相关的参数,方便后续对某条消息做精确的定位
* confirmCallback 消息投递到rabbitmq是否成功的回调函数,如果不成功,我们可以在该回调函数中做相关的处理
* returnCallback 消息被退回的回调函数
* @param employee
*/
public void sendMessage(Employee employee){
CorrelationData cData = new CorrelationData(employee.getEmpno() + "-" + new Date().getTime());
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
rabbitTemplate.convertAndSend("springboot-exchange", "hr.employee",employee,cData);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
3、消费者一端代码,实际开发中,消费端的项目可能会在其他的工程中,这个并不会影响使用,
@Component
public class HandlerOrderMessage {
/**
* 单纯接收map的类型的消息
* @param message
*/
/*@RabbitListener(queues="java_queue")
@RabbitHandler
public void handleOrder(Map<String, Object> message){
System.out.println("收到了订单消息 :" + message.get("name"));
}*/
@RabbitListener(queues="java_queue")
@RabbitHandler
public void handleEmployeeMsg(@Payload Employee employee,Channel channel,
@Headers Map<String, Object> headers){
System.out.println("消费者开始接收员工消息 =================");
System.out.println(employee.getName());
Long tag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
try {
channel.basicAck(tag, false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
4、在我们启动项目之前,还有一个很重要的配置,想必大家也很快就想到了,就是config的配置文件,在整合rabbitmq中,我们需要提前准备一个bean的class,用于在项目启动时候初始化相关的队列以及设置队列和exchange进行绑定的相关代码,这里为了模拟出效果,我们先不提供这个配置类,而且在rabbitmq控制台也不提前创建队列看看会有什么效果呢?启动项目后,浏览器输入,
http://localhost:8082/map/sendEmployeeMessage,我们看一下控制台的输出,
可以肯定的是这条消息发送失败了,失败的原因是什么呢?我们在看看后面的日志,意思是在/test这个虚拟的virtualhost下面没有找到这个交换机
从这里我们可以印证示意图中所说的第一点,然后我们手动创建上这个exchange,但是并不做springboot-exchange和队列的绑定,然后再次访问,
http://localhost:8082/map/sendEmployeeMessage,看看控制台的答应结果,
这里消息走到了returnCallback 这个回调函数里面,意思就是消息被退回来了,按照上面的分析就是消息发送到了exchange,但是exchange没有找到合适的队列进行投递,因此被退回了,注意的是
"发送成功"是消息发送到exchange这个里面发送成功了,这个发送成功的回到函数是,confirmCallback,而是否能够发送到队列成功的回调函数是,returnCallback,注意区分开
基本上到这里,就把消息确认机制的基本原理讲完了,实际工作中,我们可以继续进行后续的处理,比如消息发送失败了该如何处理,如何第一时间反馈到开发人员进行问题的排查,都可以在回调函数里面做一些处理的,
接下来说说第二个问题,就是rabbitmq比较特殊的一种队列,叫做死信队列或者说延时队列,顾名思义,就是对于那些超时未消费的消息,或者是业务的需要处理一些需要延时消费的消息的一种特殊处理机制,
延时队列的使用场景:
1.订单业务:在电商中,用户下单后30分钟后未付款则取消订单。
2.短信通知:用户下单并付款后,1分钟后发短信给用户。
延时队列实现思路
特性一:Time To Live(TTL)
RabbitMQ可以针对Queue设置x-expires 或者 针对Message设置 x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)
RabbitMQ针对队列中的消息过期时间有两种方法可以设置。
A: 通过队列属性设置,队列中所有消息都有相同的过期时间。
B: 对消息进行单独设置,每条消息TTL可以不同。
如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead letter
特性二:Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。
x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送
队列出现dead letter的情况有:
消息或者队列的TTL过期
队列达到最大长度
消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false
死信队列 听上去像 消息“死”了 其实也有点这个意思,死信队列 是 当消息在一个队列 因为下列原因:
- 消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false
- 消息超期 (rabbitmq Time-To-Live -> messageProperties.setExpiration())
- 队列超载
变成了 “死信” 后 被重新投递(publish)到另一个Exchange 该Exchange 就是DLX 然后该Exchange 根据绑定规则 转发到对应的 队列上 监听该队列 就可以重新消费 说白了 就是 没有被消费的消息 换个地方重新被消费
下面我们模拟一个死信队列的应用场景 消息延时处理
1、死信队列的相关配置类,
package com.acong.rabbitconfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.rabbitmq.client.AMQP.Exchange;
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
/**
- 死信队列相关配置
- @author asus
*/
@Configuration
public class DeadQueueConfig {
/**
* 死信队列 交换机标识符
*/
private static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
/**
* 死信队列交换机绑定键标识符
*/
private static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
/**
* 死信队列跟交换机类型没有关系 不一定为directExchange 不影响该类型交换机的特性.
*/
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange() {
// return (DirectExchange)
// ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
return new DirectExchange("DL_EXCHANGE", true, false);
}
/**
* 声明一个死信队列. x-dead-letter-exchange 对应 死信交换机 x-dead-letter-routing-key 对应
* 死信队列
*/
@Bean("deadLetterQueue")
public Queue deadLetterQueue() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 声明 死信交换机
args.put(DEAD_LETTER_QUEUE_KEY, "DL_EXCHANGE");
// x-dead-letter-routing-key 声明 死信路由键
args.put(DEAD_LETTER_ROUTING_KEY, "KEY_R");
return QueueBuilder.durable("DL_QUEUE").withArguments(args).build();
}
/**
* 定义死信队列转发队列.
*
* @return the queue
*/
@Bean("redirectQueue")
public Queue redirectQueue() {
return QueueBuilder.durable("REDIRECT_QUEUE").build();
}
/**
* 死信路由通过 DL_KEY 绑定键绑定到死信队列上.
*
* @return the binding
*/
@Bean
public Binding deadLetterBinding() {
return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);
}
/**
* 死信路由通过 KEY_R 绑定键绑定到死信队列上.
*
* @return the binding
*/
@Bean
public Binding redirectBinding() {
return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "KEY_R", null);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
说明:
deadLetterExchange()声明了一个Direct 类型的Exchange (死信队列跟交换机没有关系)
deadLetterQueue() 声明了一个队列 这个队列 跟前面我们声明的队列不一样 注入了 Map<String,Object> 参数 下面的概念非常重要
x-dead-letter-exchange 来标识一个交换机 x-dead-letter-routing-key 来标识一个绑定键(RoutingKey) 这个绑定键 是分配给 标识的交换机的 如果没有特殊指定 声明队列的原routingkey , 如果有队列通过此绑定键 绑定到交换机 那么死信会被该交换机转发到 该队列上 通过监听 可对消息进行消费
可以打个比方 这个是为主力队员 设置了一个替补 如果主力 “死”了 他的活 替补接手 这样更好理解
deadLetterBinding() 对这个带参队列 进行了 和交换机的规则绑定 等下 消费者 先把消息通过交换机投递到该队列中去 然后制造条件发生“死信”
redirectBinding() 我们需要给标识的交换机 以及对其指定的routingkey 来绑定一个所谓的“替补”队列 用来监听
流程具体是 消息投递到 DL_QUEUE 10秒后消息过期 生成死信 然后转发到 REDIRECT_QUEUE 通过对其的监听 来消费消息
2、SendController 增加消费发送接口
@RequestMapping("/dead")
@Controller
public class SendController {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 测试死信队列. http://localhost:8082/dead/deadLetter?p=11234
*/
@RequestMapping("/deadLetter")
@ResponseBody
public ResponseEntity deadLetter(String p) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 声明消息处理器 这个对消息进行处理 可以设置一些参数 对消息进行一些定制化处理 我们这里 来设置消息的编码 以及消息的过期时间
// 因为在.net 以及其他版本过期时间不一致 这里的时间毫秒值 为字符串
MessagePostProcessor messagePostProcessor = message -> {
MessageProperties messageProperties = message.getMessageProperties();
// 设置编码
messageProperties.setContentEncoding("utf-8");
// 设置过期时间10*1000毫秒
messageProperties.setExpiration("10000");
return message;
};
// 向DL_QUEUE 发送消息 10*1000毫秒后过期 形成死信,具体的时间可以根据自己的业务指定
rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL_KEY", p, messagePostProcessor, correlationData);
return ResponseEntity.ok();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
3、监听死信的替补队列,REDIRECT_QUEUE ,即死信路由到的队列,
@Component
public class Consumer {
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
/**
* 监听替补队列 来验证死信.
*
* @param message the message
* @param channel the channel
* @throws IOException the io exception 这里异常需要处理
*/
@RabbitListener(queues = "REDIRECT_QUEUE")
@RabbitHandler
public void redirect(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
logger.info("dead message 10s 后 消费消息 :" + new String (message.getBody()));
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
下面启动项目,浏览器输入,
http://localhost:8082/dead/deadLetter?p=11234,
可以看到大概等了10秒,消费者收到了死信的消息,
这也验证了我们上面对于死信队列的解释说明,我这里使用的是DLX的方式实现的,大家也可以思考一下驶入使用TTL的方式实现,
基本上到这里,本篇的整合就结束了,希望对看到的小伙伴有所帮助,大家也可以在此基础上进行更加深入的研究和探讨,最后感谢观看!
附上源码下载地址:https://download.csdn.net/download/zhangcongyi420/11186779
</div>
<link href="https://csdnimg.cn/release/phoenix/mdeditor/markdown_views-e9f16cbbc2.css" rel="stylesheet">
</div>