这两个机制都是发送端和mq服务器之间消息的确认,可以理解为生产端ack
1、confirm机制,消息的确认,是指生产者投递消息之后,如果Broker收到消息,则会给生产者一个应答,生产者能接收应答,用来确定这条消息是否正常的发送到Broker,这种机制是消息可靠性投递的核心保障。confirm机制是只保证消息到达exchange,并不保证消息可以路由到正确的queue。
2、return机制,用于处理一些不可路由的消息,在一些特殊的情况下,当前的exchange不存在或者指定的路由key路由不到,这时如果我们需要及时监听这种消息,就需要return机制。
原代码:https://www.cnblogs.com/gyjx2016/p/13622097.html
properties
server.port=8080
spring.rabbitmq.host=dev-mq.ttsingops.com
spring.rabbitmq.port=5672
spring.rabbitmq.username=cddayuwen
spring.rabbitmq.password=cddayuwen@123
spring.rabbitmq.virtual-host=/cd
#发布者确认
spring.rabbitmq.publisher-confirm-type=correlated
#发布者到达确认
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.listener.type=simple
#simple关闭自动ack,手动ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
### 开启重试机制
spring.rabbitmq.listener.simple.retry.enabled=true
#最大重试传递次数
spring.rabbitmq.listener.simple.retry.max-attempts=3
#第一次和第二次尝试传递消息的间隔时间 单位毫秒
spring.rabbitmq.listener.simple.retry.initial-interval=5000ms
#最大重试时间间隔,单位毫秒
spring.rabbitmq.listener.simple.retry.max-interval=300000ms
#应用前一次重试间隔的乘法器,multiplier默认为1
spring.rabbitmq.listener.simple.retry.multiplier=3
#以上配置的间隔0s 5s 15s 45s
#重试次数超过上面的设置之后是否丢弃(消费者listener抛出异常,是否重回队列,默认true:重回队列, false为不重回队列(结合死信交换机))
spring.rabbitmq.listener.simple.default-requeue-rejected=true
### 模板配置
##设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除
spring.rabbitmq.template.mandatory=true
稍作改动,新加confirm和return 确认代码
@Component @Slf4j public class RabbitMQConfirmAndReturn implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { /** * confirm机制只保证消息到达exchange,不保证消息可以路由到正确的queue,如果exchange错误,就会触发confirm机制 * * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { log.error("rabbitmq confirm fail,cause:{}", cause); } } /** * Return 消息机制用于处理一个不可路由的消息。在某些情况下,如果我们在发送消息的时候,当前的 exchange 不存在或者指定路由 key 路由不到,这个时候我们需要监听这种不可达的消息 * 就需要这种return机制 * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("mq消息不可达,message:{},replyCode:{},replyText:{},exchange:{},routing:{}", message.toString(), replyCode, replyText, exchange, routingKey); String messageId = message.getMessageProperties().getMessageId(); } }
/** * 设置返回回调和确认回调 * * @param connectionFactory * @return */ @Bean RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,RabbitMQConfirmAndReturn rabbitMQConfirmAndReturn) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); rabbitTemplate.setConfirmCallback(rabbitMQConfirmAndReturn); rabbitTemplate.setReturnCallback(rabbitMQConfirmAndReturn); //Mandatory为true时,消息通过交换器无法匹配到队列会返回给生产者,为false时匹配不到会直接被丢弃 rabbitTemplate.setMandatory(true); rabbitTemplate.setMessageConverter(jsonMessageConverter()); return rabbitTemplate; }
controller模拟这两个确认机制
参考文献:
https://docs.spring.io/spring-amqp/docs/2.1.17.RELEASE/reference/html/#template-confirms
https://docs.spring.io/spring-amqp/docs/2.1.17.RELEASE/reference/html/#cf-pub-conf-ret