/**
* RabbitMQ消息确认机制
* 关于rabbit的生产和消费方的一些实用的操作;
* producer的confirm和consumer的ack,这两者使用的模式都是用来保证数据完整性,防止数据丢失
*/
/** * producer的confirm模式 * 业务场景描述: * 促销系统在做活动前,需要给用户的手机发送一条活动内容短信希望用户来参加, * 因为用户量有点大,所以通过往短信mq中插入数据方式,让短信服务来消费mq发短信; * 此时插入mq消息的服务为了保证给所有用户发消息,并且要在短时间内插入完成(因此用到了异步插入方式(快速)), * 我们就需要知道每次插入mq是否成功,如果不成功那我们可以收集失败的信息后补发(因此confirm模式排上了用场); * 开启confirm模式后,返回send结果(成功或失败) */ public RabbitTemplate getRabbitTemplate(RabbitTemplate.ConfirmCallback confirmCallback){ CachingConnectionFactory connectionFactory = null; return getRabbitTemplate(connectionFactory, confirmCallback); } //producer生产 - confirm模式 public RabbitTemplate getRabbitTemplate(CachingConnectionFactory connectionFactory,RabbitTemplate.ConfirmCallback confirmCallback){ RabbitTemplate template = new RabbitTemplate(connectionFactory); //product开启confirm模式 connectionFactory.setPublisherConfirms(true); //设置confirm回调处理 template.setConfirmCallback(confirmCallback); return template; }
/** * consumer的ack模式 * 场景描述:短信服务去消费mq队列信息时,倘若服务调用的运营商发送短信接口异常了(短信运营商接口欠费), * 我们此时的短信是发送失败的,用户也收不到短信,但是在默认(默认开启ack)前提下mq消息已经被消费了rabbit中没有记录了(kafka例外); * 想要mq消息在业务逻辑异常时还存在,那么可以使用ack方式; * 业务无异常,发送ack标识,mq消息释放 * * 在springboot中可以使用基于amqp封装的工厂类关闭自动ack模式,改为手动ack方式; * 只有当业务代码流程走完后,最后通过代码设置ack标识,来通知rabbit消息可以丢弃了; * 如果设置了手动模式后,又没有提交ack标识,那么mq中的消息一直存在无法释放(每次consumer消费后,rabbit会把noack的消息重复放入队列中): */ public SimpleRabbitListenerContainerFactory listenerContainerFactory(){ ConnectionFactory connectionFactory = null; return listenerContainerFactory(connectionFactory); } public SimpleRabbitListenerContainerFactory listenerContainerFactory(ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); //代码手动ack factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //开启消费者数量 factory.setConcurrentConsumers(2); //手动确认模式可以使用 prefetch,限制通道上未完成的(“正在进行中的”)发送的数量 //每次接受数据量,默认250 factory.setPrefetchCount(300); return factory; } /** * 消息确认–ACK * 通过连接工厂设置手动ack方式,然后获取mq消息后,走完正常业务逻辑,最后再手动通知ack释放消息,如下: */ private void firstNodeListener(String msg,Channel channel,Message message){ try { long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.out.println("firstNodeListener - 消费消息 [" + deliveryTag + "] - " + msg); //这里ack主要根据mq消息的唯一编号(deliverTag)来通知; //如果我们不设置ack确认,RabbitMQ会认为这个消息没有正常消费,会将此消息重新放入队列中 //忘记通过basicAck返回确认信息,将导致消费者客户端退出或者关闭后,消息会被退回RabbitMQ服务器,这会使RabbitMQ服务器内存爆满,而且RabbitMQ也不会主动删除这些被退回的消息 channel.basicAck(deliveryTag, true); } catch (IOException e) { e.printStackTrace(); } }