zoukankan      html  css  js  c++  java
  • ActiveMQ集成Spring使用

      现在任何一个框架的使用都会结合spring框架,quartz、cxf与平时常见的Hibernate、mybatis、Struts等都可以与spring集成起来使用,在这里研究了activemq结合spring的使用方法。

    1.理论篇

      spring集成JMS连接ActiveMq

        ConnectionFactory:用于管理连接的工厂(Spring为我们提供了可复用连接的ConnectionFactory包装类,因为JmsTemplate每次发消息都会重新创建连接、会话和producer,这个操作非常消耗性能,所以Spring提供了复用/缓存连接的实现,作用类似连接池)

          spring提供了两个实现类:SingleConnectionFactory--整个应用使用同一个Connection进行操作,并且重写了其close()方法

                      CachingConnectionFactory--继承自SingleConnectionFactory,所以有SingleConnectionFactory的所有功能,并且额外提供了缓存功能,可以缓存Session、producer、consumer等。

        JmsTemplate:用于接收和发送消息的模板。它由Spring提供,我们只需要向spring容器注册这个类就可以使用JmsTemplate方便地操作JMS。JmsTemplate类是线程安全的,可以在整个应用范围内共享使用,而且我们可以注册多个JmsTemplate到spring中。

        MessageListener:消息监听器。需要我们手动实现,只需要实现一个onMessage(Message message)方法。

    2.代码实现spring集成activemq

    0.pom.xml配置与目录结构与log4j.properties

    # Root logger: level INFO, written to both appender B (rolling file) and appender A (console).
    log4j.rootLogger=info,B,A
    
    # Appender A: console output.
    log4j.appender.A=org.apache.log4j.ConsoleAppender
    log4j.appender.A.layout=org.apache.log4j.PatternLayout
    log4j.appender.A.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%c]-[%p] %m%n
    
    # Appender B: rolling file, rolled at 10MB with up to 5 backups kept.
    log4j.appender.B=org.apache.log4j.RollingFileAppender
    # Use a forward slash: in a .properties file a lone backslash is an escape
    # character and is silently dropped, so "E:\jms.log" would be read as "E:jms.log".
    log4j.appender.B.File=E:/jms.log
    log4j.appender.B.MaxFileSize=10MB
    log4j.appender.B.MaxBackupIndex=5
    log4j.appender.B.layout=org.apache.log4j.PatternLayout
    log4j.appender.B.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%c]-[%p] %m%n
    <?xml version="1.0" encoding="UTF-8"?>
    
    <!-- Maven build for the jms-spring sample web application (Spring JMS + ActiveMQ). -->
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>cn.qlq</groupId>
        <artifactId>jms-spring</artifactId>
        <version>1.0-SNAPSHOT</version>
        <packaging>war</packaging>
        <name>jms-spring Maven Webapp</name>
        <!-- Version management: one property shared by all Spring modules -->
        <properties>
            <springframework>4.1.8.RELEASE</springframework>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.10</version>
                <scope>test</scope>
            </dependency>
    
    
            <!-- JSP related -->
            <dependency>
                <groupId>jstl</groupId>
                <artifactId>jstl</artifactId>
                <version>1.2</version>
            </dependency>
            <dependency>
                <groupId>javax.servlet</groupId>
                <artifactId>servlet-api</artifactId>
                <scope>provided</scope>
                <version>2.5</version>
            </dependency>
    
    
            <!-- Spring -->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-core</artifactId>
                <version>${springframework}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context</artifactId>
                <version>${springframework}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-tx</artifactId>
                <version>${springframework}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-webmvc</artifactId>
                <version>${springframework}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
                <version>${springframework}</version>
            </dependency>
            <!-- xbean, needed for namespace-based config such as <amq:connectionFactory /> -->
            <dependency>
                <groupId>org.apache.xbean</groupId>
                <artifactId>xbean-spring</artifactId>
                <version>3.16</version>
            </dependency>
    
            <!-- ActiveMQ: keep both artifacts on the same release. Mixing activemq-core 5.7.0
                 with a newer activemq-pool drags in a second copy of the client classes. -->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-core</artifactId>
                <version>5.7.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-pool</artifactId>
                <version>5.7.0</version>
            </dependency>
    
            <!-- slf4j: API and log4j binding kept on the same version -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.12</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.12</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <finalName>Crawl-Page</finalName>
            <plugins>
                <plugin>
                    <groupId>org.apache.tomcat.maven</groupId>
                    <artifactId>tomcat7-maven-plugin</artifactId>
                    <!-- Pin the plugin version so the build is reproducible -->
                    <version>2.2</version>
                    <configuration>
                        <port>8080</port>
                        <path>/</path>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    </project>

     1.队列模式的MQ实现(Destination设置为ActiveMQQueue即可)

    ApplicationContext-all.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Spring context for the queue (point-to-point) example. -->
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="
               http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans.xsd
               http://www.springframework.org/schema/context
               http://www.springframework.org/schema/context/spring-context.xsd">

        <!-- Scan this package for annotated components -->
        <context:component-scan base-package="cn.qlq.jms"/>

        <!-- ActiveMQ's own ConnectionFactory, pointing at the broker URL -->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616"/>
        </bean>

        <!-- Spring JMS ConnectionFactory that wraps the ActiveMQ factory above -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
        </bean>

        <!-- Queue destination (point-to-point); the constructor argument is the queue name -->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="queue"/>
        </bean>

        <!-- JmsTemplate supplied by Spring JMS -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFactory"/>
        </bean>

        <!-- Listener container: wired with the connection factory, the destination
             to listen on, and the message listener bean -->
        <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="queueDestination"/>
            <property name="messageListener" ref="myConsumer"/>
        </bean>
    </beans>

     生产者接口和实现类:

    package cn.qlq.jms.producer;
    
    /**
     * Contract for a component that sends a text message.
     *
     * <p>Created 2018/9/27 21:25.
     *
     * @author qlq
     */
    public interface MyProducer {
        /**
         * Sends the given text as a message.
         *
         * @param message the text to send
         */
        void sendMessage(String message);
    }
    package cn.qlq.jms.producer;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Service;
    
    import javax.jms.*;
    
    /**
     * @Author: qlq
     * @Description
     * @Date: 21:26 2018/9/27
     */
    
    @Service
    public class MyProducerImpl implements MyProducer {
        private static final Logger logger = LoggerFactory.getLogger(MyProducerImpl.class);
        @Autowired
        private JmsTemplate jmsTemplate;
        @Autowired
        @Qualifier("queueDestination")
        private Destination destination;
    
        public void sendMessage(final String message) {
            jmsTemplate.send(destination, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    TextMessage textMessage = session.createTextMessage(message);
                    return textMessage;
                }
            });
            logger.info("send textMesage:{}",message);
        }
    }

    消费消息监听器代码:

    package cn.qlq.jms.producer;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Service;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /**
     * @Author: qlq
     * @Description
     * @Date: 22:36 2018/9/27
     */
    @Service
    public class MyConsumer implements MessageListener {
        private static final Logger logger = LoggerFactory.getLogger(MyConsumer.class);
    
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("consumer message:"+ textMessage.getText());
            } catch (JMSException e) {
                logger.error("consume message error", e);
            }
        }
    }

    生产者启动类:启动spring容器,获取bean并且发布消息:

    package cn.qlq.jms.producer;
    
    import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
    import org.springframework.context.ApplicationContext;
    
    /**
     * Entry point for producing messages: starts the Spring container, looks up the
     * producer bean and sends a fixed number of messages.
     *
     * <p>Created 2018/9/27 22:15.
     *
     * @author qlq
     */
    public class ProducerApp {
        /** Number of messages sent per run. */
        private static final int MESSAGE_COUNT = 100;

        /**
         * Loads {@code ApplicationContext-all.xml} from the classpath, sends
         * {@code " message0"} .. {@code " message99"} through the {@code myProducerImpl}
         * bean, and closes the container.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            ClassPathXmlApplicationContext applicationContext =
                    new ClassPathXmlApplicationContext("ApplicationContext-all.xml");
            try {
                // Typed lookup avoids the unchecked cast on the bean instance.
                MyProducer myProducer = applicationContext.getBean("myProducerImpl", MyProducer.class);
                for (int i = 0; i < MESSAGE_COUNT; i++) {
                    myProducer.sendMessage(" message" + i);
                }
            } finally {
                // Close the container even if sending fails, so its resources are released.
                applicationContext.close();
            }
        }
    }

    消费者启动类:启动spring容器,会自动监听目的地的消息:

    package cn.qlq.jms.producer;
    
    import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
    
    /**
     * Entry point for consuming messages: starting the Spring container is enough,
     * because the listener container declared in the XML begins consuming on startup.
     *
     * <p>Created 2018/9/27 22:15.
     *
     * @author qlq
     */
    public class ConsumerApp {
        /**
         * Loads {@code ApplicationContext-all.xml} from the classpath and leaves the
         * container running.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // Starting the container automatically begins consuming messages.
            ClassPathXmlApplicationContext applicationContext =
                    new ClassPathXmlApplicationContext("ApplicationContext-all.xml");
            // The context is intentionally left open; the hook closes it when the JVM exits.
            applicationContext.registerShutdownHook();
        }
    }

     启动三个消费者一个生产者,查看控制台结果:(符合队列模式的特点:平均消费)

     

    activemq后台查看结果:(已经停掉3个消费者线程所以消费者是0)

     2.主题模式的MQ实现(Destination设置为ActiveMQTopic即可)

     在Spring的xml配置文件中增加topic目的地,并且将消息监听的目的地设为topicDestination

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Spring context for the topic (publish/subscribe) example. -->
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="
               http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans.xsd
               http://www.springframework.org/schema/context
               http://www.springframework.org/schema/context/spring-context.xsd">

        <!-- Scan this package for annotated components -->
        <context:component-scan base-package="cn.qlq.jms"/>

        <!-- ActiveMQ's own ConnectionFactory, pointing at the broker URL -->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616"/>
        </bean>

        <!-- Spring JMS ConnectionFactory that wraps the ActiveMQ factory above -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
        </bean>

        <!-- Queue destination (point-to-point); the constructor argument is the queue name -->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="queue"/>
        </bean>

        <!-- JmsTemplate supplied by Spring JMS -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFactory"/>
        </bean>

        <!-- Listener container: wired with the connection factory, the topic
             destination to listen on, and the message listener bean -->
        <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="topicDestination"/>
            <property name="messageListener" ref="myConsumer"/>
        </bean>

        <!-- Topic destination (publish/subscribe); the constructor argument is the topic name -->
        <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="topic"/>
        </bean>
    </beans>

     消息生产者将目的地注入为topicDestination

    package cn.qlq.jms.producer;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Service;
    
    import javax.jms.*;
    
    /**
     * @Author: qlq
     * @Description
     * @Date: 21:26 2018/9/27
     */
    
    @Service
    public class MyProducerImpl implements MyProducer {
        private static final Logger logger = LoggerFactory.getLogger(MyProducerImpl.class);
        @Autowired
        private JmsTemplate jmsTemplate;
        @Autowired
        @Qualifier("topicDestination")
        private Destination destination;
    
        public void sendMessage(final String message) {
            jmsTemplate.send(destination, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    TextMessage textMessage = session.createTextMessage(message);
                    return textMessage;
                }
            });
            logger.info("send textMesage:{}",message);
        }
    }

       剩下的测试代码同上,还是先开启三个消费者,再开启一个生产者,查看结果:(符合订阅模式特点:消费所有已经订阅的消息)

    activemq后台查看结果:

  • 相关阅读:
    布局重用 include merge ViewStub
    AS 常用插件 MD
    AS 2.0新功能 Instant Run
    AS .ignore插件 忽略文件
    AS Gradle构建工具与Android plugin插件【大全】
    如何开通www国际域名个人网站
    倒计时实现方案总结 Timer Handler
    AS 进行单元测试
    RxJava 设计理念 观察者模式 Observable lambdas MD
    retrofit okhttp RxJava bk Gson Lambda 综合示例【配置】
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/9710633.html
Copyright © 2011-2022 走看看