数据持久性机制
交换机(exchange)持久化
# durable=True 开启交换机 持久化机制
channel.exchange_declare(exchange='python1', durable=True, exchange_type='topic')
队列(Queue)持久化
# delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化
channel.basic_publish(exchange='python1',routing_key='#user#',body={"data","data1"},properties=pika.BasicProperties(delivery_mode=2))
消息可靠投递
------ TODO 这块暂时有点问题 后续弄清楚在整理一下
投递确认
confirm 确认模式
开启confirm确认模式
设置回调函数,当消息发送到exchange后回调confirm方法,在方法中判断ack,如果为true 则发送成功,如果为false则发送失败,需要处理
return 退回模式
开启 退回模式
设置回调函数,当消息从exchange路由到queue失败后,如果开启了交换机处理失败消息(setMandatory=ture),则会将消息回退给producer,并执行回调函数 若没开启 则忽略失败消息
Consumer Ack 发布者证实
ack:Acknowledge 确认。表示消费端收到消息后的确认方式
三种确认方式:
- 自动确认 acknowledge=none
- 手动确认 acknowledge=manual
- 根据异常情况确认 acknowledge=auto
事务机制
------后续补充
消费端限流
- 确保ack机制为手动确认
- listener contrainer 配置属性
- perfetch = 5 表示每次从MQ拉取5条消息消费,直到手动确认消费完毕后,才会继续拉去下一条消息
TTL - Time To Live 存活时间/过期时间
当消息到达存活时间后,还没有被消费,会被清除
RabbitMQ 可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间
# 设置队列整体过期时间
x-message-ttl 单位:ms(毫秒)
# 设置消息过期时间
expiration 单位:ms(毫秒)
注意事项:
- 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准
- 队列过期后,会将队列所有的消息全部移除
- 消息过期后,只有消息在队列顶端,才会判断其是否过期(是为了提高效率)
死信队列
DLX Dead Letter Exchange (死信交换机,因为其他没有交换机概念)
当消息成为Dead message 后,可以被重新发送到另外一个交换机,这个交换机就是DLX
消息在什么情况下成为死信
- 队列长度达到限制
- 消费者拒绝接收消费消息,并不把消息重新放入原目标队列
- 原队列存在消息过期设置,消息达到超时时间未被消费
延迟队列
什么事延迟队列?
- 延迟队列,即消息进入队列后不会立即被消费,只有到达制定时间后,才会被消费
例:
有需求如下:
- 1.下单后,30分钟未支付,取消订单,回滚库存
- 新用户注册7天后,发送短信问候
实现方式:
- 定时器 但是并不优雅
- 延迟队列
RabbitMQ并沒有直接的有直接的延迟队列,目前是通过TTL和死信队列组合实现延迟队列效果
短信功能同样如此
日志监控
图形界面和相关指令
消息追踪
通过插件实现
tract等
接口幂等性
通过乐观锁可以实现, 即增加个字段 version 只进行修改数据version+=1 每次修改判断version是否一致 如果不一致则修改提交失败