zoukankan      html  css  js  c++  java
  • [转] 消息服务中如何确保消息至少被消费一次

    本章讨论主题

    如何确保消息至少消费一次,确保消费者最大程度消费成功
    消费者消费消息有2中方式:

    1. push方式
      消息服务接收到消息之后,主动将消息推送给消费者消费
    2. pull方式
      消费者定时从消息服务中拉取消息进行消费

    下面我们将讨论2中方式中如何确保消息至少被消费一次。

    push模式

    消费的过程:

    1. 消息服务查询待消费的消息列表
    2. 轮询待消息列表
    3. 调用消费者
    4. 消费者收到消费请求,执行业务处理,将处理结果返回给消息服务
    5. 消息服务接收到消费成功的信息,将消息状态置为消费成功状态
    6. 继续消费下一条消息

    探讨一下上面需要考虑的问题:

    若消息一直消费失败如何处理?

    先说一下影响:

    1. 消息被阻塞

    消息如果一直消费失败,消息服务会不断调用消费者进行消费,会阻塞其他消息的消费,直接影响到业务的正常进行.

    消费失败的原因:

    1. 代码问题

    这种情况不管尝试多少次,消息都会消费失败,需要人工介入修复bug,这个可以依靠监控系统发现bug,同时开发进行修复。

    1. 系统运行异常

    如调用超时、网络问题等一些不可控的因素。产生这种错误,继续重试,最终会处理成功。

    此处咱们只用讨论消息服务中重试机制如何设计?

    系统异常情况下,可能过一段时间,系统恢复了,此时去重试,消费也就成功了。

    所以我们对于消费失败的消息采用延迟处理的方式,可以这么实现:

    消息中增加几个字段用于重试:

    • next_dispose_time【下次处理时间】
    • max_failure【最大允许失败次数】
    • failure【当前失败次数】

    消息入库时:

    • next_dispose_time=需消费的时间
    • max_failure = 运行最大失败次数, failure=0;

    当消费失败时,处理过程:

    1. 计算下次处理时间(next_dispose_time),可以在当前时间上面做指数递增,比如根据失败次数依次在当前时间上递增2的failure次方秒,如:

    第1次失败:当前时间 + 2秒
    第2次失败:当前时间 + 4秒
    第3次失败:当前时间 + 8秒
    第4次失败:当前时间 + 16秒
    .......
    第n次失败:当前时间 + 2的n次方秒

    1. failure++

    消息服务查询待消费的消息也需要做调整:

    select from *消息表 where next_dispose_time<=当前时间 and failure<max_failure and status = 待处理;
    

    此时能够最大程度保证消息最少消费成功一次。

    pull方式

    这种会复杂一些,为何会复杂一些,咱们先看一下常规的流程:

    1. 消费者从消息服务中拉取消息
    2. 本地进行处理
    3. 从消息服务中删除此消息
    4. 继续拉取下一条进行处理

    如果本地一直处理失败,那么后面拉取到的都是同一条消息,这条消息直接阻塞后续消息的消费,这种情况如何解?

    咱们先分析一下出现这种问题的后果及原因:

    1. 后果:消息被阻塞,业务无法正常运行
    2. 原因:代码问题或其他异常
    3. 确保代码没问题,可以解决上面问题,及时性不够高,线上要考虑系统的容错能力。

    遇到这种问题还是挺严重了,业务方都是无法接受的,一条消息消费失败,会影响到其他所有消息的消费,这个我们还是得想办法解决,可以这样:

    1. 消费者拉取消息
    2. 落地到本地
    3. 从消息服务中删除此消息
    4. 异步去消费本地落地的消息

    消息先落地,然后异步处理,本地需要有个补偿的job,去处理本地消费失败的消息,这个可以参考push方式消费的过程。

    MQ系列整个内容,我们将讨论:

    【转载自:原文地址

  • 相关阅读:
    业余爱好很有必要
    平和的心态
    合理预期
    有所为,有所不为
    iOS中使用UITextView设置不同文本部分点击事件小结
    一直在路上
    Windows平台交叉编译Arm Linux平台的QT5.7库
    UOS创建开机自启程序或脚本
    HTTP HTTP请求报文和响应报文的格式
    ObjectObject.prototype.toString.call()方法的使用
  • 原文地址:https://www.cnblogs.com/eedc/p/12851439.html
Copyright © 2011-2022 走看看