zoukankan      html  css  js  c++  java
  • 记一次消息总线的打造

    虽说消息队列的用法很简单:PUB/SUB, PRODUCER/CONSUMER,不过真做起来还真麻烦。

    先说下原始需求:

    • Web前端发送命令消息,后端Consumer处理,然后前端得到结果
    • 需要支持Windows服务

    很快,下图就出来了:

    先来分析分析:

      • 前端怎么知道后端已经处理完成?
      • 前端如何在处理完后的第一时间被触发去执行某些callback呢?
      • Web前端很可能会通过ajax来定时查看某消息的处理状态

      第一反应是增加应答队列,此时:

      • 前端能够很及时的被通知到(后端处理完触发),来执行callback
      • 但是
        • ajax类型的定时查看怎么做?在ResponseQueue中查?显然不行(队列中数据越多,性能越差)
        • 如果前端关闭一段时间,消息会积压下来,性能越变越差

      因此决定增加一个DB来解决这些消息的保存以及后续的ajax类型的多次查询,如下图:

     再来分析分析,此时

      • 前端ajax类型的不定时、多次的查询某消息处理状态是解决了
      • 如果消息量很大,也可以将RabbitMQ以及DB分别做集群以及切片
      • 但,似乎还是得增加应答队列进去,因为现在CONSUMER处理完成后,针对前端的通知很麻烦,理由如下
        • 基于DB行记录的通知效率低
      • 但,即便增加了这个应答队列,也会出现如下问题
        • 如果前端崩掉后有段时间未on service,此时应答队列就会积压消息...性能会变差

      此时该咋办?

      答:用PUB/SUB机制来做这个应答队列,此时如果前端崩掉,就不会SUB了,只要online时才会有消息被通知到

      因此,继续出一张图

        

      图中的Notifier, NotifierPublisher是前端和后端的BROKER,考虑到有些线程需要主动监听,因此画在了上面。

      再来谈谈后端,由于没有特别高要求,对后端的要求也就是这么几点:

      • 在业务逻辑角度,消息只能被无错处理一次
      • 如果出现了Exception, 则需要后续人工介入,消息不能丢失,但也不应该造成无限循环的报Exception
      • 对于报Exception的消息,人工处理要方便

      分别分析    

      • 在业务逻辑角度,消息只能被无错处理一次
        • 在业务处理没有报错的情况下,将RabbitMQ消息的Ack动作与DB的消息状态回写做成一个TRANSACTION,如下
        • 这样就能保证此消息“从RABBITMQ Server中remove、写入查询DB”同时确保
        • 但是,如果存在下述情况时,会出现即便业务逻辑没有报错情况下多次执行
          • 那就是:如果业务逻辑执行完毕,没有报错,此时,即将触发上述代码,却还没有触发的时刻,服务crash了...
          • 解决方法
            • 在业务逻辑代码中加入幂等性
            • 在业务逻辑代码中加入检查性质代码
            • 告诉我下其他简便的方法吧(记录本地文件日志能解决,就是比较复杂)
      • 如果出现了Exception, 则需要后续人工介入,消息不能丢失,但也不应该造成无限循环的报Exception
        • 增加相应的Exception队列,实际中是增加了2个:如:
          • 比如目前有队列messages.CommandA,则异常队列有:
            • messages.CommandA.exceptions1
            • messages.CommandA.exceptions2
            • 为啥是2个?看后续
      • 对于报Exception的消息,人工处理要方便

        • 在配置文件中增加一个参数,表示运行级别:普通、异常1、异常2
        • 如果是普通级别,则CONSUMER会从messages.CommandA中获取消息进行处理,报错后会将消息move到exception1中
        • 如果是异常1级别,则CONSUMER会从messages.CommandA.exception1中获取消息进行处理,报错后会将消息move到exception2中
        • 如果是异常2级别,则CONSUMER会从messages.CommandA.exception2中获取消息进行处理,报错后将消息move到exception1中
        • 这里还有个问题,就是上面的这些RABBITMQ级别的消息从exception1移动到exception2中,都是分成PUBLISH和BASICACK两个CHANNEL上的动作完成的,不能套RABBITMQ的TX,也就是存在一致性问题
          • 解决起来同HandleSuccessfulMessage类似,都是通过本地db事务来做,都是借助了BASE思想来实现的

     

    剩下的一个问题,RABBITMQ有优先级队列特性吗?答案是有:

     

    注意:RABBITMQ默认是不支持客户端消息的优先级的,默认只支持Consumer的优先级,那如何实现客户端消息的优先级呢?

      1. 发送消息时,设置properties属性,记得设置Priority属性值
      2. 安装插件rabbitmq_priority_queue
      3. DeclareQueue时,加入x-max-priority特性值

     

    DONE.

  • 相关阅读:
    file_zilla 通过key连接远程服务器
    git 恢复丢失的文件
    花括号中的json数据--->转为数组array
    3种日志类型,微信付款反馈-->写入txt日志
    清空数据库中所有表--连表删除
    冒泡排序, 使用最低票价.---双重循环,一重移动次数.二重移动
    navicat 连接远程mysql
    付款前.检查状态.防止重复付款,需要ajax设置为同步,等待ajax返回结果再使用
    反射
    设计模式六大原则
  • 原文地址:https://www.cnblogs.com/aarond/p/MessageBus.html
Copyright © 2011-2022 走看看