zoukankan      html  css  js  c++  java
  • ActiveMQ 笔记(四)SpringSpringBoot 整合 Activemq

    个人博客网:https://wushaopei.github.io/    (你想要这里多有)

    一、Spring 整合Activemq

    1、所需jar包

     
    <dependencies>
     
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.11</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.10</version>
        </dependency>
     
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>4.15</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.10.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework/spring-jms -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>5.2.1.RELEASE</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework/spring-aop -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>5.2.1.RELEASE</version>
        </dependency>
    </dependencies>

    2、Spring配置文件(applicationContext.xml)

     
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd">
     
        <!--  开启包的自动扫描  -->
        <context:component-scan base-package="com.demo.activemq"/>
        <!--  配置生产者  -->
        <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
            <property name="connectionFactory">
                <!--      正真可以生产Connection的ConnectionFactory,由对应的JMS服务商提供      -->
                <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
                    <property name="brokerURL" value="tcp://192.168.10.130:61616"/>
                </bean>
            </property>
            <property name="maxConnections" value="100"/>
        </bean>
     
        <!--  这个是队列目的地,点对点的Queue  -->
        <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <!--    通过构造注入Queue名    -->
            <constructor-arg index="0" value="spring-active-queue"/>
        </bean>
     
        <!--  这个是队列目的地,  发布订阅的主题Topic-->
        <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg index="0" value="spring-active-topic"/>
        </bean>
     
        <!--  Spring提供的JMS工具类,他可以进行消息发送,接收等  -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!--    传入连接工厂    -->
            <property name="connectionFactory" ref="connectionFactory"/>
            <!--    传入目的地    -->
            <property name="defaultDestination" ref="destinationQueue"/>
            <!--    消息自动转换器    -->
            <property name="messageConverter">
                <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
            </property>
        </bean>
    </beans>

    3、编写代码

    (1)队列(Queue)

    @Service
    public class SpringMQ_Producer {
     
        private JmsTemplate jmsTemplate;
     
        @Autowired
        public void setJmsTemplate(JmsTemplate jmsTemplate) {
            this.jmsTemplate = jmsTemplate;
        }
     
        public static void main(String[] args) {
            ApplicationContext applicationContext = new ClassPathXmlApplicationContext("Application.xml");
            SpringMQ_Producer springMQ_producer = applicationContext.getBean(SpringMQ_Producer.class);
     
            springMQ_producer.jmsTemplate.send(session -> session.createTextMessage("***Spring和ActiveMQ的整合case111....."));
            System.out.println("********send task over");
        }
    }
    
    @Service
    public class SpringMQ_Consumer {
     
     
        private JmsTemplate jmsTemplate;
     
        @Autowired
        public void setJmsTemplate(JmsTemplate jmsTemplate) {
            this.jmsTemplate = jmsTemplate;
        }
     
        public static void main(String[] args) {
            ApplicationContext applicationContext = new ClassPathXmlApplicationContext("Application.xml");
            SpringMQ_Consumer springMQ_consumer = applicationContext.getBean(SpringMQ_Consumer.class);
            String returnValue = (String) springMQ_consumer.jmsTemplate.receiveAndConvert();
            System.out.println("****消费者收到的消息:   " + returnValue);
        }
    }
    

    (2)主题(Topic)

    生产者和消费者都可以通过jmsTemplate对象实时设置目的地等其他信息

    @Service
    public class SpringMQ_Topic_Producer {
        private JmsTemplate jmsTemplate;
     
        public SpringMQ_Topic_Producer(JmsTemplate jmsTemplate) {
            this.jmsTemplate = jmsTemplate;
        }
     
        public static void main(String[] args) {
            ApplicationContext applicationContext = new ClassPathXmlApplicationContext("Application.xml");
            SpringMQ_Topic_Producer springMQ_topic_producer = applicationContext.getBean(SpringMQ_Topic_Producer.class);
            //直接调用application.xml里面创建的destinationTopic这个bean设置为目的地就行了
            springMQ_topic_producer.jmsTemplate.setDefaultDestination(((Destination) applicationContext.getBean("destinationTopic")));
            springMQ_topic_producer.jmsTemplate.send(session -> session.createTextMessage("***Spring和ActiveMQ的整合TopicCase111....."));
        }
    }
    @Service
    public class SpringMQ_Topic_Consumer {
        private JmsTemplate jmsTemplate;
     
        public SpringMQ_Topic_Consumer(JmsTemplate jmsTemplate) {
            this.jmsTemplate = jmsTemplate;
        }
     
        public static void main(String[] args) {
            ApplicationContext applicationContext = new ClassPathXmlApplicationContext("Application.xml");
            SpringMQ_Topic_Consumer springMQConsumer = applicationContext.getBean(SpringMQ_Topic_Consumer.class);
            //直接调用application.xml里面创建的destinationTopic这个bean设置为目的地就行了
            springMQConsumer.jmsTemplate.setDefaultDestination(((Destination) applicationContext.getBean("destinationTopic")));
            String returnValue = (String) springMQConsumer.jmsTemplate.receiveAndConvert();
            System.out.println("****消费者收到的消息:   " + returnValue);
        }
    }
    

    4、监听配置

    在Spring里面实现消费者不启动,直接通过配置监听完成

    (1)说明

    类似于前面setMessageListenner实时间提供消息

    (2)Spring 配置文件

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd">
    
        <!--  开启包的自动扫描  -->
        <context:component-scan base-package="com.demo.activemq"/>
        <!--  配置生产者  -->
        <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
            <property name="connectionFactory">
                <!--      正真可以生产Connection的ConnectionFactory,由对应的JMS服务商提供     -->
                <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
                    <property name="brokerURL" value="tcp://192.168.10.130:61616"/>
                </bean>
            </property>
            <property name="maxConnections" value="100"/>
        </bean>
    
        <!--  这个是队列目的地,点对点的Queue  -->
        <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <!--    通过构造注入Queue名    -->
            <constructor-arg index="0" value="spring-active-queue"/>
        </bean>
    
        <!--  这个是队列目的地,  发布订阅的主题Topic-->
        <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg index="0" value="spring-active-topic"/>
        </bean>
    
        <!--  Spring提供的JMS工具类,他可以进行消息发送,接收等  -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!--    传入连接工厂    -->
            <property name="connectionFactory" ref="connectionFactory"/>
            <!--    传入目的地    -->
            <property name="defaultDestination" ref="destinationQueue"/>
            <!--    消息自动转换器    -->
            <property name="messageConverter">
                <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
            </property>
        </bean>
    
        <!--  配置Jms消息监听器  -->
        <bean id="defaultMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <!--  Jms连接的工厂     -->
            <property name="connectionFactory" ref="connectionFactory"/>
            <!--   设置默认的监听目的地     -->
            <property name="destination" ref="destinationTopic"/>
            <!--  指定自己实现了MessageListener的类     -->
            <property name="messageListener" ref="myMessageListener"/>
        </bean>
    </beans>
     
    

    (3)需要写一个类来实现消息监听

    /**
     * 实现MessageListener的类,需要把这个类交给xml配置里面的DefaultMessageListenerContainer管理
     */
    @Component
    public class MyMessageListener implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("消费者收到的消息" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    (4)消费者配置了自动监听,就相当于在spring里面后台运行,有消息就运行我们实现监听类里面的方法

    二、SpringBoot 整合Activemq

    1、队列(Queue)

    1.1 队列生产者:

    (1)新建Maven工程并设置包名类名

    工程名: boot_mq_producer
    包名:   com.atguigu.boot.activemq

    (2)POM依赖文件配置:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.demo</groupId>
        <artifactId>04activemq_springboot-queue</artifactId>
        <version>1.0-SNAPSHOT</version>
    
    <!--    <parent>-->
    <!--        <groupId>org.springframework.boot</groupId>-->
    <!--        <artifactId>spring-boot-starter-parent</artifactId>-->
    <!--        <version>2.2.1.RELEASE</version>-->
    <!--    </parent>-->
    
        <dependencies>
            <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
                <version>2.2.1.RELEASE</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-activemq</artifactId>
                <version>2.2.1.RELEASE</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
                <version>2.2.1.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <version>2.2.1.RELEASE</version>
            </dependency>
        </dependencies>
    
    </project>
    

    (3)application.yml 文件

    #Springboot启动端口
    server:
      port: 8080
    
    #ActiveMQ配置
    spring:
      activemq:
        broker-url: tcp://192.168.10.130:61616 #ActiveMQ服务器IP
        user: admin #ActiveMQ连接用户名
        password: admin #ActiveMQ连接密码
      jms:
        #指定连接队列还是主题
        pub-sub-domain: false # false = Queue |  true = Topic
    
    #定义服务上的队列名
    myQueueName: springboot-activemq-queue

    (4)配置队列bean

    类似于Spring的ApplicationContext.xml文件

    @Component
    @EnableJms //开启Springboot的Jms
    public class ConfigBean {
        @Value("myQueueName")
        private String myQueueName;
    
        @Bean
        public ActiveMQQueue queue() {
            //创建一个ActiveMQQueue
            return new ActiveMQQueue(myQueueName);
        }
    }

    (5)队列生产者

    @Component
    public class Queue_Producer {
        //JmsMessagingTemplate是Springboot的Jms模板,Spring的是JmsTemplate
        private JmsMessagingTemplate jmsMessagingTemplate;
    
        //把ConfigBean类的ActiveMQQueue注入进来
        private ActiveMQQueue activeMQQueue;
    
        //发送Queue的方法
        public void producerMsg() {
            jmsMessagingTemplate.convertAndSend(activeMQQueue, "**************" + UUID.randomUUID().toString());
        }
    
        //构造注入对象(推荐)
        public Queue_Producer(JmsMessagingTemplate jmsMessagingTemplate, ActiveMQQueue activeMQQueue) {
            this.jmsMessagingTemplate = jmsMessagingTemplate;
            this.activeMQQueue = activeMQQueue;
        }
    }
    

    (6)测试单元

    @SpringBootTest(classes = MainApp.class)
    @RunWith(SpringJUnit4ClassRunner.class)
    @WebAppConfiguration
    public class TestActiveMQ {
        @Autowired
        private Queue_Producer queue_producer;
    
        @Test
        public void testSend() {
            queue_producer.producerMsg();
        }
    }
    

    结果: 测试结果应该是消息正常生产,并在控制台能够看到入列数大于0且为消费消息大于0

    1.2  队列消费者

    (1)1.新建Mavaen工程并设置包名类名

    (2)Pom依赖配置与生产者一致

    (3)application.yml

    server:
      port: 8081
    
    

    这里仅对端口进行修改,其他配置与生产者一致

    (4)消息消费者

    @Component 
    public class Queue_consummer {
      
        @JmsListener(destination = "${myqueue}")     // 注解监听   
        public void receive(TextMessage textMessage) throws  Exception{
     
            System.out.println(" ***  消费者收到消息  ***"+textMessage.getText());
     
        }
     }

    1.3  定时投递与监听消费

    新需求: 要求每隔3秒钟,往MQ推送消息       以下定时发送Case,案例修改

    分析: 在需要定时推送消息的情况下,同时也需要对消息进行跟踪消费,也就是监听消息的生产并进行消费。

    生产者代码:

    (1)修改Queue_Produce新增定时投递方法

    @Component
    public class Queue_Producer {
    
        //间隔3秒投递,SpringBoot的Scheduled用来定时执行
        @Scheduled(fixedDelay = 3000)
        public void producerMsgScheduled() {
            jmsMessagingTemplate.convertAndSend(activeMQQueue, "**************Scheduled" + UUID.randomUUID().toString());
            System.out.println("Scheduled定时投递");
        }
    

    (2)修改主启动类的MainApp_Producer

    @SpringBootApplication
    @EnableScheduling
    public class MainApp {
        public static void main(String[] args) {
            SpringApplication.run(MainApp.class);
        }
    }

    在主启动类这里主要是要添加@EnableScheduling 注解,用来开启定时配置

    消息消费者代码增强

    (1)springboot的消息监听注解

    @JmsListener

       说明:监听过后会随着springboot一起启动,有消息就执行加了该注解的方法

    (2)具体实现代码:

    //监听接收的方法
    @JmsListener(destination = "${myQueueName}")
    public void consumerMsg(TextMessage textMessage) throws JMSException {
        String text = textMessage.getText();
        System.out.println("***消费者收到的消息:    " + text);
    }

    (3)主启动类

    @SpringBootApplication
    @EnableScheduling // 开启消息定投功能
    public class MainApp_Produce {
    
       public static void main(String[] args) {
         SpringApplication.run(MainApp_Produce.class, args);
       }
    
    }

    2、主题发布订阅(Topic)

    2.1 Topic 生产者

    (1)新建Maven工程并设置包名类名

    (2)POM文件

        订阅(Topic)的依赖于队列(Queue)的一致

    (3)Yml文件

     jms:
        pub-sub-domain: true
    
    myTopicName: springboot-activemq-topic

    (4)配置bean

    @Component
    @EnableJms  //开启Springboot的Jms
    public class ActiveMQConfigBean {
        @Value("${myTopicName}")
        private String topicName;
    
        @Bean
        public ActiveMQTopic activeMQTopic() {
            return new ActiveMQTopic(topicName);
        }
    }

    注意:类似于Spring的ApplicationContext.xml文件

    (5)Topic生产者代码:

    @Component
    @EnableScheduling
    public class Topic_Producer {
        private JmsMessagingTemplate jmsMessagingTemplate;
        private ActiveMQTopic activeMQTopic;
    
        @Scheduled(fixedDelay = 3000)
        public void producer() {
            jmsMessagingTemplate.convertAndSend(activeMQTopic, "主题消息:    " + UUID.randomUUID().toString());
        }

    2.2 Topic 消费者

    (1)新建Maven工程并设置包名类名

    (2)Pom (同上)

    (3)yml文件

     jms:
        pub-sub-domain: true
    
    myTopicName: springboot-activemq-topic

    (4)配置bean

    /**
     * 设置持久化订阅
     * 配置文件的方式无法进行配置持久化订阅。所以需要自己去生成一个持久化订阅
     */
    @Component
    @EnableJms
    public class ActiveMQConfigBean {
        @Value("${spring.activemq.broker-url}")
        private String brokerUrl;
        @Value("${spring.activemq.user}")
        private String user;
        @Value("${spring.activemq.password}")
        private String password;
    
        public ConnectionFactory connectionFactory(){
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
            connectionFactory.setBrokerURL(brokerUrl);
            connectionFactory.setUserName(user);
            connectionFactory.setPassword(password);
            return connectionFactory;
        }
    
    
        @Bean(name = "jmsListenerContainerFactory")
        public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
            DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();
            defaultJmsListenerContainerFactory.setConnectionFactory(connectionFactory());
            defaultJmsListenerContainerFactory.setSubscriptionDurable(true);
            defaultJmsListenerContainerFactory.setClientId("我是持久订阅者一号");
            return defaultJmsListenerContainerFactory;
        }
    

    配置文件的方式无法进行配置持久化订阅。所以需要自己去生成一个持久化订阅

    (5)Topic消费者代码

    @Component
    public class Topic_Consumer {
    
        //需要在监听方法指定连接工厂
        @JmsListener(destination = "${myTopicName}",containerFactory = "jmsListenerContainerFactory")
        public void consumer(TextMessage textMessage) throws JMSException {
            System.out.println("订阅着收到消息:    " + textMessage.getText());
        }
    }

    重点:

    @JmsListener(destination = "${myTopicName}",containerFactory = "jmsListenerContainerFactory")

    启动顺序:先启动消费者,后启动生产者

  • 相关阅读:
    Keras分类问题
    Keras预测股票
    Tensflow预测股票实例
    estimator = KerasClassifier
    keras CNN解读
    Windows下Python安装: requires numpy+mkl 和ImportError: cannot import name NUMPY_MKL
    Tensorflow RNN_LSTM实例
    同时安装python2.7和python3.5
    oracle 杀掉当前用户的进程
    IMP导入时的错误以及解决办法
  • 原文地址:https://www.cnblogs.com/wushaopei/p/12288697.html
Copyright © 2011-2022 走看看