zoukankan      html  css  js  c++  java
  • ActiveMQ集成Spring使用

      现在任何一个框架的使用都会结合spring框架,quartz、cxf与平时常见的Hibernate、mybatis、Struts等都可以与spring集成起来使用,在这里研究了activemq结合spring的使用方法。

    1.理论篇

      spring集成JMS连接ActiveMq

        ConnectionFactory:用于管理连接的工厂(Spring为我们提供的连接池,因为JmsTemplate每次发消息都会重新创建连接、会话和producer,这个操作非常消耗性能,所以Spring提供了连接池)

          spring提供了两个实现类:SingleConnectionFactory--整个应用使用同一个Connection进行操作,并且重写了其close()方法

                      CachingConnectionFactory--继承自SingleConnectionFactory,所以有SingleConnectionFactory的所有功能,并且额外提供了缓存功能,可以缓存Session、producer、consumer等。

        JmsTemplate:用于接收和发送消息的模板。Spring提供的,我们只需要向spring容器注册这个类就可以使用JmsTemplate方便的操作JMS。JmsTemplate类是线程安全的,可以在整个范围内使用,而且我们可以注册多个JmsTemplate到spring中。

        MessageListener:消息监听器。需要我们手动实现,只需要实现一个omMessage(Message message)方法。

    2.代码实现spring集成activemq

    0.pom.xml配置与目录结构与log4j.properties

    log4j.rootLogger=info,B,A
    
    log4j.appender.A=org.apache.log4j.ConsoleAppender
    log4j.appender.A.layout=org.apache.log4j.PatternLayout
    log4j.appender.A.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%c]-[%p] %m%n
    
    log4j.appender.B=org.apache.log4j.RollingFileAppender
    log4j.appender.B.File=E:\jms.log
    log4j.appender.B.MaxFileSize=10MB
    log4j.appender.B.MaxBackupIndex=5
    log4j.appender.B.layout=org.apache.log4j.PatternLayout
    log4j.appender.B.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%c]-[%p] %m%n
    <?xml version="1.0" encoding="UTF-8"?>
    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>cn.qlq</groupId>
        <artifactId>jms-spring</artifactId>
        <version>1.0-SNAPSHOT</version>
        <packaging>war</packaging>
        <name>jms-spring Maven Webapp</name>
        <!-- 版本管理 -->
        <properties>
            <springframework>4.1.8.RELEASE</springframework>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.10</version>
                <scope>test</scope>
            </dependency>
    
    
            <!-- JSP相关 -->
            <dependency>
                <groupId>jstl</groupId>
                <artifactId>jstl</artifactId>
                <version>1.2</version>
            </dependency>
            <dependency>
                <groupId>javax.servlet</groupId>
                <artifactId>servlet-api</artifactId>
                <scope>provided</scope>
                <version>2.5</version>
            </dependency>
    
    
            <!-- spring -->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-core</artifactId>
                <version>${springframework}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context</artifactId>
                <version>${springframework}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-tx</artifactId>
                <version>${springframework}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-webmvc</artifactId>
                <version>${springframework}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
                <version>${springframework}</version>
            </dependency>
            <!-- xbean 如<amq:connectionFactory /> -->
            <dependency>
                <groupId>org.apache.xbean</groupId>
                <artifactId>xbean-spring</artifactId>
                <version>3.16</version>
            </dependency>
    
            <!-- activemq -->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-core</artifactId>
                <version>5.7.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-pool</artifactId>
                <version>5.12.1</version>
            </dependency>
    
            <!-- slf4j-->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.12</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.5</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <finalName>Crawl-Page</finalName>
            <plugins>
                <plugin>
                    <groupId>org.apache.tomcat.maven</groupId>
                    <artifactId>tomcat7-maven-plugin</artifactId>
                    <configuration>
                        <port>8080</port>
                        <path>/</path>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    </project>

     1.队列模式的MQ实现(Destination设置为ActiveMQQueue即可)

    ApplicationContext-all.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
        <!--配置扫描注解的包-->
        <context:component-scan base-package="cn.qlq.jms" ></context:component-scan>
    
        <!--Spring为我们提供的ConnectionFactory-->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616"></property>
        </bean>
    
        <!--Spring jms为我们提供的连接池-->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
        </bean>
    
        <!--一个队列的目的地,构造方法指定queueName,点对点模式-->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="queue"></constructor-arg>
        </bean>
    
        <!--spring JMS提供的JmsTemplate-->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFactory"/>
        </bean>
    
    
        <!--消费消息容器(需要注入连接工厂,目的地,消息消费者监听器)-->
        <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="queueDestination"/>
            <property name="messageListener" ref="myConsumer"/>
        </bean>
    </beans>

     生产者接口和实现类:

    package cn.qlq.jms.producer;
    
    /**
     * @Author: qlq
     * @Description
     * @Date: 21:25 2018/9/27
     */
    public interface MyProducer {
        void sendMessage(String message);
    }
    package cn.qlq.jms.producer;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Service;
    
    import javax.jms.*;
    
    /**
     * @Author: qlq
     * @Description
     * @Date: 21:26 2018/9/27
     */
    
    @Service
    public class MyProducerImpl implements MyProducer {
        private static final Logger logger = LoggerFactory.getLogger(MyProducerImpl.class);
        @Autowired
        private JmsTemplate jmsTemplate;
        @Autowired
        @Qualifier("queueDestination")
        private Destination destination;
    
        public void sendMessage(final String message) {
            jmsTemplate.send(destination, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    TextMessage textMessage = session.createTextMessage(message);
                    return textMessage;
                }
            });
            logger.info("send textMesage:{}",message);
        }
    }

    消费消息监听器代码:

    package cn.qlq.jms.producer;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Service;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /**
     * @Author: qlq
     * @Description
     * @Date: 22:36 2018/9/27
     */
    @Service
    public class MyConsumer implements MessageListener {
        private static final Logger logger = LoggerFactory.getLogger(MyConsumer.class);
    
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("consumer message:"+ textMessage.getText());
            } catch (JMSException e) {
                logger.error("consume message error", e);
            }
        }
    }

    生产者启动类:启动spring容器,获取bean并且发布消息:

    package cn.qlq.jms.producer;
    
    import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
    import org.springframework.context.ApplicationContext;
    
    /**
     * @Author: qlq
     * @Description 生产消息的入口类
     * @Date: 22:15 2018/9/27
     */
    public class ProducerApp {
        public static void main(String[] args) {
            ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("ApplicationContext-all.xml");
    //        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("ApplicationContext-all.xml");
            MyProducer myProducer = (MyProducer) applicationContext.getBean("myProducerImpl");
            for (int i = 0; i < 100; i++) {
                myProducer.sendMessage(" message"+i);
            }
            applicationContext.close();//关闭容器,会自动关闭所有的资源
        }
    }

    消费者启动类:启动spring容器,会自动监听目的地的消息:

    package cn.qlq.jms.producer;
    
    import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
    
    /**
     * @Author: qlq
     * @Description 消费消息的入口类
     * @Date: 22:15 2018/9/27
     */
    public class ConsumerApp {
        public static void main(String[] args) {
            //启动容器会自动消费消息
            ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("ApplicationContext-all.xml");
        }
    }

     启动三个消费者一个生产者,查看控制台结果:(符合队列模式的特点:平均消费)

     

    activemq后台查看结果:(已经停掉3个消费者线程所以消费者是0)

     2.主题模式的MQ实现(Destination设置为ActiveMQQueue即可)

     在Spring的xml配置文件中增加topic目的地,并且将消息监听的目的地设为topicDestination

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
        <!--配置扫描注解的包-->
        <context:component-scan base-package="cn.qlq.jms" ></context:component-scan>
    
        <!--Spring为我们提供的ConnectionFactory-->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616"></property>
        </bean>
    
        <!--Spring jms为我们提供的连接池-->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
        </bean>
    
        <!--一个队列的目的地,构造方法指定queueName,点对点模式-->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="queue"></constructor-arg>
        </bean>
    
        <!--spring JMS提供的JmsTemplate-->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFactory"/>
        </bean>
    
    
        <!--消费消息容器(需要注入连接工厂,目的地,消息消费者监听器)-->
        <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="topicDestination"/>
            <property name="messageListener" ref="myConsumer"/>
        </bean>
    
        <!--一个队列的目的地,构造方法指定queueName,点对点模式-->
        <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="topic"></constructor-arg>
        </bean>
    </beans>

     消息生产者将目的地注入为topicDestination

    package cn.qlq.jms.producer;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Service;
    
    import javax.jms.*;
    
    /**
     * @Author: qlq
     * @Description
     * @Date: 21:26 2018/9/27
     */
    
    @Service
    public class MyProducerImpl implements MyProducer {
        private static final Logger logger = LoggerFactory.getLogger(MyProducerImpl.class);
        @Autowired
        private JmsTemplate jmsTemplate;
        @Autowired
        @Qualifier("topicDestination")
        private Destination destination;
    
        public void sendMessage(final String message) {
            jmsTemplate.send(destination, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    TextMessage textMessage = session.createTextMessage(message);
                    return textMessage;
                }
            });
            logger.info("send textMesage:{}",message);
        }
    }

       剩下的测试代码同上,还是先开启三个消费者,再开启一个生产者,查看结果:(符合订阅模式特点:消费所有已经订阅的消息)

    activemq后台查看结果:

  • 相关阅读:
    取2个日期间的天数
    C#代码与JAVASCRIPT函数的相互调用
    ASP.NET验证码(3种)
    VS2008自带SQL 2005如何使用
    文本框默认有一个值,然后鼠标点上去那个值就清空
    远程桌面连接会话超时或者被限制改组策略也没用的时候就这么解决
    关于CComboBox的使用,编辑项的文字
    vc 剪切板 unicode
    Linux 防火墙、SELinux 的开启和关闭
    MSSQLSERVER服务不能启动
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/9710633.html
Copyright © 2011-2022 走看看