zoukankan      html  css  js  c++  java
  • 【ActiveMQ】消息重传机制

    问题:

    项目在修改consumer的确认签收机制从AUTO-ACK改为CLIENT-ACK模式,但在服务优雅停机并重启之后,发现若干数据被Pending后未被重发。

    过段时间再一次停启服务后,Pending的数据才能被消费成功。

    此问题在数据量大时基本可以复现。

    类似问题:

    https://stackoverflow.com/questions/54074745/activemq-pending-messages

    https://stackoverflow.com/questions/45539799/queue-consumers-have-pending-messages-but-they-are-not-getting-processed

    https://stackoverflow.com/questions/18181619/consumer-is-not-receiving-messages-from-activemq

    思路:

    1. ActiveMQ消息重传机制

    2. ActiveMQ在非事务性生产者下

    参考:

    https://blog.csdn.net/czp11210/article/details/47022639/

    https://blog.csdn.net/qq_39706128/article/details/80570577

    http://codingdict.com/questions/36712

    CLIENT_ACKNOWLEDGE : 客户端手动确认,这就意味着AcitveMQ将不会“自作主张”的为你ACK任何消息,开发者需要自己择机确认。在此模式下,开发者需要需要关注几个方法:1) message.acknowledge(),2) ActiveMQMessageConsumer.acknowledege(),3) ActiveMQSession.acknowledge();其1)和3)是等效的,将当前session中所有consumer中尚未ACK的消息都一起确认,2)只会对当前consumer中那些尚未确认的消息进行确认。开发者可以在合适的时机必须调用一次上述方法。

        我们通常会在基于Group(消息分组)情况下会使用CLIENT_ACKNOWLEDGE,我们将在一个group的消息序列接受完毕之后确认消息(组);不过当你认为消息很重要,只有当消息被正确处理之后才能确认时,也很可以使用此ACK_MODE。

        如果开发者忘记调用acknowledge方法,将会导致当consumer重启后,会接受到重复消息,因为对于broker而言,那些尚未真正ACK的消息被视为“未消费”。

        开发者可以在当前消息处理成功之后,立即调用message.acknowledge()方法来"逐个"确认消息,这样可以尽可能的减少因网络故障而导致消息重发的个数;当然也可以处理多条消息之后,间歇性的调用acknowledge方法来一次确认多条消息,减少ack的次数来提升consumer的效率,不过这仍然是一个利弊权衡的问题。

        除了message.acknowledge()方法之外,ActiveMQMessageConumser.acknowledge()和ActiveMQSession.acknowledge()也可以确认消息,只不过前者只会确认当前consumer中的消息。其中sesson.acknowledge()和message.acknowledge()是等效的。

        无论是“同步”/“异步”,ActiveMQ都不会发送STANDARD_ACK_TYPE,直到message.acknowledge()调用。如果在client端未确认的消息个数达到prefetchSize * 0.5时,会补充发送一个ACK_TYPE为DELIVERED_ACK_TYPE的确认指令,这会触发broker端可以继续push消息到client端。(参看PrefetchSubscription.acknwoledge方法)

        在broker端,针对每个Consumer,都会保存一个因为"DELIVERED_ACK_TYPE"而“拖延”的消息个数,这个参数为prefetchExtension,事实上这个值不会大于prefetchSize * 0.5,因为Consumer端会严格控制DELIVERED_ACK_TYPE指令发送的时机(参见ActiveMQMessageConsumer.ackLater方法),broker端通过“prefetchExtension”与prefetchSize互相配合,来决定即将push给client端的消息个数,count = prefetchExtension + prefetchSize - dispatched.size(),其中dispatched表示已经发送给client端但是还没有“STANDARD_ACK_TYPE”的消息总量;由此可见,在CLIENT_ACK模式下,足够快速的调用acknowledge()方法是决定consumer端消费消息的速率;如果client端因为某种原因导致acknowledge方法未被执行,将导致大量消息不能被确认,broker端将不会push消息,事实上client端将处于“假死”状态,而无法继续消费消息。我们要求client端在消费1.5*prefetchSize个消息之前,必须acknowledge()一次;通常我们总是每消费一个消息调用一次,这是一种良好的设计。

        此外需要额外的补充一下:所有ACK指令都是依次发送给broker端,在CLIET_ACK模式下,消息在交付给listener之前,都会首先创建一个DELIVERED_ACK_TYPE的ACK指令,直到client端未确认的消息达到"prefetchSize * 0.5"时才会发送此ACK指令,如果在此之前,开发者调用了acknowledge()方法,会导致消息直接被确认(STANDARD_ACK_TYPE)。broker端通常会认为“DELIVERED_ACK_TYPE”确认指令是一种“slow consumer”信号,如果consumer不能及时的对消息进行acknowledge而导致broker端阻塞,那么此consumer将会被标记为“slow”,此后queue中的消息将会转发给其他Consumer。

  • 相关阅读:
    struts2 类型转化(typeConverter)
    appfuse-maven-plugin(AMP)
    多项式求和,素数判定 HDU2011.2012
    A+B problems
    黑马程序员----java基础笔记中(毕向东)
    2015Web前端攻城之路
    黑马程序员----java基础笔记上(毕向东)
    黑马程序员----2015黑马之路-启程
    乱弹琴20140421
    读Thinking in java 4
  • 原文地址:https://www.cnblogs.com/cathygx/p/14581769.html
Copyright © 2011-2022 走看看