zoukankan      html  css  js  c++  java
  • ActiveMQ笔记(7):如何清理无效的延时消息?

    ActiveMQ的延时消息是一个让人又爱又恨的功能,具体使用可参考上篇ActiveMQ笔记(6):消息延时投递,在很多需要消息延时投递的业务场景十分有用,但是也有一个缺陷,在一些大访问量的场景,如果瞬间向MQ发送海量的延时消息,超过MQ的调度能力,就会造成很多消息到了该投递的时刻,却没有投递出去,形成积压,一直停留在ActiveMQ web控制台的Scheduled面板中。

    下面的代码演示了,如何清理activemq中的延时消息(包括:全部清空及清空指定时间段的延时消息),这也是目前唯一可行的办法。

    为了演示方便,先封装一个小工具类:

    package cn.mwee.utils.mq;
    
    import cn.mwee.utils.list.ListUtil;
    import cn.mwee.utils.log4j2.MwLogger;
    import org.apache.commons.lang.StringUtils;
    import org.slf4j.Logger;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessagePostProcessor;
    
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.TextMessage;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Collectors;
    
    /**
     * Created by yangjunming on 6/20/16.
     */
    public final class MessageUtil {
    
        private Logger logger = new MwLogger(MessageUtil.class);//这里就是一个Log4j2的实例,大家可以换成原生的log4j2或类似工具
    
        private ConnectionFactory connectionFactory;
        private long receiveTimeout;//接收超时时间
        private JmsTemplate jmsTemplate;
        private List<String> destinationQueueNames;
        private final static String BACKUP_QUEUE_SUFFIX = "_B";
        private boolean autoBackup = false;//是否自动将消息备份到_b的队列,方便调试
    
    
        public MessageUtil(final ConnectionFactory connectionFactory, final long receiveTimeout, final List<String> destinationQueueNames) {
            this.connectionFactory = connectionFactory;
            this.receiveTimeout = receiveTimeout;
            this.destinationQueueNames = new ArrayList<>();
            this.destinationQueueNames.addAll(destinationQueueNames.stream().collect(Collectors.toList()));
            jmsTemplate = new JmsTemplate(this.connectionFactory);
            jmsTemplate.setReceiveTimeout(this.receiveTimeout);
        }
    
        public MessageUtil(ConnectionFactory connectionFactory, List<String> destinationQueueNames) {
            this(connectionFactory, 10000, destinationQueueNames);
        }
    
    
        public void convertAndSend(Object message) {
            if (ListUtil.isEmpty(destinationQueueNames)) {
                logger.error("目标队列为空,无法发送,请检查配置!message => " + message.toString());
                return;
            }
            for (String dest : destinationQueueNames) {
                jmsTemplate.convertAndSend(dest, message);
                if (autoBackup) {
                    jmsTemplate.convertAndSend(dest + BACKUP_QUEUE_SUFFIX, message);
                }
            }
        }
    
        public void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) {
            if (ListUtil.isEmpty(destinationQueueNames)) {
                logger.error("目标队列为空,无法发送,请检查配置!message => " + message.toString());
                return;
            }
            for (String dest : destinationQueueNames) {
                jmsTemplate.convertAndSend(dest, message, messagePostProcessor);
                if (autoBackup) {
                    jmsTemplate.convertAndSend(dest + BACKUP_QUEUE_SUFFIX, message, messagePostProcessor);
                }
            }
        }
    
        public void convertAndSend(String destinationName, Object message) {
            if (StringUtils.isBlank(destinationName)) {
                logger.error("目标队列为空,无法发送,请检查配置!message => " + message.toString());
                return;
            }
            jmsTemplate.convertAndSend(destinationName, message);
            if (autoBackup) {
                jmsTemplate.convertAndSend(destinationName + BACKUP_QUEUE_SUFFIX, message);
            }
        }
    
    
        public void convertAndSend(String destinationName, Object message, MessagePostProcessor messagePostProcessor) {
            if (StringUtils.isBlank(destinationName)) {
                logger.error("目标队列为空,无法发送,请检查配置!message => " + message.toString());
                return;
            }
            jmsTemplate.convertAndSend(destinationName, message, messagePostProcessor);
            if (autoBackup) {
                jmsTemplate.convertAndSend(destinationName + BACKUP_QUEUE_SUFFIX, message, messagePostProcessor);
            }
        }
    
        public static String getText(javax.jms.Message message) {
            if (message instanceof TextMessage) {
                try {
                    return ((TextMessage) message).getText();
                } catch (JMSException e) {
                    return message.toString();
                }
            }
            return message.toString();
        }
    
        public String getFirstDestination() {
            if (ListUtil.isEmpty(destinationQueueNames)) {
                return null;
            }
            return destinationQueueNames.get(0);
        }
    
    
        public boolean isAutoBackup() {
            return autoBackup;
        }
    
        public void setAutoBackup(boolean autoBackup) {
            this.autoBackup = autoBackup;
        }
    }
    

    其中主要就用到了convertAndSend(Object message, MessagePostProcessor messagePostProcessor) 这个方法,其它代码可以无视。

    先来模拟瞬间向MQ发送大量延时消息:

        /**
         * 发送延时消息
         *
         * @param messageUtil
         */
        private static void sendScheduleMessage(MessageUtil messageUtil) {
            for (int i = 0; i < 10000; i++) {
                Object obj = "test:" + i;
                messageUtil.convertAndSend(obj, new ScheduleMessagePostProcessor(1000 + i * 1000));
            }
        }
    

    这里向MQ发送了1w条延时消息,每条消息延时1秒*i,上面代码中的ScheduleMessagePostProcessor类可在上篇中找到。

    运行完之后,MQ中应该堆积着了很多消息了:

    下面的代码可以清空所有延时消息:

        /**
         * 删除所有延时消息
         *
         * @param connectionFactory
         * @throws JMSException
         */
        private static void deleteAllScheduleMessage(final ConnectionFactory connectionFactory) throws JMSException {
            Connection conn = connectionFactory.createConnection();
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
            MessageProducer producer = session.createProducer(management);
            Message request = session.createMessage();
            request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
            producer.send(request);
        }
    

    清空所有延时消息,有些用力过猛了,很多时候,我们只需要清理掉过期的延时消息(即:本来计划是8:00投递出去的消息,结果过了8点还没投递出去) 

        /**
         * 删除过期的延时消息
         *
         * @param connectionFactory
         * @throws JMSException
         */
        private static void deleteExpiredScheduleMessage(final ConnectionFactory connectionFactory) throws JMSException {
            long start = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(12);//删除:当前时间前12小时范围的延时消息
            long end = System.currentTimeMillis();
            Connection conn = connectionFactory.createConnection();
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
            MessageProducer producer = session.createProducer(management);
            Message request = session.createMessage();
            request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
            request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(start));
            request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(end));
            producer.send(request);
        }
    

    与上一段代码基本相似,只是多指定了删除消息的起止时间段。  

    最后贴一段spring的配置文件及main函数入口

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <beans xmlns="http://www.springframework.org/schema/beans"
     3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     4        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
     5 
     6     <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
     7         <property name="connectionFactory">
     8             <bean class="org.apache.activemq.ActiveMQConnectionFactory">
     9                 <property name="brokerURL"
    10                           value="failover:(tcp://localhost:61616,tcp://localhost:61626)?randomize=false&amp;backup=true"/>
    11                 <property name="maxThreadPoolSize" value="100"/>
    12             </bean>
    13         </property>
    14     </bean>
    15 
    16     <bean id="messageUtil" class="cn.mwee.utils.mq.MessageUtil">
    17         <constructor-arg index="0" ref="jmsFactory"/>
    18         <constructor-arg index="1" value="10000"/>
    19         <constructor-arg index="2">
    20             <list>
    21                 <value>dest1</value>
    22                 <value>dest2</value>
    23             </list>
    24         </constructor-arg>
    25         <property name="autoBackup" value="true"/>
    26     </bean>
    27 
    28 </beans>
    View Code

    main函数:

        public static void main(String[] args) throws InterruptedException, JMSException {
            ApplicationContext context = new ClassPathXmlApplicationContext("spring-sender.xml");
            ConnectionFactory connectionFactory = context.getBean(ConnectionFactory.class, "jmsFactory");
            MessageUtil messageUtil = context.getBean(MessageUtil.class);
    //        sendScheduleMessage(messageUtil);
    //        deleteAllScheduleMessage(connectionFactory);
            deleteExpiredScheduleMessage(connectionFactory);
        }
    

    参考文章:

    Enhanced JMS Scheduler in ActiveMQ

  • 相关阅读:
    Spring MVC:框架及其组件介绍
    Goods:生成订单
    Goods:我的订单查询分页
    LeetCode:Kth Largest Element in an Array(need update)
    LeetCode:Swap Nodes in Pairs
    Goods:购物车条目加减数量实现
    LeetCode:Sum Root to Leaf Numbers
    Goods:购物车模块之全选按钮与条目之复选按钮的click事件
    Goods:查询某个用户的购物车条目以及添加购物车条目
    洛谷2387 NOI2014魔法森林(LCT维护最小生成树)
  • 原文地址:https://www.cnblogs.com/yjmyzz/p/how-to-delete-scheduled-message-in-active-mq.html
Copyright © 2011-2022 走看看