zoukankan      html  css  js  c++  java
  • SpringBoot之ActiveMQ实现延迟消息

    一、安装activeMQ

    ​ 安装步骤参照网上教程,本文不做介绍

    二、修改activeMQ配置文件

    ​ broker新增配置信息 schedulerSupport="true"

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true" >
    
            <destinationPolicy>
                <policyMap>
                  <policyEntries>
                    <policyEntry topic=">" >
                        <!-- The constantPendingMessageLimitStrategy is used to prevent
                             slow topic consumers to block producers and affect other consumers
                             by limiting the number of messages that are retained
                             For more information, see:
    
                             http://activemq.apache.org/slow-consumer-handling.html
    
                        -->
                      <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="1000"/>
                      </pendingMessageLimitStrategy>
                    </policyEntry>
                  </policyEntries>
                </policyMap>
            </destinationPolicy>
    

    三、创建SpringBoot工程

    file

    1. 配置ActiveMQ工厂信息,信任包必须配置否则会报错
    package com.example.demoactivemq.config;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.RedeliveryPolicy;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @author shanks on 2019-11-12
     */
    @Configuration
    public class ActiveMqConfig {
    
        @Bean
        public ActiveMQConnectionFactory factory(@Value("${spring.activemq.broker-url}") String url){
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
            // 设置信任序列化包集合
            List<String> models = new ArrayList<>();
            models.add("com.example.demoactivemq.domain");
            factory.setTrustedPackages(models);
    
            return factory;
        }
    
    }
    
    
    1. 消息实体类
    package com.example.demoactivemq.domain;
    
    import lombok.Builder;
    import lombok.Data;
    
    import java.io.Serializable;
    
    /**
     * @author shanks on 2019-11-12
     */
    
    @Builder
    @Data
    public class MessageModel implements Serializable {
        private String titile;
        private String message;
    }
    
    
    1. 生产者
    package com.example.demoactivemq.producer;
    
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.activemq.ScheduledMessage;
    import org.apache.activemq.command.ActiveMQQueue;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.jms.JmsProperties;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.stereotype.Service;
    
    import javax.jms.*;
    import java.io.Serializable;
    
    
    /**
     * 消息生产者
     *
     * @author shanks
     */
    @Service
    @Slf4j
    public class Producer {
    
        public static final Destination DEFAULT_QUEUE = new ActiveMQQueue("delay.queue");
    
        @Autowired
        private JmsMessagingTemplate template;
    
        /**
         * 发送消息
         *
         * @param destination destination是发送到的队列
         * @param message     message是待发送的消息
         */
        public <T extends Serializable> void send(Destination destination, T message) {
            template.convertAndSend(destination, message);
        }
    
        /**
         * 延时发送
         *
         * @param destination 发送的队列
         * @param data        发送的消息
         * @param time        延迟时间
         */
        public <T extends Serializable> void delaySend(Destination destination, T data, Long time) {
            Connection connection = null;
            Session session = null;
            MessageProducer producer = null;
            // 获取连接工厂
            ConnectionFactory connectionFactory = template.getConnectionFactory();
            try {
                // 获取连接
                connection = connectionFactory.createConnection();
                connection.start();
                // 获取session,true开启事务,false关闭事务
                session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                // 创建一个消息队列
                producer = session.createProducer(destination);
                producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
                ObjectMessage message = session.createObjectMessage(data);
                //设置延迟时间
                message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
                // 发送消息
                producer.send(message);
                log.info("发送消息:{}", data);
                session.commit();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (producer != null) {
                        producer.close();
                    }
                    if (session != null) {
                        session.close();
                    }
                    if (connection != null) {
                        connection.close();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    
    1. 消费者
    package com.example.demoactivemq.producer;
    
    
    import com.example.demoactivemq.domain.MessageModel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 消费者
     */
    @Component
    @Slf4j
    public class Consumer {
    
    
        @JmsListener(destination = "delay.queue")
        public void receiveQueue(MessageModel message) {
            log.info("收到消息:{}", message);
        }
    }
    
    
    1. application.yml
    spring:
      activemq:
        broker-url: tcp://localhost:61616
    
    1. 测试类
    package com.example.demoactivemq;
    
    import com.example.demoactivemq.domain.MessageModel;
    import com.example.demoactivemq.producer.Producer;
    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @SpringBootTest(classes = DemoActivemqApplication.class)
    @RunWith(SpringRunner.class)
    class DemoActivemqApplicationTests {
    
        /**
         * 消息生产者
         */
        @Autowired
        private Producer producer;
    
        /**
         * 及时消息队列测试
         */
        @Test
        public void test() {
            MessageModel messageModel = MessageModel.builder()
                    .message("测试消息")
                    .titile("消息000")
                    .build();
            // 发送消息
            producer.send(Producer.DEFAULT_QUEUE, messageModel);
        }
    
        /**
         * 延时消息队列测试
         */
        @Test
        public void test2() {
            for (int i = 0; i < 5; i++) {
                MessageModel messageModel = MessageModel.builder()
                        .titile("延迟10秒执行")
                        .message("测试消息" + i)
                        .build();
                // 发送延迟消息
                producer.delaySend(Producer.DEFAULT_QUEUE, messageModel, 10000L);
            }
            try {
                // 休眠100秒,等等消息执行
                Thread.currentThread().sleep(100000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }
    
    

    执行结果

    2019-11-12 22:18:52.939  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息0)
    2019-11-12 22:18:52.953  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息1)
    2019-11-12 22:18:52.958  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息2)
    2019-11-12 22:18:52.964  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息3)
    2019-11-12 22:18:52.970  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息4)
    2019-11-12 22:19:03.012  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息0)
    2019-11-12 22:19:03.017  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息1)
    2019-11-12 22:19:03.019  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息2)
    2019-11-12 22:19:03.020  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息3)
    2019-11-12 22:19:03.021  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息4)
    

    比你优秀的人比你还努力,你有什么资格不去奋斗!!!

  • 相关阅读:
    Effective Java 第三版——26. 不要使用原始类型
    Effective Java 第三版——25. 将源文件限制为单个顶级类
    Effective Java 第三版——24. 优先考虑静态成员类
    Effective Java 第三版——23. 优先使用类层次而不是标签类
    Effective Java 第三版——22. 接口仅用来定义类型
    Effective Java 第三版——21. 为后代设计接口
    Effective Java 第三版——20. 接口优于抽象类
    Effective Java 第三版——19. 如果使用继承则设计,并文档说明,否则不该使用
    Effective Java 第三版——18. 组合优于继承
    Effective Java 第三版——17. 最小化可变性
  • 原文地址:https://www.cnblogs.com/muyl/p/11845882.html
Copyright © 2011-2022 走看看