zoukankan      html  css  js  c++  java
  • 【ActiveMQ】Spring Jms集成ActiveMQ学习记录

    Spring Jms集成ActiveMQ学习记录。

    引入依赖包

    无论生产者还是消费者均引入这些包:

    <!-- Single place to pin the Spring version used by the artifacts below. -->
    <properties>
      <spring.version>3.0.5.RELEASE</spring.version>
    </properties>

    <dependencies>
      <!-- Spring JMS support; the examples use JmsTemplate and the listener container from it. -->
      <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${spring.version}</version>
      </dependency>
      <!-- ActiveMQ client classes (connection factory, queue destination). -->
      <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-core</artifactId>
        <version>5.7.0</version>
      </dependency>
    </dependencies>
    

    生产者

    先注册连接工厂、JmsTemplate(队列/主题两种模式)、队列等Bean:

    <?xml version="1.0" encoding="UTF-8"?>
    <!--
        Producer-side Spring context: connection factories, one JmsTemplate per
        messaging domain (queue / topic) and the destination queue bean.
    -->
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
        xmlns:jms="http://www.springframework.org/schema/jms"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

        <!-- Picks up annotated components (e.g. @Service classes) under com.nicchagil. -->
        <context:component-scan base-package="com.nicchagil" />

        <!-- ActiveMQ's native connection factory, pointing at the broker. -->
        <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://192.168.1.101:61616"/>
        </bean>

        <!-- Spring's caching connection factory wrapping the ActiveMQ one. -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
            <property name="targetConnectionFactory" ref="activeMQConnectionFactory" />
            <!-- Number of sessions to cache. -->
            <property name="sessionCacheSize" value="100" />
        </bean>

        <!-- Template for queue (point-to-point) messaging. -->
        <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
            <constructor-arg ref="connectionFactory" />
            <!-- Not publish/subscribe, i.e. queue mode. -->
            <property name="pubSubDomain" value="false" />
        </bean>

        <!-- Template for topic (publish/subscribe) messaging. -->
        <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
            <constructor-arg ref="connectionFactory" />
            <!-- Publish/subscribe mode. -->
            <property name="pubSubDomain" value="true" />
        </bean>

        <!-- Destination queue. -->
        <bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <!-- Queue name. -->
            <constructor-arg>
                <value>testQueue</value>
            </constructor-arg>
        </bean>

    </beans>
    

    此类完全模拟正常的Service

    package com.nicchagil;
    import javax.annotation.Resource;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Service;
    
    /**
     * Service bean that sends text messages through the injected
     * {@link JmsTemplate}.
     */
    @Service
    public class ProducerService {

        /** Template looked up by bean name {@code jmsQueueTemplate}. */
        @Resource(name="jmsQueueTemplate")
        private JmsTemplate jmsTemplate;

        /**
         * Sends {@code message} to {@code destination} as a JMS text message.
         *
         * @param destination where the message is sent
         * @param message     text payload of the message
         */
        public void sendMessage(Destination destination, final String message) {
            // Build the creator first so the send call itself stays a one-liner.
            MessageCreator textMessageCreator = new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage(message);
                }
            };
            jmsTemplate.send(destination, textMessageCreator);
        }

    }
    

    这里模拟调用Service去发送一条消息:

    package com.nicchagil;
    
    import org.apache.activemq.command.ActiveMQQueue;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    import com.nicchagil.ProducerService;
    
    
    /**
     * Demo entry point for the producer side: loads {@code spring-activemq.xml},
     * looks up the {@code producerService} and {@code testQueue} beans and sends
     * a single text message.
     */
    public class HowToUse {

        /**
         * Boots the Spring context, sends {@code "hello."} to the test queue and
         * then closes the context.
         *
         * @param args ignored
         */
        public static void main(String[] args) {
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] {"spring-activemq.xml"});
            try {
                context.start();

                ProducerService producerService = context.getBean("producerService", ProducerService.class);
                ActiveMQQueue activeMQQueue = context.getBean("testQueue", ActiveMQQueue.class);
                producerService.sendMessage(activeMQQueue, "hello.");
            } finally {
                // Closing the context destroys its singleton beans; without this the
                // connection factory's cached JMS connection is never released.
                context.close();
            }
        }

    }
    

    消费者

    注册连接工厂、监听器等Bean:

    <?xml version="1.0" encoding="UTF-8"?>
    <!--
        Consumer-side Spring context: connection factories, the queue to listen
        on, the message listener and the container that drives it.
    -->
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
        xmlns:jms="http://www.springframework.org/schema/jms"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

        <!-- Picks up annotated components under com.nicchagil. -->
        <context:component-scan base-package="com.nicchagil" />

        <!-- ActiveMQ's native connection factory, pointing at the broker. -->
        <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://192.168.1.101:61616"/>
        </bean>

        <!-- Spring's caching connection factory wrapping the ActiveMQ one. -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
            <property name="targetConnectionFactory" ref="activeMQConnectionFactory" />
            <!-- Number of sessions to cache. -->
            <property name="sessionCacheSize" value="100" />
        </bean>

        <!-- Queue the listener consumes from. -->
        <bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <!-- Queue name. -->
            <constructor-arg>
                <value>testQueue</value>
            </constructor-arg>
        </bean>

        <!-- Listener holding the message-handling logic. -->
        <bean id="queueMessageListener" class="com.nicchagil.QueueMessageListener" />

        <!-- Message listener container: wires connection factory, destination and listener together. -->
        <bean id="queueListenerContainer"
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="testQueue" />
            <property name="messageListener" ref="queueMessageListener" />
        </bean>

    </beans>
    

    消费者的主要业务逻辑,这里只简单地打印消息:

    package com.nicchagil;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /**
     * JMS listener that prints the body of each text message it receives.
     */
    public class QueueMessageListener implements MessageListener {

        /**
         * Prints the text of {@code message} to standard output.
         *
         * <p>Messages that are not {@link TextMessage}s are reported on standard
         * error and skipped instead of failing with a {@code ClassCastException}.
         * A {@link JMSException} while reading the text is printed and not
         * rethrown.
         *
         * @param message the received JMS message
         */
        @Override
        public void onMessage(Message message) {
            // Guard the cast: a queue can carry any message type, not just text.
            if (!(message instanceof TextMessage)) {
                System.err.println("Skipping non-text message: " + message);
                return;
            }

            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("处理消息:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

    }
    

    消费者启动类:

    package com.nicchagil;
    
    import java.io.IOException;
    
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    
    /**
     * Consumer launcher: loads {@code spring-activemq.xml} and keeps the JVM
     * alive until input arrives on standard input.
     */
    public class Boot {

        /**
         * Starts the Spring context, blocks on {@code System.in.read()} and then
         * closes the context.
         *
         * @param args ignored
         * @throws IOException if reading from standard input fails
         */
        public static void main(String[] args) throws IOException {
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] {"spring-activemq.xml"});
            try {
                context.start();

                // Block the main thread; the beans defined in the context keep working meanwhile.
                System.in.read();
            } finally {
                // Close the context so its beans are destroyed, also when the read fails.
                context.close();
            }
        }

    }
    

    运行Boot和HowToUse可看效果。

  • 相关阅读:
    企业级Nginx负载均衡与keepalived高可用实战(一)Nginx篇
    Elasticsearch由浅入深(十一)内核原理
    Elasticsearch由浅入深(十一)索引管理
    Elasticsearch由浅入深(十)搜索引擎:相关度评分 TF&IDF算法、doc value正排索引、解密query、fetch phrase原理、Bouncing Results问题、基于scoll技术滚动搜索大量数据
    Elasticsearch由浅入深(九)搜索引擎:query DSL、filter与query、query搜索实战
    Elasticsearch由浅入深(八)搜索引擎:mapping、精确匹配与全文搜索、分词器、mapping总结
    Elasticsearch由浅入深(七)搜索引擎:_search含义、_multi-index搜索模式、分页搜索以及深分页性能问题、query string search语法以及_all metadata原理
    Elasticsearch由浅入深(六)批量操作:mget批量查询、bulk批量增删改、路由原理、增删改内部原理、document查询内部原理、bulk api的奇特json格式
    Elasticsearch由浅入深(五)_version乐观锁、external version乐观锁、partial update、groovy脚本实现partial update
    Elasticsearch由浅入深(四)ES并发冲突、悲观锁与乐观锁、_version乐观锁并发
  • 原文地址:https://www.cnblogs.com/nick-huang/p/6691310.html
Copyright © 2011-2022 走看看