zoukankan      html  css  js  c++  java
  • Spring整合JMS

    Spring整合JMS(消息中间件)实例

    本篇文章主要描述了如何配置Spring-JMS,至于为何这样配置及Spring-JMS相关介绍,请阅读这篇文章:Spring整合JMS(消息中间件)。我们这里的消息broker用的是ActiveMQ。

    一、相关配置

    本篇主要讲解如何在Spring中配置JMS,关于Spring本身的配置本文就不多做介绍了。

    1.1 配置maven依赖

    在使用Spring-JMS之前,先配置相关依赖。

    <!-- Java JMS 原生API -->
    <dependency>
       <groupId>javax.jms</groupId>
       <artifactId>javax.jms-api</artifactId>
       <version>2.0</version>
    </dependency>
    <!-- spring-jms API -->
    <dependency>
       <groupId>org.springframework</groupId>
       <artifactId>spring-jms</artifactId>
       <version>${spring.version}</version>
    </dependency>
    <!-- active-mq核心包 -->
    <dependency>
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-core</artifactId>
       <version>5.7.0</version>
    </dependency>
    <!-- spring-test类用来测试 -->
    <dependency>
       <groupId>org.springframework</groupId>
       <artifactId>spring-test</artifactId>
       <version>${spring.version}</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

     1.2 安装并启动ActiveMQ

    这里只是简单介绍如何安装启动ActiveMQ,详细请参考官方文档。

    1. 下载ActiveMQ:http://activemq.apache.org/download.html
    2. 安装:
      - 解压:tar zxvf activemq-x.x.x-bin.tar.gz
      - 增加权限:
      cd [activemq_install_dir]/bin
      chmod 755 activemq

    3. 启动:
      cd [activemq_install_dir]/bin
      ./activemq start

    4. 检查是否启动成功:
      netstat -nl|grep 61616

    我们还可以通过监控页面查看ActiveMQ运行情况:http://localhost:8161/admin (默认用户名密码都是admin)

    1. 3 配置JMS相关bean

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd
                           http://www.springframework.org/schema/jms
                           http://www.springframework.org/schema/jms/spring-jms.xsd
                           http://activemq.apache.org/schema/core
                           http://activemq.apache.org/schema/core/activemq-core.xsd">
    
    <!-- 配置连接ActiveMQ的ConnectionFactory -->
    <bean id="amqConnectionFactory"
          class="org.apache.activemq.ActiveMQConnectionFactory"
          p:brokerURL="tcp://localhost:61616"/>
    <!--为了提高效率,配置一个连接池-->
    <bean id="cachedConnectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory"
          p:targetConnectionFactory-ref="amqConnectionFactory"
          p:sessionCacheSize="10"/>
    <!-- 配置broker的destination-->
    <bean id="destination"
          class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="FOO.TEST"/>
    </bean>
    <!-- 配置Spring的JmsTemplate -->
    <bean id="producerTemplate"
          class="org.springframework.jms.core.JmsTemplate"
          p:connectionFactory-ref="cachedConnectionFactory"
          p:defaultDestination-ref="destination"/>
    </beans>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    从这个配置中可以看到,这里的配置比Spring整合JMS(消息中间件)文章中介绍的多了一个cachedConnectionFactory配置。我们知道创建和销毁连接是非常消耗资源的,为了解决创建销毁连接带来的资源消耗,我们一般会引入连接池(Pool)或者缓存(Cache)。普通的JmsTemplate的ConnectionFactory每次发送消息时都需要建立新的连接,这样效率是非常低的,所以在具体配置时,我们要尽量用到连接池或缓存,所以这里的配置中加了一个cachedConnectionFactory作为缓存。

    二、发送消息

    写一个JmsMessageProducer异步发送消息:

    @Component
    public class JmsMessageProducer {
        private static final Logger logger = LoggerFactory.getLogger(JmsMessageProducer.class);
        @Autowired
        protected JmsTemplate jmsTemplate;
        protected int numberOfMessages = 10;
    
        public void sendMessages() throws JMSException {
            StringBuilder payload = null;
            for (int i = 0; i < numberOfMessages; ++i) {
                payload = new StringBuilder();
                payload.append("Message [").append(i).append("] sent at: ").append(new Date());
                jmsTemplate.convertAndSend(payload.toString());
                logger.info("Sending message number [" + i + "]");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    三、接收消息

    3.1 方式一:写一个JmsMessageConsumer 同步接收消息

    @Component
    public class JmsMessageConsumer {
        private static final Logger logger = LoggerFactory.getLogger(JmsMessageProducer.class);
        @Autowired
        private JmsTemplate template;
        public void receiveMessages() throws JMSException {
            Message message =template.receive();
            TextMessage textMessage =(TextMessage)template.receive();
            logger.info(textMessage.getText());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    3.2 方式二:写一个JmsMessageListener监听器异步接收消息

    @Component
    public class JmsMessageListener implements MessageListener{
        private static final Logger logger = LoggerFactory.getLogger(JmsMessageListener.class);
        public void onMessage(Message message) {
            try {
                TextMessage msg = (TextMessage) message;
                logger.info("Consumed message: " + msg.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    写完listener之后还需要在配置文件中配置这个listener:

    <!-- 注入我们写的listener -->
    <bean id="jmsMessageListener" class="com.heaven.spring.jms.JmsMessageListener"/>
    
    <!-- 配置listener到listener-container当中 -->
    <jms:listener-container
            container-type="default"
            connection-factory="cachedConnectionFactory"
            acknowledge="auto">
        <jms:listener destination="FOO.TEST" ref="jmsMessageListener" method="onMessage"/>
    </jms:listener-container>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    四、测试一下

    4.1 测试发送异步消息

    写一个JmsMessageProducerTest测试一下发送:

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:spring/root-context.xml")
    public class JmsMessageProducerTest {
        @Autowired
        JmsMessageProducer jmsMessageProducer;
        @Test
        public void testSend(){
            try {
                jmsMessageProducer.sendMessages();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    看一下运行结果:

    这里写图片描述

    消息已经异步发送成功,虽然还没有消费者消费,消息就像被成功处理一样。

    4.2 测试同步接收消息

    写一个JmsMessageConsumerTest 测试一下接收:

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:spring/root-context.xml")
    public class JmsMessageConsumerTest {
        @Autowired
        JmsMessageConsumer jmsMessageConsumer;
        @Test
        public void testSend(){
            try {
                jmsMessageConsumer.receiveMessages();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    看一下运行结果:

    这里写图片描述

    我们可以看到消费者接收到一条消息,这条消息正是上面发送者发送的第一条消息。如果我们再运行一遍则会收到第二条消息。如果所有消息都消费完了,broker中没有消息了,此时JmsMessageConsumerTest进程便会挂起一直等待,直到有新消息产生,大家可以试一下。

    4.3测试一下异步消息接收

    按照上面3.2节写介绍一个JmsMessageListener并将其配置到 listener-container中。此时JmsMessageListener便会实时监听brokerURL="tcp://192.168.134.128:61616"这个端口,一旦有消息产生,便会在onMessage()方法接收到消息。其实此时我们的producer和监听器都在同一个应用中,如果我们再运行producer,可以看到如下结果:
    这里写图片描述

    通过运行结果可以看到,每当生产者生产消息,监听器便会实时接收到消息。

  • 相关阅读:
    Org4的約會的Form_Load
    MSCRM 4 Remove 'Add Existing xxxxx to this record' button
    出售剩余时间
    MSCRM中disabled和Disabled屬性的區別
    一些常用的sql
    約會的客戶變更時取其它表的資料
    MSCRM儲存disabled欄位的值
    CRM显示产品图片
    顯示Object的所有屬性及處理ContactId只列出ParentCustomer的資料的javascript
    浅谈递归过程以及递归的优化
  • 原文地址:https://www.cnblogs.com/jianwei-dai/p/7797682.html
Copyright © 2011-2022 走看看