zoukankan      html  css  js  c++  java
  • Spring Boot 整合ActiveMQ实现延时发现消息

      生产者提供两个发送消息的方法,一个是即时发送消息,一个是延时发送消息。延时发送消息需要手动修改activemq目录conf下的activemq.xml配置文件,开启延时。本文中依赖文件和application.properties配置文件与《Spring Boot 整合 ActiveMQ 实现手动确认和重发消息》一致,保持不变。

    定义消息对象

    import lombok.Getter;
    import lombok.Setter;
    import lombok.ToString;
    import org.springframework.stereotype.Component;
    
    import java.io.Serializable;
    
    /**
     * @author Wiener
     */
    @Setter
    @Getter
    @ToString
    @Component
    public class User implements Serializable {
        private Long id;
        private int age;
        private String name;
        private String remark;
    
        public User() {
        }
        public User(Long id, int age, String name) {
            super();
            this.id = id;
            this.age = age;
            this.name = name;
        }
    }

    编写生产者

      生产者提供两个发送消息的方法,一个是即时发送消息,一个是延时发送消息。延时发送消息需要手动修改activemq目录conf下的activemq.xml配置文件,开启延时,在broker标签内新增属性schedulerSupport="true",然后重启activemq即可。

        <!--
            The <broker> element is used to configure the ActiveMQ broker.
        -->
        <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}"  schedulerSupport="true">
    
            <destinationPolicy>
                <policyMap>
                  <policyEntries>
                    <policyEntry topic=">" >
                        <!-- The constantPendingMessageLimitStrategy is used to prevent
                             slow topic consumers to block producers and affect other consumers
                             by limiting the number of messages that are retained
                             For more information, see:
    
                             http://activemq.apache.org/slow-consumer-handling.html
    
                        -->
                      <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="1000"/>
                      </pendingMessageLimitStrategy>
                    </policyEntry>
                  </policyEntries>
                </policyMap>
            </destinationPolicy>

      下面定义生产者和消费者:

    import org.apache.activemq.ScheduledMessage;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.jms.JmsProperties;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.stereotype.Service;
    
    import javax.jms.*;
    import java.io.Serializable;
    
    
    @Service("producer")
    public class Producer {
        /**
         * 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装
         */
        @Autowired
        private JmsMessagingTemplate jmsTemplate;
    
        /**
         * 发送消息,destination是发送到的队列,message是待发送的消息
         *
         * @param destination
         * @param message
         */
        public void sendMessage(Destination destination, final String message) {
            jmsTemplate.convertAndSend(destination, message);
        }
    
        /**
         * 延时发送
         * @param destination 发送的队列
         * @param data 发送的消息
         * @param time 延迟时间
         */
        public <T extends Serializable> void delaySendMsg(Destination destination, T data, Long time){
            Connection connection = null;
            Session session = null;
            MessageProducer producer = null;
            // 获取连接工厂
            ConnectionFactory connectionFactory = jmsTemplate.getConnectionFactory();
            try {
                // 获取连接
                connection = connectionFactory.createConnection();
                connection.start();
                // 获取session,true开启事务,false关闭事务
                session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                // 创建一个消息队列
                producer = session.createProducer(destination);
                //消息持久化
    producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
                ObjectMessage message = session.createObjectMessage(data);
                //设置延迟时间
                message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
                // 发送消息
                producer.send(message);
    //一旦开启事务发送,那么就必须使用commit方法提交事务
                session.commit();
            } catch (Exception e){
                e.printStackTrace();
            } finally {
                try {
                    if (producer != null){
                        producer.close();
                    }
                    if (session != null){
                        session.close();
                    }
                    if (connection != null){
                        connection.close();
                    }
                } catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    
    }

      JmsTemplate类的 setDeliveryMode(int deliveryMode)可以对持久化策略进行配置,1表示非持久化,2表示持久化,二者定义于DeliveryMode类中。

        /**
         * Set the delivery mode to use when sending a message.
         * Default is the JMS Message default: "PERSISTENT".
         * <p>Since a default value may be defined administratively,
         * this is only used when "isExplicitQosEnabled" equals "true".
         * @param deliveryMode the delivery mode to use
         */
        public void setDeliveryMode(int deliveryMode) {
            this.deliveryMode = deliveryMode;
        }
    public interface DeliveryMode {
    
        /**
         * A Jakarta Messaging provider must deliver a {@code NON_PERSISTENT} message with an at-most-once guarantee. This means that it may
         * lose the message, but it must not deliver it twice.
         */
        int NON_PERSISTENT = 1;
    
        /**
         * This delivery mode instructs the Jakarta Messaging provider to log the message to stable storage as part of the client's send
         * operation. Only a hard media failure should cause a {@code PERSISTENT} message to be lost.
         */
        int PERSISTENT = 2;
    }

      消费者正常消费消息即可。

    @Component
    public class ConsumerListener2 {
        private static Logger logger = LoggerFactory.getLogger(ConsumerListener2.class);
        /**
         * 使用JmsListener配置消费者监听的队列
         * @param receivedMsg 接收到的消息
         */
        @JmsListener(destination = "myDest.queue")
        public void receiveQueue(String receivedMsg) {
            logger.info("Consumer2 收到的报文为: {}", receivedMsg);
        }
    }
    
    @Component
    public class ConsumerListener {
        private static Logger logger = LoggerFactory.getLogger(ConsumerListener.class);
    
        @JmsListener(destination = "myDest.queue")
        public void receiveQueue(String receivedMsg) {
            logger.info("Consumer收到的报文为: {}", receivedMsg);
        }
    
    }

    编写测试用例

    import com.alibaba.fastjson.JSON;
    import com.eg.wiener.controller.Producer;
    import com.eg.wiener.dto.User;
    import org.apache.activemq.command.ActiveMQQueue;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    import javax.jms.Destination;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    @SpringBootTest
    class WienerApplicationTests {
    
        @Test
        void contextLoads() {
        }
    
        @Autowired
        private Producer producer;
    
        /**
         * 延迟处理
         * @throws InterruptedException
         */
        @Test
        public void mqDelayTest() throws InterruptedException {
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Destination destination = new ActiveMQQueue("myDest.queue");
            User user = new User();
            user.setName("演示发送消息");
            user.setId(33L);
            for (int i = 0; i < 6; i++) {
                user.setId(i *100L);
                user.setAge(i + 20);
                user.setRemark(df.format(new Date()) );
                producer.delaySendMsg(destination, JSON.toJSONString(user), 10000L);
            }
            try {
                // 休眠,等等消息执行
                Thread.currentThread().sleep(30000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

      这里贴出延迟发送效果的部分日志:

    2020-09-20 18:21:02.047  INFO 9728 --- [enerContainer-1] c.eg.wiener.controller.ConsumerListener  : Consumer收到的报文为: {"age":23,"id":300,"name":"演示发送消息","remark":"2020-09-20 18:20:10"}
    2020-09-20 18:21:02.064  INFO 9728 --- [enerContainer-1] c.eg.wiener.controller.ConsumerListener  : Consumer收到的报文为: {"age":25,"id":500,"name":"演示发送消息","remark":"2020-09-20 18:20:10"}

      remark记录了消息生成时间,而日志打印时间是消息发送时间,晚于remark时间,足以说明消息是延迟发送的。

      关于本文内容,大家有什么看法?欢迎留言讨论,也希望大家多多点赞关注。祝各位生活愉快!工作顺利!

  • 相关阅读:
    p3201&bzoj1483 梦幻布丁
    p1341 无序字母对
    p2590&bzoj1036 树的统计
    p1462 通往奥格瑞玛的道路
    p1522 牛的旅行 Cow Tours
    ARC097D Equals
    p2371&bzoj2118 墨墨的等式
    ARC097C K-th Substring
    欧拉函数入门合集(模板)
    主席树
  • 原文地址:https://www.cnblogs.com/east7/p/13728461.html
Copyright © 2011-2022 走看看