zoukankan      html  css  js  c++  java
  • 学习ActiveMQ(六):JMS消息的确认与重发机制

      当我们发送消息的时候,会出现发送失败的情况,此时我们需要用到activemq为我们提供了消息重发机制,进行消息的重新发送。那么我们怎么知道消息有没有发送失败呢?activemq还有消息确认机制,消费者在接收到消息的时候可以进行确认。本节将确认机制和重发机制一起在原有的代码中学习。

    消息确认机制有四种:定义于在session对象中

    AUTO_ACKNOWLEDGE= 1 :自动确认

    CLIENT_ACKNOWLEDGE= 2:客户端手动确认 

    UPS_OK_ACKNOWLEDGE= 3: 自动批量确认

    SESSION_TRANSACTED= 0:事务提交并确认

    但是在activemq补充了一个自定义的ACK模式:

    INDIVIDUAL_ACKNOWLEDGE= 4:单条消息确认

    首先在配置文件中定义重发机制ReDelivery:设置重发两次

     <!-- 定义ReDelivery(重发机制)机制 ,重发时间间隔是100毫秒,最大重发次数是3次 -->
        <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
            <!--是否在每次尝试重新发送失败后,增长这个等待时间 -->
            <property name="useExponentialBackOff" value="true"/>
            <!--重发次数,默认为6次-->
            <property name="maximumRedeliveries" value="2"/>
            <!--重发时间间隔,默认为1秒 -->
            <property name="initialRedeliveryDelay" value="1000"/>
            <!--第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value -->
            <property name="backOffMultiplier" value="2"/>
            <!--最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,
            第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。 -->
            <property name="maximumRedeliveryDelay" value="1000"/>
        </bean>

    在工厂中引用重发机制:

     <!--PooledConnectionFactory对session和消息producer的缓存机制而带来的性能提升-->
        <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
              destroy-method="stop">
            <property name="connectionFactory">
                <!--连接mq的连接工厂-->
                <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                    <property name="brokerURL">
                        <value>tcp://127.0.0.1:61616</value>
                    </property>
                    <!-- 引用重发机制 -->
                    <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />
                </bean>
            </property>
            <property name="maxConnections" value="100"></property>
        </bean>

    在监听容器中新增配置消息的确认机制:

     <!--配置 消息监听容器-->
        <bean id="jmsContainer" class=" org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="queueDestination"/>
            <property name="messageListener" ref="consumerMessageListener"/>
            <!--应答模式是 INDIVIDUAL_ACKNOWLEDGE-->
            <!--AUTO_ACKNOWLEDGE = 1    自动确认
            CLIENT_ACKNOWLEDGE = 2    客户端手动确认
            DUPS_OK_ACKNOWLEDGE = 3    自动批量确认
            SESSION_TRANSACTED = 0    事务提交并确认
            INDIVIDUAL_ACKNOWLEDGE = 4    单条消息确认-->
            <property name="sessionAcknowledgeMode" value="4"/>
        </bean>

    发送者无需改动,正常发送即可,我们在消费者中进行改动,在消息接收到的时候调用确认方法(上面配置了4单挑消息确认),然后再接收消息做点手脚让其报错不能确认使其触发重发机制:

    public class ConsumerMessageListener implements SessionAwareMessageListener<Message> {//消息确认需要session,需要实现SessionAwareMessageListener
    
        @Override
        public void onMessage(Message message, Session session)  throws JMSException {
            if (message instanceof TextMessage){
                String msg = ((TextMessage) message).getText();
    
                System.out.println("------------------------------------------");
                System.out.println("消费者收到的消息:" + msg);
                System.out.println("------------------------------------------");
    
                try {
                    if ("test2".equals(msg)) {
                        throw new RuntimeException("故意抛出的异常");
                    }
                    // 确认消息。只要被确认后  就会出队,接受失败没有确认成功,会在原队列里面
                    message.acknowledge();
                } catch (Exception e) {
                    // 此不可省略 重发信息使用,如果不写此方法,将不会实现重发操作。失败的消息将会一直在队列中,因为没有进行消息确认。
                    // 下次还会监听到这条消息。效果将会是:第一次接受一个消息2。第二次接受2个,依次累加
                    session.recover();
                }
            }
    }

    发送4条消息:

      //发送字符串
            for (int i = 0; i < 4; i++) {
    
                service.sendMessage("test" + i);
            }

    结果

    由结果可见,test2由于抛出异常,未能进行消息确认,所有重发了两次,共三次。

     代码地址:https://github.com/MrLiu1227/ActiveMQ

  • 相关阅读:
    打印九九乘法三角形的各种思路
    7-14 倒数第N个字符串
    7-13 求阶乘序列前N项和
    7-12统计学生成绩
    Discrete Mathematics and Its Applications | 1 CHAPTER The Foundations: Logic and Proofs | 1.3 Propositional Equivalences
    Discrete Mathematics and Its Applications | 1 CHAPTER The Foundations: Logic and Proofs | 1.2 Applications of Propositional Logic
    Discrete Mathematics and Its Applications | 1 CHAPTER The Foundations: Logic and Proofs | 1.1 Propositional Logic
    [Mac Terminal] ___切换到其他路径和目录
    [Mac Terminal] ___MAC终端清屏快捷键
    部署flask到linux服务器
  • 原文地址:https://www.cnblogs.com/liuyuan1227/p/10776189.html
Copyright © 2011-2022 走看看