zoukankan      html  css  js  c++  java
  • ActiveMQ 笔记(四)SpringSpringBoot 整合 Activemq

    个人博客网:https://wushaopei.github.io/    (你想要的这里都有)

    一、Spring 整合Activemq

    1、所需jar包

     
    <dependencies>
        <!-- ActiveMQ classes. Keep all org.apache.activemq artifacts on the same version. -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.11</version>
        </dependency>
        <!-- Connection pooling (PooledConnectionFactory); version aligned with activemq-all. -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.11</version>
        </dependency>

        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>4.15</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.10.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework/spring-jms -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>5.2.1.RELEASE</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework/spring-aop -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>5.2.1.RELEASE</version>
        </dependency>
    </dependencies>

    2、Spring配置文件(applicationContext.xml)

     
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd">
     
        <!--  Enable component scanning of the com.demo.activemq package  -->
        <context:component-scan base-package="com.demo.activemq"/>
        <!--  Pooled connection factory (referenced by jmsTemplate below); stop() is the destroy method  -->
        <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
            <property name="connectionFactory">
                <!--      The ConnectionFactory that actually creates connections, supplied by the JMS provider      -->
                <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
                    <property name="brokerURL" value="tcp://192.168.10.130:61616"/>
                </bean>
            </property>
            <property name="maxConnections" value="100"/>
        </bean>
     
        <!--  Queue destination (point-to-point)  -->
        <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <!--    Queue name injected through the constructor    -->
            <constructor-arg index="0" value="spring-active-queue"/>
        </bean>
     
        <!--  Topic destination (publish/subscribe)  -->
        <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg index="0" value="spring-active-topic"/>
        </bean>
     
        <!--  Spring's JMS helper class; it can send and receive messages  -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!--    Connection factory to use    -->
            <property name="connectionFactory" ref="connectionFactory"/>
            <!--    Default destination: the queue defined above    -->
            <property name="defaultDestination" ref="destinationQueue"/>
            <!--    Message converter    -->
            <property name="messageConverter">
                <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
            </property>
        </bean>
    </beans>

    3、编写代码

    (1)队列(Queue)

    /**
     * Queue producer demo: loads the Spring XML context and sends one text message
     * to the JmsTemplate's default destination.
     */
    @Service
    public class SpringMQ_Producer {

        private JmsTemplate jmsTemplate;

        @Autowired
        public void setJmsTemplate(JmsTemplate jmsTemplate) {
            this.jmsTemplate = jmsTemplate;
        }

        public static void main(String[] args) {
            // try-with-resources closes the context, which runs the beans' destroy methods
            // (including the pooled connection factory's stop()) instead of leaking them.
            try (ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("Application.xml")) {
                SpringMQ_Producer springMQ_producer = applicationContext.getBean(SpringMQ_Producer.class);

                springMQ_producer.jmsTemplate.send(session -> session.createTextMessage("***Spring和ActiveMQ的整合case111....."));
                System.out.println("********send task over");
            }
        }
    }
    
    /**
     * Queue consumer demo: loads the Spring XML context, receives one message from the
     * JmsTemplate's default destination and prints it.
     */
    @Service
    public class SpringMQ_Consumer {

        private JmsTemplate jmsTemplate;

        @Autowired
        public void setJmsTemplate(JmsTemplate jmsTemplate) {
            this.jmsTemplate = jmsTemplate;
        }

        public static void main(String[] args) {
            // try-with-resources closes the context, which runs the beans' destroy methods
            // (including the pooled connection factory's stop()) instead of leaking them.
            try (ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("Application.xml")) {
                SpringMQ_Consumer springMQ_consumer = applicationContext.getBean(SpringMQ_Consumer.class);
                String returnValue = (String) springMQ_consumer.jmsTemplate.receiveAndConvert();
                System.out.println("****消费者收到的消息:   " + returnValue);
            }
        }
    }
    

    (2)主题(Topic)

    生产者和消费者都可以通过jmsTemplate对象实时设置目的地等其他信息

    /**
     * Topic producer demo: switches the JmsTemplate's default destination to the
     * "destinationTopic" bean and sends one text message to it.
     */
    @Service
    public class SpringMQ_Topic_Producer {
        private JmsTemplate jmsTemplate;

        public SpringMQ_Topic_Producer(JmsTemplate jmsTemplate) {
            this.jmsTemplate = jmsTemplate;
        }

        public static void main(String[] args) {
            // try-with-resources closes the context, which runs the beans' destroy methods
            // (including the pooled connection factory's stop()) instead of leaking them.
            try (ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("Application.xml")) {
                SpringMQ_Topic_Producer springMQ_topic_producer = applicationContext.getBean(SpringMQ_Topic_Producer.class);
                // Use the destinationTopic bean declared in the XML configuration as the destination.
                springMQ_topic_producer.jmsTemplate.setDefaultDestination(((Destination) applicationContext.getBean("destinationTopic")));
                springMQ_topic_producer.jmsTemplate.send(session -> session.createTextMessage("***Spring和ActiveMQ的整合TopicCase111....."));
            }
        }
    }
    /**
     * Topic consumer demo: switches the JmsTemplate's default destination to the
     * "destinationTopic" bean, receives one message from it and prints it.
     */
    @Service
    public class SpringMQ_Topic_Consumer {
        private JmsTemplate jmsTemplate;

        public SpringMQ_Topic_Consumer(JmsTemplate jmsTemplate) {
            this.jmsTemplate = jmsTemplate;
        }

        public static void main(String[] args) {
            // try-with-resources closes the context, which runs the beans' destroy methods
            // (including the pooled connection factory's stop()) instead of leaking them.
            try (ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("Application.xml")) {
                SpringMQ_Topic_Consumer springMQConsumer = applicationContext.getBean(SpringMQ_Topic_Consumer.class);
                // Use the destinationTopic bean declared in the XML configuration as the destination.
                springMQConsumer.jmsTemplate.setDefaultDestination(((Destination) applicationContext.getBean("destinationTopic")));
                String returnValue = (String) springMQConsumer.jmsTemplate.receiveAndConvert();
                System.out.println("****消费者收到的消息:   " + returnValue);
            }
        }
    }
    

    4、监听配置

    在Spring里面实现消费者不启动,直接通过配置监听完成

    (1)说明

    类似于前面通过 setMessageListener 设置监听器的方式,有消息到达时实时进行消费

    (2)Spring 配置文件

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd">
    
        <!--  Enable component scanning of the com.demo.activemq package  -->
        <context:component-scan base-package="com.demo.activemq"/>
        <!--  Pooled connection factory (referenced by jmsTemplate and the listener container below); stop() is the destroy method  -->
        <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
            <property name="connectionFactory">
                <!--      The ConnectionFactory that actually creates connections, supplied by the JMS provider     -->
                <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
                    <property name="brokerURL" value="tcp://192.168.10.130:61616"/>
                </bean>
            </property>
            <property name="maxConnections" value="100"/>
        </bean>
    
        <!--  Queue destination (point-to-point)  -->
        <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <!--    Queue name injected through the constructor    -->
            <constructor-arg index="0" value="spring-active-queue"/>
        </bean>
    
        <!--  Topic destination (publish/subscribe)  -->
        <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg index="0" value="spring-active-topic"/>
        </bean>
    
        <!--  Spring's JMS helper class; it can send and receive messages  -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!--    Connection factory to use    -->
            <property name="connectionFactory" ref="connectionFactory"/>
            <!--    Default destination: the queue defined above    -->
            <property name="defaultDestination" ref="destinationQueue"/>
            <!--    Message converter    -->
            <property name="messageConverter">
                <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
            </property>
        </bean>
    
        <!--  JMS message listener container  -->
        <bean id="defaultMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <!--  JMS connection factory     -->
            <property name="connectionFactory" ref="connectionFactory"/>
            <!--   Destination to listen on: the topic defined above     -->
            <property name="destination" ref="destinationTopic"/>
            <!--  The bean implementing MessageListener that handles incoming messages     -->
            <property name="messageListener" ref="myMessageListener"/>
        </bean>
    </beans>
     
    

    (3)需要写一个类来实现消息监听

    /**
     * MessageListener implementation that prints the body of each text message it receives.
     * It is referenced as the "messageListener" of the DefaultMessageListenerContainer
     * declared in the XML configuration.
     */
    @Component
    public class MyMessageListener implements MessageListener {

        @Override
        public void onMessage(Message message) {
            // Only text messages are handled; every other message type is ignored.
            if (!(message instanceof TextMessage)) {
                return;
            }
            try {
                String body = ((TextMessage) message).getText();
                System.out.println("消费者收到的消息" + body);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    

    (4)消费者配置了自动监听,就相当于在spring里面后台运行,有消息就运行我们实现监听类里面的方法

    二、SpringBoot 整合Activemq

    1、队列(Queue)

    1.1 队列生产者:

    (1)新建Maven工程并设置包名类名

    工程名: boot_mq_producer
    包名:   com.atguigu.boot.activemq

    (2)POM依赖文件配置:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.demo</groupId>
        <artifactId>04activemq_springboot-queue</artifactId>
        <version>1.0-SNAPSHOT</version>

    <!--    <parent>-->
    <!--        <groupId>org.springframework.boot</groupId>-->
    <!--        <artifactId>spring-boot-starter-parent</artifactId>-->
    <!--        <version>2.2.1.RELEASE</version>-->
    <!--    </parent>-->

        <dependencies>
            <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
                <version>2.2.1.RELEASE</version>
            </dependency>

            <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-activemq -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-activemq</artifactId>
                <version>2.2.1.RELEASE</version>
            </dependency>

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
                <version>2.2.1.RELEASE</version>
            </dependency>
            <!-- Test-only dependency: keep it off the runtime classpath. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <version>2.2.1.RELEASE</version>
                <scope>test</scope>
            </dependency>
        </dependencies>

    </project>
    

    (3)application.yml 文件

    # Spring Boot server port
    server:
      port: 8080
    
    # ActiveMQ settings
    spring:
      activemq:
        broker-url: tcp://192.168.10.130:61616 # ActiveMQ broker address
        user: admin # ActiveMQ connection user name
        password: admin # ActiveMQ connection password
      jms:
        # Selects whether the application works with queues or topics
        pub-sub-domain: false # false = Queue |  true = Topic
    
    # Name of the queue on the broker
    myQueueName: springboot-activemq-queue

    (4)配置队列bean

    类似于Spring的ApplicationContext.xml文件

    /**
     * Declares the queue destination used by the producer and enables Spring's JMS support.
     */
    @Component
    @EnableJms // Enable Spring JMS annotation support
    public class ConfigBean {
        // "${...}" resolves the myQueueName property from application.yml;
        // without the placeholder syntax the literal text "myQueueName" would be injected.
        @Value("${myQueueName}")
        private String myQueueName;

        /** Creates the ActiveMQQueue bean named after the configured queue. */
        @Bean
        public ActiveMQQueue queue() {
            return new ActiveMQQueue(myQueueName);
        }
    }

    (5)队列生产者

    /**
     * Queue producer: sends a message with a random UUID suffix to the injected queue.
     */
    @Component
    public class Queue_Producer {
        // Messaging template used to send messages
        private final JmsMessagingTemplate jmsMessagingTemplate;

        // The ActiveMQQueue bean declared in ConfigBean
        private final ActiveMQQueue activeMQQueue;

        // Constructor injection
        public Queue_Producer(JmsMessagingTemplate jmsMessagingTemplate, ActiveMQQueue activeMQQueue) {
            this.jmsMessagingTemplate = jmsMessagingTemplate;
            this.activeMQQueue = activeMQQueue;
        }

        /** Sends one message to the queue. */
        public void producerMsg() {
            String payload = "**************" + UUID.randomUUID();
            jmsMessagingTemplate.convertAndSend(activeMQQueue, payload);
        }
    }
    

    (6)测试单元

    /**
     * Integration test that boots the application context and sends one queue message.
     */
    @RunWith(SpringJUnit4ClassRunner.class)
    @SpringBootTest(classes = MainApp.class)
    @WebAppConfiguration
    public class TestActiveMQ {
        @Autowired
        private Queue_Producer queueProducer;

        /** Sends a single message through the injected producer. */
        @Test
        public void testSend() {
            queueProducer.producerMsg();
        }
    }
    

    结果: 测试结果应该是消息正常生产,并且在 ActiveMQ 控制台能够看到入队数大于0且未消费消息数大于0

    1.2  队列消费者

    (1)新建Maven工程并设置包名类名

    (2)Pom依赖配置与生产者一致

    (3)application.yml

    # Consumer port (8081 here); per the accompanying text, the other settings match the producer's application.yml.
    server:
      port: 8081
    
    

    这里仅对端口进行修改,其他配置与生产者一致

    (4)消息消费者

    /**
     * Queue consumer: prints the body of every text message received from the configured queue.
     */
    @Component
    public class Queue_consummer {

        // The placeholder must match the property key defined in application.yml (myQueueName);
        // an unknown key cannot be resolved when the listener is registered.
        @JmsListener(destination = "${myQueueName}")
        public void receive(TextMessage textMessage) throws Exception {
            System.out.println(" ***  消费者收到消息  ***" + textMessage.getText());
        }
    }

    1.3  定时投递与监听消费

    新需求: 要求每隔3秒钟,往MQ推送消息       以下定时发送Case,案例修改

    分析: 在需要定时推送消息的情况下,同时也需要对消息进行跟踪消费,也就是监听消息的生产并进行消费。

    生产者代码:

    (1)修改Queue_Producer,新增定时投递方法

    @Component
    public class Queue_Producer {
    
        /**
         * Scheduled delivery: sends one message to the queue, then prints a marker line.
         * fixedDelay = 3000 waits 3000 ms between the end of one run and the start of the next.
         */
        @Scheduled(fixedDelay = 3000)
        public void producerMsgScheduled() {
            String payload = "**************Scheduled" + UUID.randomUUID();
            jmsMessagingTemplate.convertAndSend(activeMQQueue, payload);
            System.out.println("Scheduled定时投递");
        }
    

    (2)修改主启动类 MainApp

    /**
     * Application entry point; {@code @EnableScheduling} turns on processing of {@code @Scheduled} methods.
     */
    @SpringBootApplication
    @EnableScheduling
    public class MainApp {
        public static void main(String[] args) {
            // Forward the command-line arguments so they are available to Spring Boot.
            SpringApplication.run(MainApp.class, args);
        }
    }

    在主启动类这里主要是要添加@EnableScheduling 注解,用来开启定时配置

    消息消费者代码增强

    (1)springboot的消息监听注解

    @JmsListener

       说明:监听过后会随着springboot一起启动,有消息就执行加了该注解的方法

    (2)具体实现代码:

    /** Listener method: prints the body of each text message received from the configured queue. */
    @JmsListener(destination = "${myQueueName}")
    public void consumerMsg(TextMessage textMessage) throws JMSException {
        System.out.println("***消费者收到的消息:    " + textMessage.getText());
    }

    (3)主启动类

    /**
     * Application entry point with scheduling enabled.
     */
    @EnableScheduling // Turns on processing of @Scheduled methods
    @SpringBootApplication
    public class MainApp_Produce {

        public static void main(String[] args) {
            SpringApplication.run(MainApp_Produce.class, args);
        }
    }

    2、主题发布订阅(Topic)

    2.1 Topic 生产者

    (1)新建Maven工程并设置包名类名

    (2)POM文件

        订阅(Topic)的依赖与队列(Queue)的一致

    (3)Yml文件

     # Fragment of application.yml: merge "jms" into the existing top-level "spring:" section.
     # The key must be spring.jms.pub-sub-domain for Spring Boot to read it.
     spring:
       jms:
         pub-sub-domain: true # true = Topic, false = Queue

     # Name of the topic on the broker
     myTopicName: springboot-activemq-topic

    (4)配置bean

    /**
     * Declares the topic destination used by the producer and enables Spring's JMS support.
     */
    @EnableJms // Enable Spring JMS annotation support
    @Component
    public class ActiveMQConfigBean {
        // Topic name read from the myTopicName property
        @Value("${myTopicName}")
        private String configuredTopicName;

        /** Creates the ActiveMQTopic bean named after the configured topic. */
        @Bean
        public ActiveMQTopic activeMQTopic() {
            ActiveMQTopic topic = new ActiveMQTopic(configuredTopicName);
            return topic;
        }
    }

    注意:类似于Spring的ApplicationContext.xml文件

    (5)Topic生产者代码:

    @Component
    @EnableScheduling
    public class Topic_Producer {
        private JmsMessagingTemplate jmsMessagingTemplate;
        private ActiveMQTopic activeMQTopic;
    
        @Scheduled(fixedDelay = 3000)
        public void producer() {
            jmsMessagingTemplate.convertAndSend(activeMQTopic, "主题消息:    " + UUID.randomUUID().toString());
        }

    2.2 Topic 消费者

    (1)新建Maven工程并设置包名类名

    (2)Pom (同上)

    (3)yml文件

     # Fragment of application.yml: merge "jms" into the existing top-level "spring:" section.
     # The key must be spring.jms.pub-sub-domain for Spring Boot to read it.
     spring:
       jms:
         pub-sub-domain: true # true = Topic, false = Queue

     # Name of the topic on the broker
     myTopicName: springboot-activemq-topic

    (4)配置bean

    /**
     * 设置持久化订阅
     * 配置文件的方式无法进行配置持久化订阅。所以需要自己去生成一个持久化订阅
     */
    @Component
    @EnableJms
    public class ActiveMQConfigBean {
        @Value("${spring.activemq.broker-url}")
        private String brokerUrl;
        @Value("${spring.activemq.user}")
        private String user;
        @Value("${spring.activemq.password}")
        private String password;
    
        public ConnectionFactory connectionFactory(){
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
            connectionFactory.setBrokerURL(brokerUrl);
            connectionFactory.setUserName(user);
            connectionFactory.setPassword(password);
            return connectionFactory;
        }
    
    
        @Bean(name = "jmsListenerContainerFactory")
        public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
            DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();
            defaultJmsListenerContainerFactory.setConnectionFactory(connectionFactory());
            defaultJmsListenerContainerFactory.setSubscriptionDurable(true);
            defaultJmsListenerContainerFactory.setClientId("我是持久订阅者一号");
            return defaultJmsListenerContainerFactory;
        }
    

    配置文件的方式无法进行配置持久化订阅。所以需要自己去生成一个持久化订阅

    (5)Topic消费者代码

    /**
     * Topic subscriber: prints the body of every text message received from the configured topic.
     */
    @Component
    public class Topic_Consumer {

        // The listener names the container factory it should be created from.
        @JmsListener(destination = "${myTopicName}", containerFactory = "jmsListenerContainerFactory")
        public void consumer(TextMessage textMessage) throws JMSException {
            String body = textMessage.getText();
            System.out.println("订阅着收到消息:    " + body);
        }
    }

    重点:

    @JmsListener(destination = "${myTopicName}",containerFactory = "jmsListenerContainerFactory")

    启动顺序:先启动消费者,后启动生产者

  • 相关阅读:
    leetcode 350. Intersection of Two Arrays II
    leetcode 278. First Bad Version
    leetcode 34. Find First and Last Position of Element in Sorted Array
    leetcode 54. Spiral Matrix
    leetcode 59. Spiral Matrix II
    leetcode 44. Wildcard Matching
    leetcode 10. Regular Expression Matching(正则表达式匹配)
    leetcode 174. Dungeon Game (地下城游戏)
    leetcode 36. Valid Sudoku
    Angular Elements
  • 原文地址:https://www.cnblogs.com/wushaopei/p/12288697.html
Copyright © 2011-2022 走看看