zoukankan      html  css  js  c++  java
  • 深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例

    前言

    这篇博文,我们基于spring+JMS+ActiveMQ+Tomcat,做一个Spring4.1.0和ActiveMQ5.11.1整合实例,实现了Point-To-Point的异步队列消息和PUB/SUB(发布/订阅)模型,简单实例,不包含任何业务。

    环境准备

    工具

    1. JDK1.6或1.7

    2. Spring4.1.0

    3. ActiveMQ5.11.1

    4. Tomcat7.x

    目录结构

    这里写图片描述

    所需jar包

    这里写图片描述

    项目的配置

    配置ConnectionFactory

    connectionFactory是Spring用于创建到JMS服务器链接的,Spring提供了多种connectionFactory,我们介绍两个SingleConnectionFactory和CachingConnectionFactory。

    SingleConnectionFactory:对于建立JMS服务器链接的请求会一直返回同一个链接,并且会忽略Connection的close方法调用。

    CachingConnectionFactory:继承了SingleConnectionFactory,所以它拥有SingleConnectionFactory的所有功能,同时它还新增了缓存功能,它可以缓存Session、MessageProducer和MessageConsumer。我们使用CachingConnectionFactory来作为示例。

    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        </bean>
    • 1
    • 2
    • 1
    • 2

    Spring提供的ConnectionFactory只是Spring用于管理ConnectionFactory的,真正产生到JMS服务器链接的ConnectionFactory还得是由JMS服务厂商提供,并且需要把它注入到Spring提供的ConnectionFactory中。我们这里使用的是ActiveMQ实现的JMS,所以在我们这里真正的可以产生Connection的就应该是由ActiveMQ提供的ConnectionFactory。所以定义一个ConnectionFactory的完整代码应该如下所示:

    <!-- ActiveMQ 连接工厂 -->
        <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
        <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
        <amq:connectionFactory id="amqConnectionFactory"
            brokerURL="tcp://192.168.3.3:61616" userName="admin" password="admin"  />
    
        <!-- Spring Caching连接工厂 -->
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
        <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
            <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
            <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
            <!-- 同上,同理 -->
            <!-- <constructor-arg ref="amqConnectionFactory" /> -->
            <!-- Session缓存数量 -->
            <property name="sessionCacheSize" value="100" />
        </bean>
    

      

    配置生产者

    配置好ConnectionFactory之后我们就需要配置生产者。生产者负责产生消息并发送到JMS服务器。但是我们要怎么进行消息发送呢?通常是利用Spring为我们提供的JmsTemplate类来实现的,所以配置生产者其实最核心的就是配置消息发送的JmsTemplate。对于消息发送者而言,它在发送消息的时候要知道自己该往哪里发,为此,我们在定义JmsTemplate的时候需要注入一个Spring提供的ConnectionFactory对象。

    在利用JmsTemplate进行消息发送的时候,我们需要知道发送哪种消息类型:一个是点对点的ActiveMQQueue,另一个就是支持订阅/发布模式的ActiveMQTopic。如下所示:

    <!-- Spring JmsTemplate 的消息生产者 start-->
    
        <!-- 定义JmsTemplate的Queue类型 -->
        <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
            <constructor-arg ref="connectionFactory" />
            <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
            <property name="pubSubDomain" value="false" />
        </bean>
    
        <!-- 定义JmsTemplate的Topic类型 -->
        <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
             <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
            <constructor-arg ref="connectionFactory" />
            <!-- pub/sub模型(发布/订阅) -->
            <property name="pubSubDomain" value="true" />
        </bean>
    
        <!--Spring JmsTemplate 的消息生产者 end-->
    

      

    生产者如何指定目的地和发送消息?大家看源码即可,就不再这提供了。

    配置消费者

    生产者往指定目的地Destination发送消息后,接下来就是消费者对指定目的地的消息进行消费了。那么消费者是如何知道有生产者发送消息到指定目的地Destination了呢?每个消费者对应每个目的地都需要有对应的MessageListenerContainer。对于消息监听容器而言,除了要知道监听哪个目的地之外,还需要知道到哪里去监听,也就是说它还需要知道去监听哪个JMS服务器,通过配置MessageListenerContainer的时候往里面注入一个ConnectionFactory来实现的。所以我们在配置一个MessageListenerContainer的时候有三个属性必须指定:一个是表示从哪里监听的ConnectionFactory;一个是表示监听什么的Destination;一个是接收到消息以后进行消息处理的MessageListener。

    <!-- 消息消费者 start-->
    
        <!-- 定义Queue监听器 -->
        <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
            <jms:listener destination="test.queue" ref="queueReceiver1"/>
            <jms:listener destination="test.queue" ref="queueReceiver2"/>
        </jms:listener-container>
    
        <!-- 定义Topic监听器 -->
        <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
            <jms:listener destination="test.topic" ref="topicReceiver1"/>
            <jms:listener destination="test.topic" ref="topicReceiver2"/>
        </jms:listener-container>
    
        <!-- 消息消费者 end -->
    

      

    ActiveMQ.xml

    此时,Spring和JMS,ActiveMQ整合的ActiveMQ.xml已经完成,下面展示所有的xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
        xmlns:jms="http://www.springframework.org/schema/jms"
        xsi:schemaLocation="http://www.springframework.org/schema/beans   
            http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
            http://www.springframework.org/schema/context   
            http://www.springframework.org/schema/context/spring-context-4.0.xsd
            http://www.springframework.org/schema/jms
            http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
            http://activemq.apache.org/schema/core
            http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
    
        <!-- ActiveMQ 连接工厂 -->
        <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
        <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
        <amq:connectionFactory id="amqConnectionFactory"
            brokerURL="tcp://192.168.3.3:61616" userName="admin" password="admin"  />
    
        <!-- Spring Caching连接工厂 -->
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
        <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
            <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
            <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
            <!-- 同上,同理 -->
            <!-- <constructor-arg ref="amqConnectionFactory" /> -->
            <!-- Session缓存数量 -->
            <property name="sessionCacheSize" value="100" />
        </bean>
    
        <!-- Spring JmsTemplate 的消息生产者 start-->
    
        <!-- 定义JmsTemplate的Queue类型 -->
        <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
            <constructor-arg ref="connectionFactory" />
            <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
            <property name="pubSubDomain" value="false" />
        </bean>
    
        <!-- 定义JmsTemplate的Topic类型 -->
        <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
             <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
            <constructor-arg ref="connectionFactory" />
            <!-- pub/sub模型(发布/订阅) -->
            <property name="pubSubDomain" value="true" />
        </bean>
    
        <!--Spring JmsTemplate 的消息生产者 end-->
    
    
        <!-- 消息消费者 start-->
    
        <!-- 定义Queue监听器 -->
        <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
            <jms:listener destination="test.queue" ref="queueReceiver1"/>
            <jms:listener destination="test.queue" ref="queueReceiver2"/>
        </jms:listener-container>
    
        <!-- 定义Topic监听器 -->
        <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
            <jms:listener destination="test.topic" ref="topicReceiver1"/>
            <jms:listener destination="test.topic" ref="topicReceiver2"/>
        </jms:listener-container>
    
        <!-- 消息消费者 end -->
    </beans>  
    

      鉴于博文内容较多,我们只是在粘贴web.xml的配置,就不在博文中提供Spring和SpringMVC的XML配置,其他内容,大家查看源码即可。

    web.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
        xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
        version="3.0">
        <display-name>ActiveMQSpringDemo</display-name>
    
        <!-- Log4J Start -->
        <context-param>
            <param-name>log4jConfigLocation</param-name>
            <param-value>classpath:log4j.properties</param-value>
        </context-param>
        <context-param>
            <param-name>log4jRefreshInterval</param-name>
            <param-value>6000</param-value>
        </context-param>
        <!-- Spring Log4J config -->
        <listener>
            <listener-class>org.springframework.web.util.Log4jConfigListener</listener-class>
        </listener>
        <!-- Log4J End -->
    
        <!-- Spring 编码过滤器 start -->
        <filter>
            <filter-name>characterEncoding</filter-name>
            <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
            <init-param>
                <param-name>encoding</param-name>
                <param-value>UTF-8</param-value>
            </init-param>
            <init-param>
                <param-name>forceEncoding</param-name>
                <param-value>true</param-value>
            </init-param>
        </filter>
        <filter-mapping>
            <filter-name>characterEncoding</filter-name>
            <url-pattern>/*</url-pattern>
        </filter-mapping>
        <!-- Spring 编码过滤器 End -->
    
        <!-- Spring Application Context Listener Start -->
        <context-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath*:applicationContext.xml,classpath*:ActiveMQ.xml</param-value>
        </context-param>
        <listener>
            <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
        </listener>
        <!-- Spring Application Context Listener End -->
    
    
        <!-- Spring MVC Config Start -->
        <servlet>
            <servlet-name>SpringMVC</servlet-name>
            <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    
            <init-param>
                <param-name>contextConfigLocation</param-name>
                <param-value>classpath:spring-mvc.xml</param-value>
            </init-param>
            <load-on-startup>1</load-on-startup>
        </servlet>
        <servlet-mapping>
            <servlet-name>SpringMVC</servlet-name>
            <!-- Filter all resources -->
            <url-pattern>/</url-pattern>
        </servlet-mapping>
        <!-- Spring MVC Config End -->
    
    </web-app>
    

      

    运行效果

    这里写图片描述

    这里写图片描述

    从上图可以看出队列模型和PUB/SUB模型的区别,Queue只能由一个消费者接收,其他Queue中的成员无法接受到被已消费的信息,而Topic则可以,只要是订阅了Topic的消费者,全部可以获取到生产者发布的信息。

    总结

    Spring提供了对JMS的支持,ActiveMQ提供了很好的实现,而此时我们已经将两者完美的结合在了一起。

  • 相关阅读:
    开源项目
    [Accessibility] Missing contentDescription attribute on image [可取行]失踪contentDescription属性图像
    Android 布局 中实现适应屏幕大小及组件滚动
    EF 错误记录
    EasyUI 加载时需要显示和隐藏 panel(面板)内容破版问题
    IE 报表缩放后页面破版
    VS 2017 引入nuget 问题
    SSRS 报表显示页面 asp net session丢失或者找不到 asp net session has expired or could not be found()
    log4net 配置
    网站
  • 原文地址:https://www.cnblogs.com/GotoJava/p/6881975.html
Copyright © 2011-2022 走看看