zoukankan      html  css  js  c++  java
  • JMS笔记(三)

    最近重看activemq,对消息的传送确认机制有了进一步认识

    1. mq在确认consumer收到消息后才会删除消息,因此consumer接收消息后应该进行ack"确认",javax.jms.Session接口中有四种ack模式:

    • AUTO_ACKNOWLEDGE = 1    自动确认
    • CLIENT_ACKNOWLEDGE = 2    客户端手动确认   
    • DUPS_OK_ACKNOWLEDGE = 3    自动批量确认
    • SESSION_TRANSACTED = 0    事务提交并确认

    使用代码如下:

    session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  //(事务,ack模式)

    若事务为true,用户指定的ACK模式将被忽略,而强制使用"SESSION_TRANSACTED"类型;

    若事务为false,ACK模式不能设定为"SESSION_TRANSACTED"

    ack指令又有几种类型,在JMS API中并没有定义ACT_TYPE,因为它通常是一种内部机制,并不会面向开发者,ActiveMQ中定义了如下几种ACK_TYPE:

    • DELIVERED_ACK_TYPE = 0    消息"已接收",但尚未处理结束
    • STANDARD_ACK_TYPE = 2    "标准"类型,通常表示为消息"处理成功",broker端可以删除消息了
    • POSION_ACK_TYPE = 1    消息"错误",通常表示"抛弃"此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者DLQ(死信队列)
    • REDELIVERED_ACK_TYPE = 3    消息需"重发",比如consumer处理消息时抛出了异常,broker稍后会重新发送此消息
    • INDIVIDUAL_ACK_TYPE = 4    表示只确认"单条消息",无论在任何ACK_MODE下    
    • UNMATCHED_ACK_TYPE = 5    在Topic中,如果一条消息在转发给“订阅者”时,发现此消息不符合Selector过滤条件,那么此消息将 不会转发给订阅者,消息将会被存储引擎删除(相当于在Broker上确认了消息)。

    我们知道consumer有两种接收消息的方式:同步(onsumer.receive())和异步(consumer.setMessageListener),同步调用时,在消息从receive方法返回之前,就已经调用了ACK,根据不同ack模式处理(发送不同ack指令);异步调用时,消息的确认是在onMessage方法返回之后,如果onMessage方法异常,会导致消息不能被接收,会触发重发。

    2. 对四种模式详细说明

    2.1AUTO_ACKNOWLEDGE,

    • 在“同步”(receive)方法返回message之前,会检测optimizeACK选项是否开启,

       如果没有开启,此单条消息将立即确认,所以在这种情况下,message返回之后,如果开发者在处理message过程中出现异常,会导致此消息也不会redelivery,   即"潜在的消息丢失";如果开启了optimizeACK,则会在unAck数量达到prefetchSize * 0.65时确认,当然我们可以指定prefetchSize = 1来实现逐条消息确    认。

    • 在"异步"(messageListener)方式中,将会首先调用listener.onMessage(message),此后再ACK。

       如果onMessage方法异常,将导致client端补充发送一个ACK_TYPE为REDELIVERED_ACK_TYPE确认指令;

       如果onMessage方法正常,消息将会正常确认(STANDARD_ACK_TYPE)。

       此外需要注意,消息的重发次数是有限制的,每条消息中都会包含“redeliveryCounter”计数器,用来表示此消息已经被重发的次数,如果重发次数达到阀值,将   会导致发送一个ACK_TYPE为POSION_ACK_TYPE确认指令,这就导致broker端认为此消息无法消费,此消息将会被删除或者迁移到"dead letter"通道中。

       因此建议在onMessage方法中使用try-catch,这样可以在处理消息出错时记录一些信息,而不是让consumer不断去重发消息;如果你没有使用try-catch,就有可   能会因为异常而导致消息重复接收的问题,需要注意你的onMessage方法中逻辑是否能够兼容对重复消息的判断。

    2.2CLIENT_ACKNOWLEDGE,在此模式下,开发者需要需要关注几个方法:

    1) message.acknowledge(),

    2) ActiveMQMessageConsumer.acknowledege(),

    3) ActiveMQSession.acknowledge();

    其1)和3)是等效的,将当前session中所有consumer中尚未ACK的消息都一起确认,2)只会对当前consumer中那些尚未确认的消息进行确认。开发者可以在合适的时机必须调用一次上述方法。

    为了避免混乱,对于这种ACK模式下,建议一个session下只有一个consumer。

    无论是“同步”/“异步”,ActiveMQ都不会发送STANDARD_ACK_TYPE,直到message.acknowledge()调用。如果在client端未确认的消息个数达到prefetchSize * 0.5时,会补充发送一个ACK_TYPE为DELIVERED_ACK_TYPE的"确认"指令,这会触发broker端可以继续push消息到client端。

    2.3 DUPS_OK_ACKNOWLEDGE,它是一种潜在的"AUTO_ACK"确认机制,为批量确认而生,而且具有“延迟”确认的特点。对于开发者而言,这种模式下的代码结构和AUTO_ACKNOWLEDGE一样,不需要像CLIENT_ACKNOWLEDGE那样调用acknowledge()方法来确认消息。

    1) 在ActiveMQ中,如果在Destination是Queue通道,我们真的可以认为DUPS_OK_ACK就是“AUTO_ACK + optimizeACK + (prefetchSize > 0)”这种情况,在确认时机上几乎完全一致;此外在此模式下,如果prefetchSize =1 或者没有开启optimizeACK,也会导致消息逐条确认,从而失去批量确认的特性。

    2) 如果Destination为Topic,DUPS_OK_ACKNOWLEDGE才会产生JMS规范中诠释的意义,即无论optimizeACK是否开启,都会在消费的消息个数>=prefetchSize * 0.5时,批量确认(STANDARD_ACK_TYPE),在此过程中,不会发送DELIVERED_ACK_TYPE的确认指令,这是1)和AUTO_ACK的最大的区别。

        这也意味着,当consumer故障重启后,那些尚未ACK的消息会重新发送过来。

    2.4 SESSION_TRANSACTED,当session使用事务时,就是使用此模式。在事务开启之后,和session.commit()之前,所有消费的消息,要么全部正常确认,要么全部redelivery。

    参考文章:

    1. ActiveMQ消息传送机制以及ACK机制详解

    2. ActiveMQ的消息重发策略和DLQ处理

    3. ActiveMQ中Consumer特征详解与优化

    4. ActiveMQ中Producer特性详解

  • 相关阅读:
    JavaFx在macOS下的文字渲染Bug
    Java多接口同名方法的冲突
    旧技术的惯性
    一点思考(1)
    slisp:编译到JVM平台上的lisp方言
    Arcee:又一个 Parser Generator 轮子
    使用Java实现一门简单的动态语言
    Hello World!
    [NOI2008]奥运物流
    [IOI2005]Riv河流
  • 原文地址:https://www.cnblogs.com/yhzh/p/5369066.html
Copyright © 2011-2022 走看看