一.消息的可靠投递
在使用RabbitMq的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败的场景。RabbitMQ为我们提供了两种方式用来控制消息的投递可靠性
rabbitMQ 整个消息投递过程为:
producer -> rabbitMQ broker -> exchange -> queue ->consumer
1.confirm 确认模式
消息从producer 到exchange 会返回一个 confirmCallback
2.return 退回模式
消息从exchange到queue投递失败则会返回一个returnCallbak
Confirm确认模式:
1.在配置文件中开启确认模式 publisher-confimrs :true
spring: rabbitmq: host: 10.211.55.4 virtual-host: local port: 5672 username: admin password: admin publisher-confirms: true
2.在rabbitTemplate定义ConfirmCallBack回调函数
@RequestMapping("/producer") public void producer(){ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override /** * ack exchange交换机是否收到了消息 * cause 失败原因 */ public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ System.out.println("confirm方法 成功了"); }else{ System.out.println("confirm方法 失败了: " + cause); } } }); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.test","spring boot rabbit mq"); }
Return退回模式:
1.开启退回模式
spring: rabbitmq: host: 10.211.55.4 virtual-host: local port: 5672 username: admin password: admin publisher-returns: true
2.设置returnCallBack
@RequestMapping("/return") public void returncallback(){ //设置交换机处理失败消息的模式 rabbitTemplate.setMandatory(true); //设置return call back rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * * @param message 消息对象 * @param replyCode 错误码 * @param replyText 错误信息 * @param exchange 交换机 * @param routingKey 路由键 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("return 执行了...."); System.out.println(message); System.out.println(replyCode); System.out.println(replyText); System.out.println(exchange); System.out.println(routingKey); //处理 } }); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"zxcasd","return call back"); }
二、Consumer ACK 确认机制
表示消费端收到消息后到确认方式
有三种确认方式:
1.自动确认 acknowledge="none" 默认
2.手动确认 acknowledge="manual"
3.根据异常情况确认: acknowledge="auto"
1.开启手动确认
spring: rabbitmq: host: 10.211.55.4 virtual-host: local port: 5672 username: admin password: admin publisher-returns: true listener: simple: acknowledge-mode: manual
2.成功调用channel.basicAck 异常调用channel.basicNack
@Component public class RabbitMQListener implements ChannelAwareBatchMessageListener { @Override @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME) public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try{ System.out.println(new String(message.getBody())); int a = 1/0; //手动确认 channel.basicAck(deliveryTag,true); }catch (Exception ex){ //发生异常 拒绝确认 //第三个参数,重回队列 channel.basicNack(deliveryTag,true,true); } } }
三、消费端 限流
1.确保ack机制为手动确认
2.设置prefetch=1 每次从rabbitmq取几条,ack确认后再取下一条
server: port: 9999 spring: rabbitmq: host: 10.211.55.4 virtual-host: local port: 5672 username: admin password: admin publisher-returns: true listener: simple: acknowledge-mode: manual direct: prefetch: 1
四、TTL
Time To Live 存活时间/过期时间,当消息到达一定时间后,还没有被消费,会被自动清除。
RabbitMQ可以对消息设置过期时间,也可以对整个队列设置过期时间。
1.queue 队列设置过期时间
@Configuration public class RabbitMQConfig { public static final String EXCHANGE_NAME = "exchange_ttl"; public static final String QUEUE_NAME = "queue_ttl"; public static final String Routing_Key = "ttl.#"; // 1.交换机 @Bean("bootExchange") public Exchange bootExchange() { return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } //2.Queue队列 @Bean("bootQueue") public Queue bootQueue() { return QueueBuilder.durable(QUEUE_NAME).withArgument("x-message-ttl", 5000).build(); } //3.绑定交换机和队列 @Bean public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(Routing_Key).noargs(); } }
2.消息设置过期时间
@RequestMapping("/producer") public void producer(){ MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("5000"); // 消息过期时间 return message; } }; for(int i = 0;i<10;i++){ if(i>5){ rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ttl.test","spring boot rabbit mq"+i,messagePostProcessor); }else{ rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ttl.test","spring boot rabbit mq"+i); } } }
五、死信队列
DLX、Dead Letter Exchange (死信交换机),当消息成为Dead Messag后,可以被重新发送到另一台交换机中。
消息成为死信到三种情况:
1.队列消息长度达到限制,
2.消费者拒接消费信息,basicNack/basicReject 并且不把消息重新放入原目标队列,参数requeue=false。
3.原队列存在消息过期设置,消息到达超时时间未被消费
设定参数 x-dead-letter-exchange 和 x-dead-letter-routing-key
@Configuration public class RabbitMQConfig { public static final String EXCHANGE_NAME = "test_exchange_dlx"; public static final String QUEUE_NAME = "test_queue_dlx"; public static final String Routing_Key = "test.dlx.#"; public static final String DEAD_EXCHANGE = "exchange_dlx"; public static final String DEAD_QUEUE = "queue_dlx"; public static final String DEAD_ROUTINGKEY = "dlx.#"; //声明正常的交换机(test_exchange_dlx)和队列(test_queue_dlx) //交换机 @Bean("testExchangeDlx") public Exchange testExchangeDlx() { return new TopicExchange(EXCHANGE_NAME); } //声明队列 @Bean("testQueueDlx") public Queue testQueueDlx() { Map<String, Object> args = new HashMap<>(); //绑定 死信队列Exchange args.put("x-dead-letter-exchange", DEAD_EXCHANGE); //绑定 死信队列抛出异常重定向队列的routingKey args.put("x-dead-letter-routing-key", "dlx.hehe"); //过期时间 args.put("x-message-ttl", 5000); //最大长度 args.put("x-max-length", 10); return new Queue(QUEUE_NAME,true,false,false,args); } //绑定交换机和队列 @Bean public Binding bindQueueExchange(@Qualifier("testQueueDlx") Queue queue, @Qualifier("testExchangeDlx") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(Routing_Key).noargs(); } //声明死信的交换机和队列 // 死信交换机和普通交换机没有什么区别 @Bean("deadExchange") public Exchange deadExchange() { return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).durable(true).build(); } @Bean("deadQueue") public Queue deadQueue() { return QueueBuilder.durable(DEAD_QUEUE).build(); } //绑定交换机和队列 @Bean public Binding deadBindQueueExchange(@Qualifier("deadQueue") Queue queue, @Qualifier("deadExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DEAD_ROUTINGKEY).noargs(); } }
1、创建交换机,队列
2、创建死信交换机,队列
3.在正常队列中绑定死信交换机 即可 超过指定条数 或者超过时间 就会自动进入 deadQueue队列
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME) public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try{ System.out.println(new String(message.getBody())); //手动确认 int i = 1/0; // // Thread.currentThread().sleep(2000); channel.basicAck(deliveryTag,true); }catch (Exception ex){ //发生异常 拒绝确认 //第三个参数,重回队列 requeue = false channel.basicNack(deliveryTag,true,false); } }
经测试,手动拒绝 requeue=false直接进 死信队列, 超时后 未处理的条数会进入死信队列, 超过10条的直接进入死信队列。
六、延迟队列
消息进入队列不会立即消费,只有到达指定时间后才会消费。
需求 :
1.下单后,30分钟未支付,取消订单,回滚库存。
2.新用户注册成功7天后,发送短信问候。
使用TTL+死信队列 实现延迟队列的效果,
队列过期时间设置为30分钟,30分钟后失效到 死信队列中, 消费dlx 死信队列 即可
七、日志与监控
rabbitMQ 日志文件存放路径: /var/log/rabbitmq/rabbit@主机名.log
八、消息追踪
在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于rabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了链接,而它们与RabbitMQ又采用了不同的确认机制。也有可能是因为交换机与队列之间不同的转发策略,甚至是交换机没有雨任何队列进行绑定,这个时候就需要一个较好的机制跟踪记录消息的投递过程,协助开发人员进行问题的定位。
在RabbitMQ中可以使用Firehose 和rabbitmq_tracing插件功能来实现消息追踪。
https://blog.csdn.net/u013256816/article/details/76039201
开发 测试环境 调错用, 生产环境会影响性能
九、消息可靠性保障-消息补偿
1.首先入库,然后去发送到队列,消费后入库。 完整流程
2.入库,发送消息失败了,过了10分钟发送延迟消息,监听延迟消息,将消息写入数据库,写一个服务定时检查是否成功,如果没成功 重新发送消息。
十、消息幂等性保障-乐观锁机制
不管请求一次还是多次请求结果应该是一样的。在MQ中,消费多条相同的消息,得到与消费一次相同的结果。
如果 consumer 挂了, producer 又发了一次 就会队列中 存在两次 消费500元的通知,
可以在指定版本号的方式,防止sql执行,只更新version为1的 sql
十一、集群搭建
多服务器用ip区分
单机多实例用端口区分