怎么样可以避免重复消费
RocketMQ 不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重
接口幂等性保障 ,消费端处理业务消息要保持幂等性
Redis
1)setNX(),做消息 id 去重,java 版本目前不支持设置过期时间
//Redis中操作,判断是否已经操作过 TODO boolean flag = jedis.setNX(key); if(flag){ //消费 }else{ //忽略,重复消费 }
拓展(如果再用 expire 则不是原子操作,可以用下面方式实现分布式锁)
加锁 String result = jedis.set(key, value, "NX", "PX", expireTime) 解锁(Lua脚本,先检查key,匹配再释放锁,lua可以保证原子性) String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); 备注:lockKey可以是商品id,requestId用于标示是同个客户端
2)Incr 原子操作:key自增,返回值大于0则说明消费过
int num = jedis.incr(key); if(num == 1){ /消费 }else{ //忽略,重复消费 }
上述两个方式都可以,但是不能用于分布式锁,考虑原子问题,但是排重可以不考虑原子问题,数据量多需要设置过期时间。
数据库去重表
某个字段使用 Message 的 key 做唯一索引
如何保证消息的可靠性,处理消息丢失的问题
producer 端
- 不采用 oneway 发送,使用同步或者异步方式发送,做好重试,但是重试的 Message key 必须唯一
- 投递的日志需要保存,关键字段,投递时间、投递状态、重试次数、请求体、响应体
broker 端
- 双主双从架构,NameServer 需要多节点
- 同步双写、异步刷盘 (同步刷盘则可靠性更高,但是性能差点,根据业务选择)
consumer 端
- 消息消费务必保留日志,即消息的元数据和消息体
- 消费端务必做好幂等性处理
投递到 broker 端后
- 机器断电重启:异步刷盘,消息丢失;同步刷盘消息不丢失
- 硬件故障:可能存在丢失,看队列架构
如果消息大量堆积在 broker 里面,应该怎么处理
线上故障了,怎么处理
- 消息堆积了10小时,有几千万条消息待处理,现在怎么办?
- 修复 consumer,然后慢慢消费?也需要几小时才可以消费完成,新的消息怎么办?
正确的姿势
- 临时 Topic 队列扩容,并提高消费者能力,但是如果增加 Consumer 数量,但是堆积的 topic 里面的 message queue 数量固定,过多的 consumer 不能分配到 message queue
- 编写临时处理分发程序,从旧 Topic 快速读取到临时新 Topic 中,新 Topic 的 queue 数量扩容多倍,然后再启动更多 consumer 进行在临时新的 Topic 里消费
RocketMQ 高性能的原因分析,高可用架构
MQ架构配置
- 顺序写,随机读,零拷贝
- 同步刷盘 SYNC_FLUSH 和异步刷盘 ASYNC_FLUSH,通过 flushDiskType 配置
- 同步复制和异步复制,通过 brokerRole 配置 ASYNC_MASTER、SYNC_MASTER、SLAVE
- 推荐同步复制(双写),异步刷盘
发送端高可用
- 双主双从架构:创建 Topic 对应的时候,MessageQueue 创建在多个 Broker 上
- 即相同的 Broker 名称,不同的 brokerid(即主从模式)当一个 Master 不可用时,组内其他的 Master 仍然可用。
- 但是机器资源不足的时候,需要手工把 slave 转成 master,目前不支持自动转换,可用 shell 处理
消费高可用
- 主从架构:Broker 角色,Master 提供读写,Slave 只支持读
- Consumer 不用配置,当 Master 用或者繁忙的时候,Consumer 会自动切换到 Slave 节点进行能读取
提高消息的消费能力
- 并行消费
- 增加多个节点
- 增加单个 Consumer 的并行度,修改 consumerThreadMin 和 consumerThreadMax
- 批量消费,设置 Consumer 的 consumerMessageBatchMaxSize,默认是1,如果为N,则消息多的时候,每次收到的消息为N条
- 选择 Linux Ext4 文件系统,Ext4 文件系统删除 1G 大小的文件通常耗时小于 50ms,而 Ext3文件系统耗时需要 1s,删除文件时磁盘IO 压力极大,会导致 IO 操作超时