所谓的消费方消息确认就是签收模式ack,Rabbitmq默认开启的是自动签收模式,也就是消费者监听到消息到达,就会自动发送ack给队列,告诉队列这条消息可以删除了,这种自动签收的模式存在消息丢失的可能,出现异常的话这条消息就丢了,要保证消息不会丢失,还是建议开启手动签收的模式。
一、三种签收模式
public enum AcknowledgeMode {
//自动确认
NONE,
//手动确认
MANUAL,
//根据情况确认
AUTO;
private AcknowledgeMode() {
}
public boolean isTransactionAllowed() {
return this == AUTO || this == MANUAL;
}
public boolean isAutoAck() {
return this == NONE;
}
public boolean isManual() {
return this == MANUAL;
}
}
二、配置文件开启手动签收模式
spring:
rabbitmq:
host: 192.168.31.70
port: 5672
username: guest
password: guest
# 发送确认
publisher-confirms: true
# 路由失败回调
publisher-returns: true
template:
# 必须设置成true 消息路由失败通知监听者,false 将消息丢弃
mandatory: true
#消费端
listener:
simple:
# 每次从RabbitMQ获取的消息数量
prefetch: 1
default-requeue-rejected: false
# 每个队列启动的消费者数量
concurrency: 1
# 每个队列最大的消费者数量
max-concurrency: 1
# 签收模式为手动签收-那么需要在代码中手动ACK
acknowledge-mode: manual
三、消费方手动签收
@Component
@Slf4j
public class MessageHandler {
/**
* 邮件发送
* @param message
* @param channel
* @param headers
* @throws IOException
*/
@RabbitListener(queues ="demo.email")
@RabbitHandler
public void handleEmailMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {
try {
String msg=new String(message.getBody(), CharEncoding.UTF_8);
JSONObject jsonObject = JSON.parseObject(msg);
jsonObject.put("messageId",headers.get("spring_returned_message_correlation"));
log.info("---接受到消息---{}",jsonObject);
//主动异常
int m=1/0;
//手动签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
catch (Exception e) {
//异常,ture 重新入队,或者false,进入死信队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}
}
}
四、channel接口的实现类
里面有三个手动签收的方法
public class PublisherCallbackChannelImpl implements PublisherCallbackChannel, ConfirmListener, ReturnListener, ShutdownListener {
private static final MessagePropertiesConverter converter = new DefaultMessagePropertiesConverter();
private final Log logger;
private final Channel delegate;
private final ConcurrentMap<String, Listener> listeners;
private final Map<Listener, SortedMap<Long, PendingConfirm>> pendingConfirms;
private final Map<String, PendingConfirm> pendingReturns;
private final SortedMap<Long, Listener> listenerForSeq;
private final ExecutorService executor;
private volatile Consumer<Channel> afterAckCallback;
//......省略
public void basicAck(long deliveryTag, boolean multiple) throws IOException {
this.delegate.basicAck(deliveryTag, multiple);
}
public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {
this.delegate.basicNack(deliveryTag, multiple, requeue);
}
public void basicReject(long deliveryTag, boolean requeue) throws IOException {
this.delegate.basicReject(deliveryTag, requeue);
}
//.......省略
}
4.1、三个方法区别
- basicAck 同意签收 支持批量,设置入参mutiple为true
- basicReject 拒绝签收,不支持批量,支持是否重新入队,设置入参requeue为true
- basicNack 拒绝签收,支持批量,支持是否重新入队,设置入参requeue为true