zoukankan      html  css  js  c++  java
  • springboot整合activemq加入会签,自动重发机制,持久化

    消费者客户端成功接收一条消息的标志是:这条消息被签收。
    消费者客户端成功接收一条消息一般包括三个阶段:

             1、消费者接收消息,也即从MessageConsumer的receive方法返回

             2、消费者处理消息

             3、消息被签收

            其中,第三阶段的签收可以有ActiveMQ发起,也可以由消费者客户端发起,取决于Session是否开启事务以及签收模式的设置。

            在带事务的Session中,消费者客户端事务提交之时,消息自动完成签收。

            在不带事务的Session中,消息何时以及如何被签收取决于Session的签收模式设置
     
    activemq的消息确认机制就是文档中说的ack机制有:
        AUTO_ACKNOWLEDGE = 1    自动确认
        CLIENT_ACKNOWLEDGE = 2    客户端手动确认   
        DUPS_OK_ACKNOWLEDGE = 3    自动批量确认
        SESSION_TRANSACTED = 0    事务提交并确认
        INDIVIDUAL_ACKNOWLEDGE = 4    单条消息确认 activemq 独有
      ACK模式描述了Consumer与broker确认消息的方式(时机),比如当消息被Consumer接收之后,Consumer将在何时确认消息。
      对于broker而言,只有接收到ACK指令,才会认为消息被正确的接收或者处理成功了,通过ACK,可以在consumer(/producer)
      与Broker之间建立一种简单的“担保”机制.
      手动确认和单条消息确认需要手动的在客户端调用message.acknowledge()
      消息重发机制RedeliveryPolicy 有几个属性如下:
    RedeliveryPolicy redeliveryPolicy= new RedeliveryPolicy();
                //是否在每次尝试重新发送失败后,增长这个等待时间
                redeliveryPolicy.setUseExponentialBackOff(true);
                //重发次数,默认为6次   这里设置为10次
                redeliveryPolicy.setMaximumRedeliveries(10);
                //重发时间间隔,默认为1秒
                redeliveryPolicy.setInitialRedeliveryDelay(1);
                //第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value
                redeliveryPolicy.setBackOffMultiplier(2);
                //是否避免消息碰撞
                redeliveryPolicy.setUseCollisionAvoidance(false);
                //设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效
                redeliveryPolicy.setMaximumRedeliveryDelay(-1);

    那么在整合activemq时候就只需要修改配置文件和客户端就可以了,activemq就是这种机制,例如支付宝支付回调的时候,只有我们返回一个success,支付那边才不会给我重发消息

    配置文件:

    import javax.jms.Queue;
     
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.RedeliveryPolicy;
    import org.apache.activemq.command.ActiveMQQueue;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jms.annotation.EnableJms;
    import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
    import org.springframework.jms.core.JmsTemplate;
     
    @EnableJms  
    @Configuration  
    public class ActiveMQ4Config {  
     
        @Bean
        public Queue queue(){
            return new ActiveMQQueue("queue1");
        }
     
        @Bean
        public RedeliveryPolicy redeliveryPolicy(){
                RedeliveryPolicy  redeliveryPolicy=   new RedeliveryPolicy();
                //是否在每次尝试重新发送失败后,增长这个等待时间
                redeliveryPolicy.setUseExponentialBackOff(true);
                //重发次数,默认为6次   这里设置为10次
                redeliveryPolicy.setMaximumRedeliveries(10);
                //重发时间间隔,默认为1秒
                redeliveryPolicy.setInitialRedeliveryDelay(1);
                //第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value
                redeliveryPolicy.setBackOffMultiplier(2);
                //是否避免消息碰撞
                redeliveryPolicy.setUseCollisionAvoidance(false);
                //设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效
                redeliveryPolicy.setMaximumRedeliveryDelay(-1);
                return redeliveryPolicy;
        }
        @Bean
        public ActiveMQConnectionFactory activeMQConnectionFactory (@Value("${activemq.url}")String url,RedeliveryPolicy redeliveryPolicy){  
            ActiveMQConnectionFactory activeMQConnectionFactory =  
                    new ActiveMQConnectionFactory(
                           "admin",
                            "admin",
                            url);
            activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
            return activeMQConnectionFactory;
        }
        
        @Bean
        public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory,Queue queue){
            JmsTemplate jmsTemplate=new JmsTemplate();
            jmsTemplate.setDeliveryMode(2);//进行持久化配置 1表示非持久化,2表示持久化
            jmsTemplate.setConnectionFactory(activeMQConnectionFactory);
            jmsTemplate.setDefaultDestination(queue); //此处可不设置默认,在发送消息时也可设置队列
            jmsTemplate.setSessionAcknowledgeMode(4);//客户端签收模式
            return jmsTemplate;
        }
        
        //定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
        @Bean(name = "jmsQueueListener")
        public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
            DefaultJmsListenerContainerFactory factory =
                    new DefaultJmsListenerContainerFactory();
            factory.setConnectionFactory(activeMQConnectionFactory);
            //设置连接数
            factory.setConcurrency("1-10");
            //重连间隔时间
            factory.setRecoveryInterval(1000L);
            factory.setSessionAcknowledgeMode(4);
            return factory;
        }
     
    }

    消费者:

    import javax.jms.JMSException;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
     
    @Component
    public class Consumer {
     
        private final static Logger logger = LoggerFactory
                .getLogger(Consumer.class);
        
        @JmsListener(destination = "queue1", containerFactory = "jmsQueueListener")
        public void receiveQueue(final TextMessage text, Session session)
                throws JMSException {
            try {
                logger.debug("Consumer收到的报文为:" + text.getText());
                text.acknowledge();// 使用手动签收模式,需要手动的调用,如果不在catch中调用session.recover()消息只会在重启服务后重发
            } catch (Exception e) {    
                session.recover();// 此不可省略 重发信息使用
            }
        }
    }

    由此可以知道activemq的queue消息是可以保证消息不丢失,不会被重复消费的,因为会给每个消息设置一个唯一的id,当消息发送失败之后可以根据这个机制来进行消费,当然也是一种处理分布式事物的方法

    消息中间件的模式是可以保证消息不会丢失的,持久化和自动重发,消息回签,都可以很好的避免那种机制。消费端代码发生异常了,可以自动重发,自动消息重发。由于之前在测试的时候足够看官方文档,所以理解说客户端发生异常了,是不可以进行重发的,但是今天了解之后,发觉还是自动重发的机制,利用回签机制进行的。

  • 相关阅读:
    Unity3D游戏-愤怒的小鸟游戏源码和教程(一)
    Unity插件-ShareSDK使用指南
    Unity 3D开发-C#脚本语言的一些基础用法
    Shader的函数公式以及使用的场景
    Shader的基本用法和语法结构
    iTween的用法总结
    Unity 3D游戏-消消乐(三消类)教程和源码
    Unity 3D游戏-NPC对话系统With XML
    XML教程、语法手册、数据读取方式大全
    ReSharper2017.3的列对齐、排版格式、列对齐错误的修复
  • 原文地址:https://www.cnblogs.com/xiufengchen/p/10563402.html
Copyright © 2011-2022 走看看