zoukankan      html  css  js  c++  java
  • 学习ActiveMQ(四):spring与ActiveMQ整合

      在上一篇中已经怎么使用activemq的api来实现消息的发送接收了,但是在实际的开发过程中,我们很少使用activemq直接上去使用,因为我们每次都要创建连接工厂,创建连接,创建session。。。有些繁琐,那么利用spring的话简单多了,强大的spring

    提供了对了jms的支持,我们可以使用JmsTemplate来实现,JmsTemplate隔离了像打开、关闭Session和Producer的繁琐操作,因此应用开发人员仅仅需要关注实际的业务逻辑,接下来就一起来看看具体怎么做吧。

    首先我们打开idea新建一个maven的webapp项目,对项目项目结构进行一下调整,如下图:

    接下来对pom.xml进行修改,需要spring的jar包和jms的jar包,我贴出我的完整代码:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.easylab.jms</groupId>
        <artifactId>jms-spring</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <spring.version>4.2.5.RELEASE</spring.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.11</version>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context</artifactId>
                <version>${spring.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
                <version>${spring.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-pool</artifactId>
                <version>5.9.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.15.4</version>
                <exclusions>
                    <exclusion>
                        <artifactId>spring-context</artifactId>
                        <groupId>org.springframework</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
        </dependencies>
    </project>

    在resource文件夹下面建立一个common.xml文件,并进行相关配置,配置作用看代码注释。

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    
        <!--支持注解-->
        <context:component-scan base-package="com.easylab.jms"/>
    
        <!--使用PooledConnectionFactory对session和消息producer的缓存机制带来的性能提升-->
        <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
              destroy-method="stop">
            <property name="connectionFactory">
                <!--连接mq的连接工厂-->
                <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                    <property name="brokerURL">
                        <value>tcp://127.0.0.1:61616</value>
                    </property>
                </bean>
            </property>
            <property name="maxConnections" value="100"></property>
        </bean>
    
        <!--使用缓存可以提升效率-->
        <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
            <property name="targetConnectionFactory" ref="connectionFactory"/>
            <property name="sessionCacheSize" value="1"/>
        </bean>
    
        <!--配置JmsTemplate,用于发送消息-->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFactory"/>
            <!--给一个默认的是destination-->
            <property name="defaultDestination" ref="queueDestination"/>
        </bean>
    
        <!--队列模式的destination-->
        <bean id="queueDestination"  class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="queue"/>
        </bean>
    
        <!--主题模式的destination配置-->
        <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="topic"/>
        </bean>
    
        <!--消费者所需-->
        <!--配置 消息监听容器 使用DefaultMessageListenerContainer,它允许异步接收消息并缓存session和消息consumer,而且还可以根据消息数量动态的增加或缩减监听器的数量-->
        <bean id="jmsContainer" class=" org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="queueDestination"/>
            <property name="messageListener" ref="consumerMessageListener"/>
        </bean>
        <!--配置 消息监听容器-->
        <bean id="jmsContainerTopic" class=" org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
                <property name="destination" ref="topicDestination"/>
            <property name="messageListener" ref="consumerMessageListenerTopic"/>
        </bean>
    
        <!--自定义消息监听器-->
        <bean id="consumerMessageListener" class="com.easylab.jms.consumer.ConsumerMessageListener"/>
        <bean id="consumerMessageListenerTopic" class="com.easylab.jms.consumer.ConsumerMessageListenerTopic"/>
    </beans>

    接下来在com.easylab.jms.producer创建一个IProducerService 发送者接口

    package com.easylab.jms.producer;
    
    /******************************
     * @author : liuyuan
     * <p>ProjectName:jms-spring  </p>
     * @ClassName :  IProducerService
     * @date : 2018/6/24 0024
     * @time : 8:41
     * @createTime 2018-06-24 8:41
     * @version : 2.0
     * @description :消息发送
     *******************************/
    public interface IProducerService {
    
    
        public void sendMessage(String message);
    }

    对这个接口进行实现,在com.easylab.jms.producer创建一个ProducerServiceImpl实现类(使用队列模式发送)

    package com.easylab.jms.producer;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.Resource;
    import javax.jms.*;
    
    /******************************
     * @author : liuyuan
     * <p>ProjectName:jms-spring  </p>
     * @ClassName :  ProducerServiceImpl
     * @date : 2018/6/24 0024
     * @time : 8:42
     * @createTime 2018-06-24 8:42
     * @version : 2.0
     * @description :
     *******************************/
    @Service
    public class ProducerServiceImpl implements IProducerService {
    
        @Autowired
        JmsTemplate jmsTemplate;
    
      //这里不定义的话,就使用在xml文件里配置的默认的destination
    /*    @Resource(name = "queueDestination")
        Destination destination;*/
    
        public void sendMessage(final String message) {
    
            // 使用JmsTemplate发送消息
            jmsTemplate.send(new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    // 创建一个消息
                    TextMessage textMessage = session.createTextMessage(message);
    
                    return textMessage;
                }
            });
            System.out.println("发送消息" + message);
        }
    }

    再对这个接口写一个实现,在com.easylab.jms.producer创建一个ProducerServiceImplTopic实现类(使用主题模式发送)

    package com.easylab.jms.producer;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.Resource;
    import javax.jms.*;
    
    /******************************
     * @author : liuyuan
     * <p>ProjectName:jms-spring  </p>
     * @ClassName :  ProducerServiceImpl
     * @date : 2018/6/24 0024
     * @time : 8:42
     * @createTime 2018-06-24 8:42
     * @version : 2.0
     * @description :
     *******************************/
    @Service
    public class ProducerServiceImplTopic implements IProducerService {
    
        @Autowired
        JmsTemplate jmsTemplate;
    
        //指定主题模式
        @Resource(name = "topicDestination")
        Destination destination;
    
        public void sendMessage( final String message) {
            // 使用JmsTemplate发送消息
            jmsTemplate.send(destination, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    // 创建一个消息
                    TextMessage textMessage = session.createTextMessage(message);
    
                    return textMessage;
                }
            });
            System.out.println("发送消息" + message);
        }
    }

    ok,接下在com.easylab.jms.produce写一个AppProducer运行类就行了

    package com.easylab.jms.producer;
    
    import javafx.application.Application;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.ApplicationContext;
    
    import javax.jms.*;
    
    /******************************
     * @author : liuyuan
     * <p>ProjectName:jms-test  </p>
     * @ClassName :  AppProducer
     * @date : 2018/6/20 0020
     * @time : 20:46
     * @createTime 2018-06-20 20:46
     * @version : 2.0
     * @description :
     *******************************/
    
    public class AppProducer {
    
        public static void main(String[] args) {
    
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("common.xml");
    
            IProducerService service = (IProducerService)context.getBean(ProducerServiceImpl.class);
    
            for (int i = 0; i < 2; i++) {
                service.sendMessage("test" + i);
            }
    
            //切换到主题模式下再发送几条
            service = (IProducerService)context.getBean(ProducerServiceImplTopic.class);
            for (int i = 0; i < 10; i++) {
                service.sendMessage("test" + i);
            }
            context.close();
    
        }
    
    }

    可以运行一下看看,可以看到发送信息,但是没有接受消息,因为我们还没有写消费者,我们可以通过监听器来监听队列,如果队列中有消息,就会直接进入到监听器的onMessage方法中进行我们的业务处理,所以我们可以不用写消费者代码了。用监听器即可。

    在配置文件中,我们写了两个监听容器和监听器,用他们分别监听队列和主题,查看效果。接下来创建两个监听器.

    在com.easylab.jms.produce写一个ConsumerMessageListener监听器:

    package com.easylab.jms.consumer;
    
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /******************************
     * @author : liuyuan
     * <p>ProjectName:jms-spring  </p>
     * @ClassName :  ConsumerMessageListener
     * @date : 2018/6/24 0024
     * @time : 9:16
     * @createTime 2018-06-24 9:16
     * @version : 2.0
     * @description :
     *
     *
     *
     *******************************/
    
    
    public class ConsumerMessageListener implements MessageListener {
    
        public void onMessage(Message message) {
    
            TextMessage textMessage = (TextMessage) message;
    
            try {
                System.out.println("队列监听器接收消息" + textMessage);
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    }

    在com.easylab.jms.produce写一个ConsumerMessageListener监听器:

    package com.easylab.jms.consumer;
    
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /******************************
     * @author : liuyuan
     * <p>ProjectName:jms-spring  </p>
     * @ClassName :  ConsumerMessageListener
     * @date : 2018/6/24 0024
     * @time : 9:16
     * @createTime 2018-06-24 9:16
     * @version : 2.0
     * @description :
     *******************************/
    
    public class ConsumerMessageListenerTopic implements MessageListener {
    
        public void onMessage(Message message) {
    
            TextMessage textMessage = (TextMessage) message;
    
            try {
                System.out.println("我是topic监听器,接收到消息:" + textMessage);
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    }

    到这里代码就全部写完了,运行刚刚创建启动类

    可以清楚的看到消息队列接收到了两条,主题队列接收到了十条,测试成功。

    代码地址:https://github.com/MrLiu1227/ActiveMQ

  • 相关阅读:
    简单对拍
    搜索感想
    L1434滑雪
    记忆化搜索
    L3956棋盘
    USACO 数字三角形
    枚举顺序
    蓝桥计算
    用户态和内核态IO过程
    Mybatis的结果集中的Do要不要有setter
  • 原文地址:https://www.cnblogs.com/liuyuan1227/p/10744278.html
Copyright © 2011-2022 走看看