zoukankan      html  css  js  c++  java
  • Activemq的异步 延迟 死信队列

    1、异步投递 vs 同步投递

    同步发送:
    消息生产者使用持久(Persistent)传递模式发送消息的时候,Producer.send() 方法会被阻塞,直到broker 发送一个确认消息给生产者(ProducerAck),这个确认消息暗示broker已经成功接收到消息并把消息保存到二级存储中。

    异步发送:
    如果应用程序能够容忍一些消息的丢失,那么可以使用异步发送。异步发送不会在受到broker的确认之前一直阻塞 Producer.send方法。
    想要使用异步,在brokerURL中增加 jms.alwaysSyncSend=false&jms.useAsyncSend=true属性

    1)如果设置了alwaysSyncSend=true系统将会忽略useAsyncSend设置的值都采用同步

    2)当alwaysSyncSend=false时,“NON_PERSISTENT”(非持久化)、事务中的消息将使用“异步发送”

    3)当alwaysSyncSend=false时,如果指定了useAsyncSend=true,“PERSISTENT”类型的消息使用异步发送。如果useAsyncSend=false,“PERSISTENT”类型的消息使用同步发送。

    总结: 默认情况(alwaysSyncSend=false,useAsyncSend=false),非持久化消息、事务内的消息均采用异步发送;对于持久化消息采用同步发送!!!
    2、配置异步投递的方式

    官网链接:http://activemq.apache.org/async-sends

    背景

    ActiveMQ支持以同步或异步模式将消息发送到代理。使用的模式对发送呼叫的延迟有很大的影响。由于延迟通常是生产者可以实现的吞吐量的重要因素,因此使用异步发送可以显着提高系统的性能。

    好消息是,在某些情况下,ActiveMQ默认情况下以异步模式发送消息。只有在JMS规范要求使用同步发送的情况下,我们才默认使用同步发送。我们被迫以同步模式发送的情况是在事务外部发送持久性消息时。

    如果您不使用事务并且正在发送持久消息,则每次发送都将同步并阻塞,直到代理已向生产者发送回确认消息已将消息安全保存到磁盘为止。该ack保证了消息不会丢失,但是由于客户端被阻止,因此还付出了巨大的等待时间。

    许多高性能应用程序旨在承受故障情况下的少量消息丢失。如果您的应用程序是按照这种方式设计的,则即使使用持久性消息,也可以启用异步发送来提高吞吐量。

    使用连接URI配置异步发送

    您可以使用连接配置URI来配置异步发送,如下所示

    cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
    

    在ConnectionFactory级别配置异步发送

    您可以使用属性在ActiveMQConnectionFactory对象上启用此功能。

    ((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
    

    在连接级别配置异步发送

    在此级别配置dispatchAsync设置将覆盖连接工厂级别的设置。

    您可以使用属性在ActiveMQConnection对象上启用此功能。

    ((ActiveMQConnection)connection).setUseAsyncSend(true);
    

    注意:如果是Spring或SpringBoot项目,通过修改JmsTemplate的默认参数实现异步或同步投递

        @Bean
        public ActiveMQConnectionFactory activeMQConnectionFactory(@Value("${spring.activemq.broker-url}") String url) {
            ActiveMQConnectionFactory activeMQConnectionFactory =
                    new ActiveMQConnectionFactory(
                            "admin",
                            "admin",
                            url);
            return activeMQConnectionFactory;
        }
    
        /**
         * 配置用于异步发送的非持久化JmsTemplate
         */
        @Bean
        @Autowired
        @Primary
        public JmsTemplate asynJmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory) {
            JmsTemplate template = new JmsTemplate(activeMQConnectionFactory);
            template.setExplicitQosEnabled(true);
            template.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            return template;
        }
    
        /**
         * 配置用于同步发送的持久化JmsTemplate
         */
        @Autowired
        @Bean
        public JmsTemplate synJmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory) {
            JmsTemplate template = new JmsTemplate(activeMQConnectionFactory);
            return template;
        }
    

    异步投递丢失消息的场景是:生产者设置 UseAsyncSend=true,使用 producer.send(msg)持续发送消息。
    由于消息不阻塞,生产者会认为所有 send 的消息均被成功发送至 MQ。如果 MQ 突然宕机,此时生产者端内存中尚未被发送至 MQ 的消息都会丢失。
    这时,可以给异步投递方法接收回调,以确认消息是否发送成功!

      @Test
        public void asynMessage() {
            Connection connection = null;
            Session session = null;
            ActiveMQMessageProducer producer = null;
            // 获取连接工厂
            ConnectionFactory connectionFactory = asynJmsTemplate.getConnectionFactory();
            try {
                connection = connectionFactory.createConnection();
                session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
                Queue queue = session.createQueue(name);
                int count = 100;
                producer = (ActiveMQMessageProducer)session.createProducer(queue);
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                long start = System.currentTimeMillis();
                for (int i = 0; i < count; i++) {
                    //创建需要发送的消息
                    TextMessage textMessage = session.createTextMessage("Hello");
                    //设置消息唯一ID
                    String msgid = UUID.randomUUID().toString();
                    textMessage.setJMSMessageID(msgid);
                    producer.send(textMessage, new AsyncCallback() {
                        @Override
                        public void onSuccess() {
                            // 使用msgid标识来进行消息发送成功的处理
                            System.out.println(msgid + " 消息发送成功");
                        }
    
                        @Override
                        public void onException(JMSException exception) {
                            // 使用msgid表示进行消息发送失败的处理
                            System.out.println(msgid + " 消息发送失败");
                            exception.printStackTrace();
                        }
                    });
                }
                session.commit();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    

    2、 延迟投递

    生产者提供两个发送消息的方法,一个是即时发送消息,一个是延时发送消息。
    1、修改activemq.xml

    image-20201019165338979

    注意:添加 schedulerSupport="true"配置
    2、在代码中设置延迟时长

        /**
         * 延迟投递
         */
        @Test
        public void sendMessage(){
    
            Connection connection = null;
            Session session = null;
            ActiveMQMessageProducer producer = null;
            // 获取连接工厂
            ConnectionFactory connectionFactory = jmsMessagingTemplate.getConnectionFactory();
    
            try {
                connection = connectionFactory.createConnection();
    
                session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
                Queue queue = session.createQueue(name);
                int count = 10;
    
                producer = (ActiveMQMessageProducer) session.createProducer(queue);
    
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                //创建需要发送的消息
                TextMessage textMessage = session.createTextMessage("Hello");
    
                //设置延时时长(延时10秒)
                textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10000);
    
                producer.send(textMessage);
    
                session.commit();
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    

    3、死信队列

    DLQ-Dead Letter Queue,死信队列,用来保存处理失败或者过期的消息

    img

    消息消费的重试机制

    (1) 是什么

    官网文档:http://activemq.apache.org/redelivery-policy

    是什么: 消费者收到消息,之后出现异常了,没有告诉broker确认收到该消息,broker会尝试再将该消息发送给消费者。尝试n次,如果消费者还是没有确认收到该消息,那么该消息将被放到死信队列重,之后broker不会再将该消息发送给消费者。

    (2) 具体哪些情况会引发消息重发

    ① Client用了transactions且再session中调用了rollback

    ② Client用了transactions且再调用commit之前关闭或者没有commit

    ③ Client再CLIENT_ACKNOWLEDGE的传递模式下,session中调用了recover

    (3) 请说说消息重发时间间隔和重发次数

    间隔:1

    次数:6

    每秒发6次

    (4) 有毒消息Poison ACK

    一个消息被redelivedred超过默认的最大重发次数(默认6次)时,消费的回个MQ发一个“poison ack”表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(私信队列)。

    注意两点:
    1)缺省持久消息过期,会被送到DLQ,非持久消息不会送到DLQ
    2)缺省的死信队列是ActiveMQ.DLQ,如果没有特别指定,死信都会被发送到这个队列。

    <destinationPolicy>
          <policyMap>
           <policyEntries>
             <policyEntry queue=">">
              <deadLetterStrategy>
                <individualDeadLetterStrategy queuePrefix="DLQ."
                 useQueueForQueueMessages="true" />
              </deadLetterStrategy>
             </policyEntry>
       
             <policyEntry topic=">" >        
              <pendingMessageLimitStrategy>
               <constantPendingMessageLimitStrategy limit="1000"/>
              </pendingMessageLimitStrategy>
             </policyEntry>
           </policyEntries>
          </policyMap>
    </destinationPolicy>
    

    2、RedeliveryPolicy重发策略设置
    修改启动类

    /**
     * 配置类
     */
    @Configuration
    public class ActiveMQConfig {
    
        //RedeliveryPolicy重发策略设置
        @Bean
        public RedeliveryPolicy redeliveryPolicy(){
            RedeliveryPolicy  redeliveryPolicy=   new RedeliveryPolicy();
            //是否在每次尝试重新发送失败后,增长这个等待时间
            redeliveryPolicy.setUseExponentialBackOff(true);
            //重发次数,默认为6次   这里设置为10次
            redeliveryPolicy.setMaximumRedeliveries(10);
            //重发时间间隔,默认为1秒
            redeliveryPolicy.setInitialRedeliveryDelay(2);
            //第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value
            redeliveryPolicy.setBackOffMultiplier(2);
            //是否避免消息碰撞
            redeliveryPolicy.setUseCollisionAvoidance(false);
            //设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效
            redeliveryPolicy.setMaximumRedeliveryDelay(-1);
    
            return redeliveryPolicy;
        }
    
        @Bean
        public ActiveMQConnectionFactory activeMQConnectionFactory (@Value("${spring.activemq.broker-url}")String url, RedeliveryPolicy redeliveryPolicy){
            ActiveMQConnectionFactory activeMQConnectionFactory =
                    new ActiveMQConnectionFactory(
                            "admin",
                            "admin",
                            url);
            activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
            return activeMQConnectionFactory;
        }
    
        @Bean
        public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
            return new JmsTransactionManager(connectionFactory);
        }
    
        @Bean(name="jmsQueryListenerFactory")
        public DefaultJmsListenerContainerFactory   jmsListenerContainerFactory(ConnectionFactory connectionFactory,PlatformTransactionManager transactionManager){
            DefaultJmsListenerContainerFactory  factory=new DefaultJmsListenerContainerFactory ();
            factory.setTransactionManager(transactionManager);
            factory.setConnectionFactory(connectionFactory);
            factory.setSessionTransacted(true); // 开启事务
            factory.setSessionAcknowledgeMode(1);
            return factory;
        }
    }
    
    
  • 相关阅读:
    Netty学习(四)-TCP粘包和拆包
    Netty学习(三)-Netty重要接口讲解
    Netty学习(二)-Helloworld Netty
    Netty学习(一)-为什么选择Netty
    java学习-NIO(五)NIO学习总结以及NIO新特性介绍
    java学习-NIO(四)Selector
    哈希表 HashTable(又名散列表)
    设计模式-外观模式
    设计模式-装饰模式
    设计模式-适配器模式
  • 原文地址:https://www.cnblogs.com/dalianpai/p/13841453.html
Copyright © 2011-2022 走看看