zoukankan      html  css  js  c++  java
  • spring 整合 ActiveMQ

    1.1 JMS简介

    JMS即Java Message Service,Java消息服务。主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。

    点对点(Queue)模式:一个生产者对应一个消费者,每条消息只会被一个消费者接收;

    发布/订阅(Topic)模式:一个生产者对应多个消费者,每条消息会被所有订阅者接收。

    1.2 Spring整合JMS

    maven:

    <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.10</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context</artifactId>
                <version>${spring-version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
                <version>${spring-version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-test</artifactId>
                <version>${spring-version}</version>
            </dependency>
            <dependency>
                <groupId>javax.annotation</groupId>
                <artifactId>jsr250-api</artifactId>
                <version>1.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-core</artifactId>
                <version>5.7.0</version>
            </dependency>
    </dependencies> 

    1.2.1 activeMQ准备

    下载ActiveMQ(http://activemq.apache.org/download.html),解压后运行bin目录下的activemq.bat即可启动ActiveMQ。

    1.2.2 配置ConnectionFactory

    Spring提供了两种ConnectionFactory实现:SingleConnectionFactory和CachingConnectionFactory。

    SingleConnectionFactory:对所有创建Connection的请求始终返回同一个Connection,并忽略对该Connection的close方法调用。

    CachingConnectionFactory:继承SingleConnectionFactory,新增缓存功能,可以缓存Session、MessageProducer和MessageConsumer。

    1 <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"/>

    配置:

     1     <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
     2     <bean id="mqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
     3         <property name="brokerURL" value="tcp://localhost:61616"/>
     4     </bean>
     5     
     6     <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
     7     <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
     8         <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
     9         <property name="targetConnectionFactory" ref="mqConnectionFactory"/>
    10     </bean>

    当使用PooledConnectionFactory连接池时(该类位于activemq-pool模块中,需额外引入对应依赖),配置如下:

     1     <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
     2     <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
     3         <property name="brokerURL" value="tcp://localhost:61616"/>
     4     </bean>
     5     
     6     <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
     7         <property name="connectionFactory" ref="targetConnectionFactory"/>
     8         <property name="maxConnections" value="10"/>
     9     </bean>
    10     
    11     <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    12         <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
    13     </bean>

    1.2.3 配置生产者

    1     <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
    2     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    3         <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
    4         <property name="connectionFactory" ref="connectionFactory"/>
    5     </bean>

    目的地(队列与主题):

     1     <!--这个是队列目的地,点对点的-->
     2     <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
     3         <constructor-arg>
     4             <value>queue</value>
     5         </constructor-arg>
     6     </bean>
     7     <!--这个是主题目的地,一对多的-->
     8     <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
     9         <constructor-arg value="topic"/>
    10     </bean>

    生产者:

     1 package com.tiantian.springintejms.service.impl;
     2  
     3 import javax.annotation.Resource;
     4 import javax.jms.Destination;
     5 import javax.jms.JMSException;
     6 import javax.jms.Message;
     7 import javax.jms.Session;
     8  
     9 import org.springframework.jms.core.JmsTemplate;
    10 import org.springframework.jms.core.MessageCreator;
    11 import org.springframework.stereotype.Component;
    12  
    13 import com.tiantian.springintejms.service.ProducerService;
    14  
    15 /**
    16  * {@link ProducerService} implementation that sends plain-text messages
    17  * to a JMS destination through the injected {@link JmsTemplate}.
    18  */
    19 @Component
    20 public class ProducerServiceImpl implements ProducerService {
    21  
    22     /** Template used for sending; supplied through {@link #setJmsTemplate(JmsTemplate)}. */
    23     private JmsTemplate jmsTemplate;
    24     
    25     /**
    26      * Prints the outgoing text to stdout and sends it as a JMS text message.
    27      *
    28      * @param destination destination the message is sent to
    29      * @param message     text used as the body of the created message
    30      */
    31     public void sendMessage(Destination destination, final String message) {
    32         System.out.println("---------------生产者发送消息-----------------");
    33         System.out.println("---------------生产者发了一个消息:" + message);
    34         jmsTemplate.send(destination, new MessageCreator() {
    35             public Message createMessage(Session session) throws JMSException {
    36                 return session.createTextMessage(message);
    37             }
    38         });
    39     } 
    40 
    41     /**
    42      * @return the template currently used for sending
    43      */
    44     public JmsTemplate getJmsTemplate() {
    45         return jmsTemplate;
    46     } 
    47 
    48     /**
    49      * Sets the template used for sending; annotated with {@code @Resource} for injection.
    50      *
    51      * @param jmsTemplate template to send messages with
    52      */
    53     @Resource
    54     public void setJmsTemplate(JmsTemplate jmsTemplate) {
    55         this.jmsTemplate = jmsTemplate;
    56     }
    57  
    58 }

    1.2.4 配置消费者

    定义处理消息的MessageListener:实现JMS规范中的MessageListener接口

     1 package com.tiantian.springintejms.listener;
     2  
     3 import javax.jms.JMSException;
     4 import javax.jms.Message;
     5 import javax.jms.MessageListener;
     6 import javax.jms.TextMessage;
     7  
     8 /**
     9  * JMS {@link MessageListener} that prints the body of each received
    10  * text message to stdout.
    11  */
    12 public class ConsumerMessageListener implements MessageListener {
    13  
    14     /**
    15      * Prints the text of the received message. Messages that are not
    16      * {@link TextMessage} instances are reported and skipped instead of
    17      * failing with a {@code ClassCastException}.
    18      *
    19      * @param message message delivered to this listener
    20      */
    21     public void onMessage(Message message) {
    22         // The producer in this example only sends plain text, but check the
    23         // type before casting in case another kind of message arrives.
    24         if (!(message instanceof TextMessage)) {
    25             System.out.println("接收到一个非文本消息,已忽略:" + message);
    26             return;
    27         }
    28         TextMessage textMsg = (TextMessage) message;
    29         System.out.println("接收到一个纯文本消息。");
    30         try {
    31             System.out.println("消息内容是:" + textMsg.getText());
    32         } catch (JMSException e) {
    33             // Reading the body failed; report it without rethrowing.
    34             e.printStackTrace();
    35         }
    36     }
    37  
    38 }

    消息监听容器配置:

     1     <!--这个是队列目的地-->
     2     <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
     3         <constructor-arg>
     4             <value>queue</value>
     5         </constructor-arg>
     6     </bean>
     7     <!-- 消息监听器 -->
     8     <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>    
     9 
    10     <!-- 消息监听容器 -->
    11     <bean id="jmsContainer"        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    12         <property name="connectionFactory" ref="connectionFactory" />
    13         <property name="destination" ref="queueDestination" />
    14         <property name="messageListener" ref="consumerMessageListener" />
    15     </bean>

    完整配置:

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <beans xmlns="http://www.springframework.org/schema/beans"
     3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
     4     xmlns:jms="http://www.springframework.org/schema/jms"
     5     xsi:schemaLocation="http://www.springframework.org/schema/beans
     6      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     7      http://www.springframework.org/schema/context
     8      http://www.springframework.org/schema/context/spring-context-3.0.xsd
     9     http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    10     http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
    11  
    12     <context:component-scan base-package="com.tiantian" />
    13  
    14     <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
    15     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    16         <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
    17         <property name="connectionFactory" ref="connectionFactory"/>
    18     </bean>
    19     
    20     <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
    21     <bean id="mqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    22         <property name="brokerURL" value="tcp://localhost:61616"/>
    23     </bean>
    24     
    25     <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    26     <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    27         <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
    28         <property name="targetConnectionFactory" ref="mqConnectionFactory"/>
    29     </bean>
    30     
    31     <!--这个是队列目的地-->
    32     <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    33         <constructor-arg>
    34             <value>queue</value>
    35         </constructor-arg>
    36     </bean>
    37     <!-- 消息监听器 -->
    38     <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>
    39     <!-- 消息监听容器 -->
    40     <bean id="jmsContainer"
    41         class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    42         <property name="connectionFactory" ref="connectionFactory" />
    43         <property name="destination" ref="queueDestination" />
    44         <property name="messageListener" ref="consumerMessageListener" />
    45     </bean>
    46 </beans>

    测试:

     1 package com.tiantian.springintejms.test;
     2  
     3 import javax.jms.Destination;
     4  
     5 import org.junit.Test;
     6 import org.junit.runner.RunWith;
     7 import org.springframework.beans.factory.annotation.Autowired;
     8 import org.springframework.beans.factory.annotation.Qualifier;
     9 import org.springframework.test.context.ContextConfiguration;
    10 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    11 import com.tiantian.springintejms.service.ProducerService;
    12  
    13 /**
    14  * Spring-context JUnit test that sends two text messages to the
    15  * {@code queueDestination} bean, using the context loaded from
    16  * {@code /applicationContext.xml}.
    17  */
    18 @RunWith(SpringJUnit4ClassRunner.class)
    19 @ContextConfiguration("/applicationContext.xml")
    20 public class ProducerConsumerTest {
    21  
    22     /** Destination bean qualified as {@code queueDestination}. */
    23     @Autowired
    24     @Qualifier("queueDestination")
    25     private Destination destination;
    26 
    27     /** Producer service used to send the messages. */
    28     @Autowired
    29     private ProducerService producerService;
    30     
    31     /** Sends the messages numbered 1 and 2 through the producer service. */
    32     @Test
    33     public void testSend() {
    34         int seq = 1;
    35         while (seq <= 2) {
    36             producerService.sendMessage(destination, "你好,生产者!这是消息:" + seq);
    37             seq++;
    38         }
    39     }
    40     
    41 }
  • 相关阅读:
    ubuntu nginx 伪静态 设置
    ubuntu thinkphp pathinfo 404等问题
    ubuntu svn安装测试
    ubuntu zendDebugger.so 加载不上的问题
    ubuntu ssh安装
    nyoj-660-逃离地球
    nyoj-643-发短信
    nyoj-181-小明的难题
    nyoj-663-弟弟的作业
    nyoj-682-初学者的烦恼
  • 原文地址:https://www.cnblogs.com/niejunlei/p/5279817.html
Copyright © 2011-2022 走看看