虽说消息队列的用法很简单:PUB/SUB, PRODUCER/CONSUMER,不过真做起来还真麻烦。
先说下原始需求:
- Web前端发送命令消息,后端Consumer处理,然后前端得到结果
- 需要支持Windows服务
很快,下图就出来了:
先来分析分析:
-
- 前端怎么知道后端已经处理完成?
- 前端如何在处理完后的第一时间被触发去执行某些callback呢?
- Web前端很可能会通过ajax来定时查看某消息的处理状态
第一反应是增加应答队列,此时:
-
- 前端能够很及时的被通知到(后端处理完触发),来执行callback
- 但是
- ajax类型的定时查看怎么做?在ResponseQueue中查?显然不行(队列中数据越多,性能越差)
- 如果前端关闭一段时间,消息会积压下来,性能越变越差
因此决定增加一个DB来解决这些消息的保存以及后续的ajax类型的多次查询,如下图:
再来分析分析,此时
-
- 前端ajax类型的不定时、多次的查询某消息处理状态是解决了
- 如果消息量很大,也可以将RabbitMQ以及DB分别做集群以及切片
- 但,似乎还是得增加应答队列进去,因为现在CONSUMER处理完成后,针对前端的通知很麻烦,理由如下
- 基于DB行记录的通知效率低
- 但,即便增加了这个应答队列,也会出现如下问题
- 如果前端崩掉后有段时间未on service,此时应答队列就会积压消息...性能会变差
此时该咋办?
答:用PUB/SUB机制来做这个应答队列,此时如果前端崩掉,就不会SUB了,只要online时才会有消息被通知到
因此,继续出一张图
图中的Notifier, NotifierPublisher是前端和后端的BROKER,考虑到有些线程需要主动监听,因此画在了上面。
再来谈谈后端,由于没有特别高要求,对后端的要求也就是这么几点:
-
- 在业务逻辑角度,消息只能被无错处理一次
- 如果出现了Exception, 则需要后续人工介入,消息不能丢失,但也不应该造成无限循环的报Exception
- 对于报Exception的消息,人工处理要方便
分别分析
-
- 在业务逻辑角度,消息只能被无错处理一次
- 在业务处理没有报错的情况下,将RabbitMQ消息的Ack动作与DB的消息状态回写做成一个TRANSACTION,如下
- 这样就能保证此消息“从RABBITMQ Server中remove、写入查询DB”同时确保
- 但是,如果存在下述情况时,会出现即便业务逻辑没有报错情况下多次执行
- 那就是:如果业务逻辑执行完毕,没有报错,此时,即将触发上述代码,却还没有触发的时刻,服务crash了...
- 解决方法
- 在业务逻辑代码中加入幂等性
- 或
- 在业务逻辑代码中加入检查性质代码
- 或
- 告诉我下其他简便的方法吧(记录本地文件日志能解决,就是比较复杂)
- 如果出现了Exception, 则需要后续人工介入,消息不能丢失,但也不应该造成无限循环的报Exception
- 增加相应的Exception队列,实际中是增加了2个:如:
- 比如目前有队列messages.CommandA,则异常队列有:
- messages.CommandA.exceptions1
- messages.CommandA.exceptions2
- 为啥是2个?看后续
- 比如目前有队列messages.CommandA,则异常队列有:
- 增加相应的Exception队列,实际中是增加了2个:如:
-
对于报Exception的消息,人工处理要方便
- 在配置文件中增加一个参数,表示运行级别:普通、异常1、异常2
- 如果是普通级别,则CONSUMER会从messages.CommandA中获取消息进行处理,报错后会将消息move到exception1中
- 如果是异常1级别,则CONSUMER会从messages.CommandA.exception1中获取消息进行处理,报错后会将消息move到exception2中
- 如果是异常2级别,则CONSUMER会从messages.CommandA.exception2中获取消息进行处理,报错后将消息move到exception1中
- 这里还有个问题,就是上面的这些RABBITMQ级别的消息从exception1移动到exception2中,都是分成PUBLISH和BASICACK两个CHANNEL上的动作完成的,不能套RABBITMQ的TX,也就是存在一致性问题
- 解决起来同HandleSuccessfulMessage类似,都是通过本地db事务来做,都是借助了BASE思想来实现的
- 在业务逻辑角度,消息只能被无错处理一次
剩下的一个问题,RABBITMQ有优先级队列特性吗?答案是有:
注意:RABBITMQ默认是不支持客户端消息的优先级的,默认只支持Consumer的优先级,那如何实现客户端消息的优先级呢?
- 发送消息时,设置properties属性,记得设置Priority属性值
- 安装插件rabbitmq_priority_queue
- DeclareQueue时,加入x-max-priority特性值
DONE.