zoukankan      html  css  js  c++  java
  • Spring整合ActiveMQ

    前面介绍了ActiveMQ的基本安装使用,并写了简单的生产者、消费者。下面主要介绍ActiveMQ的消费者的Listener、Spring整合ActiveMQ。

    一、消费者Listener

      之前创建的消费者,接收消息时都是直接调用consumer.receive(),每次只消费一条消息,启动一次才获取一次,十分笨拙。实际开发中基本不会使用这种方式;一般来说,消息的消费者会持续监听目标队列Queue或者主题Topic,只要应用程序不主动关闭,就会一直监听并消费消息。

      JMS listener的消费者:

    package com.cfang.mq.simpleCase;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class ConsumerListener {
    
    	public static void main(String[] args) {
    		ConsumerListener consumerListener = new ConsumerListener();
    		consumerListener.listenMessage();
    	}
    	
    	public void listenMessage() {
    		ConnectionFactory factory = null;
    		Connection connection = null;
    		Session session = null;
    		Destination destination = null;
    		MessageConsumer consumer = null;
    		try {
    			factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://172.31.31.160:61616");
    			connection = factory.createConnection();
    			connection.start();
    			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    			destination = session.createQueue("tp_simple_queue");
    			consumer = session.createConsumer(destination);
    			consumer.setMessageListener(new MessageListener() {
    				public void onMessage(Message message) {
    					try {
    						TextMessage textMessage = (TextMessage) message;
    						System.out.println(textMessage.getText());
    					} catch (JMSException e) {
    						e.printStackTrace();
    					}
    				}
    			});
    			//阻塞代码,模拟应用程序不关闭。如果关闭了,那监听也自动停止了
    			System.in.read();
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			if(consumer != null){
    				try {
    					consumer.close();
    				} catch (JMSException e) {
    					// TODO Auto-generated catch block
    					e.printStackTrace();
    				}
    			}
    			if(session != null){
    				try {
    					session.close();
    				} catch (JMSException e) {
    					// TODO Auto-generated catch block
    					e.printStackTrace();
    				}
    			}
    			if(connection != null){
    				try {
    					connection.close();
    				} catch (JMSException e) {
    					// TODO Auto-generated catch block
    					e.printStackTrace();
    				}
    			}
    		}
    	}
    }
    

    二、Spring整合

      Spring整合ActiveMQ非常便捷:Spring提供了JmsTemplate来封装对ActiveMQ的操作。下面从配置文件到程序代码逐步介绍。

      1、Spring-jms配置文件

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
        xmlns:p="http://www.springframework.org/schema/p"  
        xmlns:context="http://www.springframework.org/schema/context"  
        xmlns:tx="http://www.springframework.org/schema/tx"
        xmlns:aop="http://www.springframework.org/schema/aop"
        xmlns:amq="http://activemq.apache.org/schema/core"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
    		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
    		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
            http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd
            http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
            http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    
    	<!-- Scan the com.cfang.amq package for annotated components -->
    	<context:component-scan base-package="com.cfang.amq">
    		
    	</context:component-scan>
    		
    	<!-- Configure the ActiveMQConnectionFactory connection factory object (broker URL and credentials) -->
    	<amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://172.31.31.160:61616" userName="admin" password="admin"/>
    	<!-- Connection pool wrapping the ActiveMQ connection factory -->
    	<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    		<property name="connectionFactory" ref="amqConnectionFactory"></property>
    		<property name="maxConnections" value="10"></property>
    	</bean>
    	<!-- Connection factory with caching support; the Session cache size is configurable -->
    	<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    		<property name="targetConnectionFactory" ref="pooledConnectionFactory"></property>
    		<property name="sessionCacheSize" value="100"></property>
    	</bean>
    	<!-- Configure the JmsTemplate -->
    	<bean id="template" class="org.springframework.jms.core.JmsTemplate">
    		<!-- Connection factory to use; it must be the connection factory created by Spring. -->
    		<property name="connectionFactory" ref="connectionFactory"></property>
    		<!-- Optional - default destination name -->
    		<property name="defaultDestinationName" value="tp_simple_queue"></property>
    	</bean>
    	<!-- Configure the message producer -->
    	<bean id="springProducer" class="com.cfang.amq.SpringProducer"/>
    	
    	<!-- Configure the listener container that delivers messages from tp_simple_queue to springConsumer -->
    	<bean  class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    		<property name="connectionFactory" ref="connectionFactory"></property>
    		<property name="destinationName" value="tp_simple_queue"/>
    		<property name="messageListener" ref="springConsumer"></property>
    		<property name="concurrentConsumers" value="1"/> 
    	</bean>
    	<!-- The message consumer (listener implementation) -->
    	<bean id="springConsumer" class="com.cfang.amq.SpringConsumer"/>
    </beans>
    

      2、单元测试

    package com.cfang.prebo.activemq;
    
    import java.io.IOException;
    import java.util.Scanner;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    
    import com.cfang.amq.SpringProducer;
    
    /**
     * Interactive test that loads the JMS Spring context and sends every line
     * typed on standard input through {@link SpringProducer}.
     *
     * <p>Typing {@code exit}, or closing standard input, ends the test.
     */
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = {"classpath:applicationContext-jms.xml" })
    public class SpringListenerTest {
    	
    	/** Producer under test, injected from the Spring context. */
    	@Autowired
    	private SpringProducer springProducer;
    
    	/**
    	 * Reads lines from standard input and sends each one as a message until
    	 * the user enters {@code exit} or no more input is available.
    	 *
    	 * @throws Exception if sending a message fails
    	 */
    	@Test
    	public void start() throws Exception {
    		System.out.println("======start");
    		
    		// Not closed on purpose: closing the Scanner would also close System.in.
    		Scanner scanner = new Scanner(System.in);
    		while (true) {
    			System.out.print("producer send msg : ");
    			// Stop cleanly when stdin is closed or at EOF (e.g. when run by a
    			// build tool) instead of failing with NoSuchElementException.
    			if (!scanner.hasNextLine()) {
    				break;
    			}
    			String line = scanner.nextLine();
    			if ("exit".equals(line)) {
    				break;
    			}
    			// NOTE(review): the first argument is passed as null; presumably this
    			// selects the template's default destination - confirm in SpringProducer.
    			springProducer.sendMsg(null, line);
    		}
    	}
    	
    }
    

      3、运行结果:

      

      

  • 相关阅读:
    tomcat-1
    oscache-2
    oscache-3
    oscache-1
    oscache-4
    缓存概要
    Criterion & DetachedCriteria
    Hibernate <查询缓存>
    Hibernate <二级缓存>
    Hibernate <一级缓存>
  • 原文地址:https://www.cnblogs.com/eric-fang/p/11339927.html
Copyright © 2011-2022 走看看