zoukankan      html  css  js  c++  java
  • Spring和SpringBoot整合ActiveMQ

    一:ActiveMQ的Broker

    ActiveMQ除了可以作为独立进程单独部署在服务器上之外,也可以很小巧的内嵌在程序中启动,下面我们来简单的介绍内置Broker启动的一种方式。

     1.1引入maven的依赖

        <!--ActiveMQ依赖包-->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.15.9</version>
            </dependency>

    1.2Java代码

    package com.yjc.activemq;
    
    import org.apache.activemq.broker.BrokerService;
    
    public class Broker {
        public static void main(String[] args) throws Exception {
            BrokerService brokerService=new BrokerService();
            brokerService.setUseJmx(true);
            brokerService.addConnector("tcp://localhost:61616");
            brokerService.start();
        }
    }

    启动上面的main方法之后,就可以使用生产者和消费者对我们部署的这个小型的ActiveMQ进行访问了,三者的地址要一样,十分的小巧方便

    二:Spring整合ActiveMQ

    2.1引入maven的依赖

     <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
            <!--ActiveMQ依赖包-->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.15.9</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring -->
            <!--ActiveMQ和SPring整合包-->
            <dependency>
                <groupId>org.apache.xbean</groupId>
                <artifactId>xbean-spring</artifactId>
                <version>4.14</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
            <!--用于引入ActiveMQ的broker-->
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.10.0</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-webmvc</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-pool</artifactId>
            </dependency>

    2.2配置ApplicationContext.xml文件

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    
        <!--开启包扫描器-->
        <context:component-scan base-package="com.yjc.spring"/>
    
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://192.168.118.3:61616" />
        </bean>
        <bean id="connectionFactory"
              class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="targetConnectionFactory" />
        </bean>
        <!-- 通过往PooledConnectionFactory注入一个ActiveMQConnectionFactory可以用来将Connection,Session和MessageProducer池化这样可以大大减少我们的资源消耗, -->
        <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
            <property name="connectionFactory" ref="targetConnectionFactory" />
            <property name="maxConnections" value="10" />
        </bean>
        <!--默认的目的地地址-->
        <bean id="activeMQQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <!--设置队列的名称-->
            <constructor-arg index="0" value="spring-active-queue"/>
        </bean>
        <!-- 配置生产者:配置好ConnectionFactory之后我们就需要配置生产者。生产者负责产生消息并发送到JMS服务器,这通常对应的是我们的一个业务逻辑服务实现类。 但是我们的服务实现类是怎么进行消息的发送的呢?这通常是利用Spring为我们提供的JmsTemplate类来实现的, 所以配置生产者其实最核心的就是配置进行消息发送的JmsTemplate。对于消息发送者而言,它在发送消息的时候要知道自己该往哪里发, 为此,我们在定义JmsTemplate的时候需要往里面注入一个Spring提供的ConnectionFactory对象 -->
        <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="defaultDestination" ref="activeMQQueue"/>
        </bean>
    </beans>

    2.3编写生产者代码

    由于进行简单的整合测试,没有使用MVC的分层架构,仅仅使用了一个service,要想访问Spring容器中的bean对象时,需要当前对象也需要是一个bean对象,所以我用@Service将生产者和消费者都声明成bean,方便我调用其他的bean。

    package com.yjc.spring;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.stereotype.Service;
    
    import javax.jms.TextMessage;
    
    @Service
    public class Producer {
        @Autowired
        private JmsTemplate jmsTemplate;
    
        public static void main(String[] args) {
            ApplicationContext applicationContext=new ClassPathXmlApplicationContext("ApplicationContext.xml");
            Producer producer = (Producer)applicationContext.getBean("producer");
            //才用1.8的新特性lombda表达式来实现的
            producer.jmsTemplate.send((session)->{
             TextMessage textMessage= session.createTextMessage("俺是消息");
             return  textMessage;
            });
            System.out.println("消息已经放入到队列里了");
    
        }
    }

    2.4消费者

    package com.yjc.spring;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.stereotype.Service;
    
    @Service
    public class Consumer {
        @Autowired
        private JmsTemplate jmsTemplate;
        public static void main(String[] args) {
            ApplicationContext applicationContext=new ClassPathXmlApplicationContext("ApplicationContext.xml");
            Consumer consumer = (Consumer)applicationContext.getBean("consumer");
            String retValue = (String) consumer.jmsTemplate.receiveAndConvert();
            System.out.println("----------------消费者收到的消息"+retValue);
        }
    }

    2.5在Spring中实现消费者不启动,依然可以消费消息,通过配置监听完成

    在Topic模式中,如果没有消费者进行订阅,那么生产者生产出来的消息就是非消息,我们可以通过配置监听来实现不期待消费者,实现消费

    2.5.1在配置文件中将默认的目标地址更改为Topic

     <!--开启包扫描器-->
        <context:component-scan base-package="com.yjc.spring"/>
    
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://192.168.118.3:61616" />
        </bean>
        <bean id="connectionFactory"
              class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="targetConnectionFactory" />
        </bean>
        <!-- 通过往PooledConnectionFactory注入一个ActiveMQConnectionFactory可以用来将Connection,Session和MessageProducer池化这样可以大大减少我们的资源消耗, -->
        <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
            <property name="connectionFactory" ref="targetConnectionFactory" />
            <property name="maxConnections" value="10" />
        </bean>
      <!--默认的目的地地址-->
        <bean id="activeMQTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <!--设置队列的名称-->
            <constructor-arg index="0" value="spring-active-topic"/>
        </bean>
        <!-- 配置生产者:配置好ConnectionFactory之后我们就需要配置生产者。生产者负责产生消息并发送到JMS服务器,这通常对应的是我们的一个业务逻辑服务实现类。 但是我们的服务实现类是怎么进行消息的发送的呢?这通常是利用Spring为我们提供的JmsTemplate类来实现的, 所以配置生产者其实最核心的就是配置进行消息发送的JmsTemplate。对于消息发送者而言,它在发送消息的时候要知道自己该往哪里发, 为此,我们在定义JmsTemplate的时候需要往里面注入一个Spring提供的ConnectionFactory对象 -->
        <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="defaultDestination" ref="activeMQTopic"/>
        </bean>
     <!-- 配置监听程序-->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="activeMQTopic"/>
        <property name="messageListener" ref="myMessageListener"/>
    </bean>

    2.5.2创建监听类

    package com.yjc.spring;
    
    import org.springframework.stereotype.Component;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    @Component
    public class MyMessageListener implements MessageListener {
        @Override
        public void onMessage(Message message) {
            if (null!=message&&message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage)message;
                try {
                    System.out.println("监听器监听到的消息:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    生产者和消费者的代码不做改动,只启动生产者即可,当生产者生产出消息之后,会被监听器立刻监听到

    三:SpringBoot整合ActiveMQ(队列)

    3.1导入依赖

     <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-activemq</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <scope>test</scope>
            </dependency>

    3.2 application.yml配置文件

    server:
      port: 8888
    
    spring:
      activemq:
        broker-url: tcp://192.168.118.3:61616  #服务器地址
        user: admin                            #用户名
        password: admin                       #密码
      jms:
        pub-sub-domain: false                 #目的地类型,false为Queue,true为Topic,默认为false
    
    #自定义队列名称
    myqueue: boot-activemq-queue

    3.3Config配置类

    package com.yjc.activemq;
    
    import org.apache.activemq.command.ActiveMQQueue;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.jms.annotation.EnableJms;
    import org.springframework.stereotype.Component;
    
    import javax.jms.Queue;
    
    @Component
    @EnableJms  
    public class ConfigBean {
        @Value("${myqueue}")
        private String queueName;
    
        @Bean
        private Queue queue(){
            return  new ActiveMQQueue(queueName);
        }
    }

    3.4生产者

    package com.yjc.activemq;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import javax.jms.Queue;
    import java.util.UUID;
    
    @Component
    public class Queue_Produce {
        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;
        @Resource
        private Queue queue;
      
      //调用方法启动一次
    public void produceMsg(){ jmsMessagingTemplate.convertAndSend(queue,"-----------"+UUID.randomUUID().toString().substring(0,8)); } //定时发送消息,时间间隔为三秒,去主配置类开启支持,启动主配置类时开始定时发送 @Scheduled(fixedDelay = 3000) public void scheduledMsg(){ jmsMessagingTemplate.convertAndSend(queue,"-----------scheduledMsg"+UUID.randomUUID().toString().substring(0,8)); System.out.println("时间到了发一条"); } }

    3.5 主程序类

    package com.yjc;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.scheduling.annotation.EnableScheduling;
    
    @SpringBootApplication
    @EnableScheduling //开始对定时投递的支持
    public class ActivemqApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(ActivemqApplication.class, args);
        }
    
    }

    3.6 测试类

    import com.yjc.ActivemqApplication;
    import com.yjc.activemq.Queue_Produce;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    import org.springframework.test.context.web.WebAppConfiguration;
    
    @SpringBootTest(classes = ActivemqApplication.class)
    @RunWith(SpringJUnit4ClassRunner.class)
    @WebAppConfiguration
    public class QuqueTest {
        @Autowired
        private Queue_Produce queue_produce;
    
        @Test
        public void testMsg(){
            queue_produce.produceMsg();
        }
    }

    3.7消费者

    package com.yjc.consumer;
    
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    
    import javax.jms.JMSException;
    import javax.jms.TextMessage;
    
    @Component
    public class Queue_Consumer {
        @JmsListener(destination = "${myqueue}")
        public  void  receive(TextMessage textMessage) throws JMSException {
            System.out.println("消费者收到的消息"+textMessage.getText());
        }
    
    }

    使用@JmsListener注解进行监听消息

  • 相关阅读:
    COM编程入门
    DirectShow Filter 开发典型例子分析 ——字幕叠加 (FilterTitleOverlay)1
    互联网的三大巨头 百度 阿里巴巴 腾讯(BAT)
    入侵Tomcat服务器一次实战
    TinyXML:一个优秀的C++ XML解析器
    Apache POI (JAVA处理Office文档的类库)
    MediaInfo源代码分析 4:Inform()函数
    MediaInfo源代码分析 3:Open()函数
    洛谷 P3905 道路重建
    CF16A Flag
  • 原文地址:https://www.cnblogs.com/yjc1605961523/p/11990177.html
Copyright © 2011-2022 走看看