  • Spring JMS: Listening for JMS Messages

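        When you call receive() on a JMS message consumer, the calling thread blocks until a message is available. The thread can do nothing but wait. This is synchronous message reception: the receiving thread cannot get on with its work until a message arrives.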

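        The counterpart of synchronous reception is asynchronous reception: you register a message listener, which must implement the javax.jms.MessageListener interface. When a message arrives, the listener's onMessage() method is called with the message as its argument.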
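        To make the contrast between the two receive styles concrete, here is a minimal in-memory sketch. It deliberately uses no JMS classes: InMemoryDestination, receive() and setListener() are hypothetical stand-ins built on a BlockingQueue, so treat it as an illustration of the threading model rather than of the JMS API itself.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ReceiveStylesDemo {

    /** In-memory stand-in for a JMS destination; this is NOT the JMS API. */
    static class InMemoryDestination {
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        void send(String text) {
            queue.add(text);
        }

        /** Synchronous style: the calling thread blocks until a message is available. */
        String receive() throws InterruptedException {
            return queue.take();
        }

        /** Asynchronous style: a background thread waits and calls the listener back. */
        void setListener(Consumer<String> listener) {
            Thread dispatcher = new Thread(() -> {
                try {
                    while (true) {
                        listener.accept(queue.take());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop dispatching
                }
            });
            dispatcher.setDaemon(true); // do not keep the JVM alive
            dispatcher.start();
        }
    }

    static String receiveSynchronously() {
        InMemoryDestination destination = new InMemoryDestination();
        destination.send("hello");
        try {
            return destination.receive(); // would block here if nothing had been sent
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    static String receiveAsynchronously() {
        InMemoryDestination destination = new InMemoryDestination();
        String[] received = new String[1];
        CountDownLatch delivered = new CountDownLatch(1);
        destination.setListener(text -> {
            received[0] = text;
            delivered.countDown();
        });
        destination.send("hello"); // the sending thread is not blocked by the listener
        try {
            delivered.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("sync:  " + receiveSynchronously());
        System.out.println("async: " + receiveAsynchronously());
    }
}
```

        In the synchronous case the caller itself sits in receive(); in the asynchronous case only the dispatcher thread waits, which is roughly the job a Spring listener container takes over for you.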

        javax.jms.MessageListener is the native JMS interface. In addition to it, Spring provides SessionAwareMessageListener and MessageListenerAdapter. The listener below implements the native interface:

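    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;

    import org.springframework.jms.support.converter.MessageConverter;

    /**
     * Prints every received message after converting it with the configured MessageConverter.
     *
     * @author zhangwei_david
     * @version $Id: MessageListener.java, v 0.1 2015-01-31 21:06:02 zhangwei_david Exp $
     */
    public class MailMessageListener implements MessageListener {
    
        private MessageConverter messageConverter;
    
        /**
         * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
         */
        public void onMessage(Message msg) {
            try {
                System.out.println("on message:" + messageConverter.fromMessage(msg));
            } catch (JMSException e) {
                // Do not swallow the failure: rethrow so the listener container can react
                // (with a transacted session this rolls the message back).
                throw new IllegalStateException("Failed to convert JMS message", e);
            }
        }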
    
        /**
         * Setter method for property <tt>messageConverter</tt>.
         *
         * @param messageConverter value to be assigned to property messageConverter
         */
        public void setMessageConverter(MessageConverter messageConverter) {
            this.messageConverter = messageConverter;
        }
    
    }

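        Spring provides several message listener containers; the two most commonly used are SimpleMessageListenerContainer and DefaultMessageListenerContainer. SimpleMessageListenerContainer is the simplest one: it supports native (local) JMS transactions but cannot take part in externally managed transactions. DefaultMessageListenerContainer is the default implementation and supports both local and externally managed transactions.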
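        As a rough illustration of what transaction support means for a listener: with a transacted session, a message whose listener throws is rolled back and delivered again instead of being lost. The sketch below simulates that behaviour in memory. It uses no Spring or JMS classes, and deliverAll and maxAttempts are hypothetical names chosen for this example (real brokers have their own redelivery policies).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

class TransactedDeliveryDemo {

    /**
     * Delivers every queued message to the listener and returns the number of
     * delivery attempts. If the listener throws, the simulated transaction is
     * rolled back: the message returns to the head of the queue and is
     * delivered again, at most maxAttempts times in total.
     */
    static int deliverAll(Deque<String> queue, Consumer<String> listener, int maxAttempts) {
        int deliveries = 0;
        int attemptsForCurrent = 0;
        while (!queue.isEmpty()) {
            String message = queue.pollFirst();
            deliveries++;
            attemptsForCurrent++;
            try {
                listener.accept(message);
                attemptsForCurrent = 0; // "commit": the message is consumed
            } catch (RuntimeException e) {
                if (attemptsForCurrent < maxAttempts) {
                    queue.addFirst(message); // "rollback": redeliver the same message
                } else {
                    attemptsForCurrent = 0; // give up and drop the message
                }
            }
        }
        return deliveries;
    }

    /** Processes a, b, c with a listener that fails once on "b". */
    static List<String> runDemo() {
        Deque<String> queue = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
        List<String> processed = new ArrayList<>();
        boolean[] failedOnce = {false};
        deliverAll(queue, message -> {
            if (message.equals("b") && !failedOnce[0]) {
                failedOnce[0] = true;
                throw new IllegalStateException("simulated processing failure");
            }
            processed.add(message);
        }, 3);
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("processed: " + runDemo());
    }
}
```

        Even though the first attempt on "b" fails, all three messages end up processed in order, which is the guarantee that sessionTransacted is meant to give a listener.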

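    The Mail object used in the example extends a small base class whose toString() is built reflectively with ToStringBuilder (Apache Commons Lang):

    /**
     *
     * @author zhangwei_david
     * @version $Id: ToStringBase.java, v 0.1 2015-02-02 19:41:52 zhangwei_david Exp $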
     */
    public class ToStringBase {
    
        /**
         * @see java.lang.Object#toString()
         */
        @Override
        public String toString() {
            return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
        }
    
    }

    Next, define a simple domain object:

    /**
     *
     * @author zhangwei_david
     * @version $Id: Mail.java, v 0.1 2015-02-02 19:25:24 zhangwei_david Exp $
     */
    public class Mail extends ToStringBase {
    
        /**id**/
        private String mailId;
    
        private String from;
    
        private String to;
    
        private String content;
    
        /**
         * Getter method for property <tt>mailId</tt>.
         *
         * @return property value of mailId
         */
        public String getMailId() {
            return mailId;
        }
    
        /**
         * Setter method for property <tt>mailId</tt>.
         *
         * @param mailId value to be assigned to property mailId
         */
        public void setMailId(String mailId) {
            this.mailId = mailId;
        }
    
        /**
         * Getter method for property <tt>from</tt>.
         *
         * @return property value of from
         */
        public String getFrom() {
            return from;
        }
    
        /**
         * Setter method for property <tt>from</tt>.
         *
         * @param from value to be assigned to property from
         */
        public void setFrom(String from) {
            this.from = from;
        }
    
        /**
         * Getter method for property <tt>to</tt>.
         *
         * @return property value of to
         */
        public String getTo() {
            return to;
        }
    
        /**
         * Setter method for property <tt>to</tt>.
         *
         * @param to value to be assigned to property to
         */
        public void setTo(String to) {
            this.to = to;
        }
    
        /**
         * Getter method for property <tt>content</tt>.
         *
         * @return property value of content
         */
        public String getContent() {
            return content;
        }
    
        /**
         * Setter method for property <tt>content</tt>.
         *
         * @param content value to be assigned to property content
         */
        public void setContent(String content) {
            this.content = content;
        }
    
    }
    
    The producer sends a Mail through JmsTemplate:

    /**
     *
     * @author zhangwei_david
     * @version $Id: ProducerImpl.java, v 0.1 2015-01-31 20:25:36 zhangwei_david Exp $
     */
    @Component
    public class ProducerImpl implements Producer {
        @Autowired
        private JmsTemplate jmsTemplate;
    
        /**
         */
        @Transactional
        public void send(Mail mail) {
            System.out.println("sende->" + mail);
            jmsTemplate.convertAndSend(mail);
    
        }
    }

    The Spring configuration (META-INF/spring/jms-beans.xml):

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:jms="http://www.springframework.org/schema/jms"
    	xmlns:p="http://www.springframework.org/schema/p" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
    	xmlns:aop="http://www.springframework.org/schema/aop" 
    	xsi:schemaLocation="
    		http://www.springframework.org/schema/beans
    		http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    		http://www.springframework.org/schema/jms
    		http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
    		http://www.springframework.org/schema/context
    		http://www.springframework.org/schema/context/spring-context-3.0.xsd
    		http://www.springframework.org/schema/aop 
    		http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
    		">
    	<aop:aspectj-autoproxy />
    
    	<context:annotation-config />
    	<context:component-scan base-package="com.cathy.demo.jms.*" />
    	<!-- connectionFactory -->
    	<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    		<property name="brokerURL" value="tcp://localhost:61616"/>
    	</bean>
    	<!-- mailMessage converter -->
    	<bean id="mailMessageConverter" class="com.cathy.demo.jms.convert.MailMessageConverter"/>
    	<!-- jmsTemplate -->
    	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    		<property name="connectionFactory" ref="connectionFactory"/>
    		<property name="defaultDestination" ref="topic"/>
    		<property name="receiveTimeout" value="60000"/>
    		<property name="pubSubDomain" value="true"/>
    		<property name="sessionTransacted" value="true"/>   
    		<property name="messageConverter" ref="mailMessageConverter"/>
    	</bean>
    	<!--
    	<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
    		<constructor-arg value="message.queue"/>
    	</bean>
    	-->
    	<!-- Topic -->
    	<bean id="topic" class="org.apache.activemq.command.ActiveMQTopic">
    		<constructor-arg value="notifyTopic"/>
    	</bean>
    	<bean id="defaultMessageListener" class="com.cathy.demo.jms.listener.MailMessageListener">
    		<property name="messageConverter" ref="mailMessageConverter"/>
    	</bean>
    	<!-- Listener container for receiving messages asynchronously -->
    	<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    		<property name="connectionFactory" ref="connectionFactory"/>
    		<property name="destination" ref="topic"/>
    		<property name="sessionTransacted" value="true"/>
    		<property name="messageListener" ref="defaultMessageListener"/>
    	</bean>
       
    </beans>

    The test sends a single Mail:

    /**
     *
     * @author zhangwei_david
     * @version $Id: Sender.java, v 0.1 2015-01-31 20:47:18 zhangwei_david Exp $
     */
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath*:META-INF/spring/jms-beans.xml")
    public class SenderAndReciver {
        @Autowired
        private Producer producer;
    
        @Test
        public void testSend() {
            Mail mail = new Mail();
            mail.setMailId("testId");
            mail.setTo("david");
            mail.setFrom("cathy");
            mail.setContent("Hello");
            producer.send(mail);
        }
    
    }
    

     The test output is:

    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    sende->Mail[mailId=testId,from=cathy,to=david,content=Hello]
    on message:Mail[mailId=testId,from=cathy,to=david,content=Hello]
    

     

    MessageListenerAdapter

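    MessageListenerAdapter is a message listener adapter that delegates the handling of messages to target listener methods via reflection, with flexible message type conversion. It allows listener methods to operate on message content types, completely independent of the JMS API.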

    Define a delegate class with the target method that handles messages:

    /**
     *
     * @author zhangwei_david
     * @version $Id: ListenerDelegate.java, v 0.1 2015-02-03 14:32:11 zhangwei_david Exp $
     */
    public class ListenerDelegate {
    
        public void handleMessage(@SuppressWarnings("rawtypes") Map map) {
            System.out.println("receive  MapMessage->" + map);
        }
    }
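
    The delegate takes a java.util.Map, which is what Spring's default message conversion produces from a JMS MapMessage. That suggests the MailMessageConverter referenced in the configuration (its source is not shown in this article) sends each Mail as a MapMessage. The sketch below shows only the field mapping such a converter would need, using a plain Map in place of a MapMessage. The class and method names are hypothetical, and the nested Mail is a cut-down stand-in for the article's class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class MailMappingSketch {

    /** Minimal stand-in for the article's Mail class. */
    static class Mail {
        String mailId;
        String from;
        String to;
        String content;
    }

    /** Producer side: copy each Mail field into a map entry. */
    static Map<String, Object> toMap(Mail mail) {
        Map<String, Object> map = new LinkedHashMap<>(); // keeps insertion order
        map.put("mailId", mail.mailId);
        map.put("from", mail.from);
        map.put("to", mail.to);
        map.put("content", mail.content);
        return map;
    }

    /** Consumer side: rebuild a Mail from the map entries. */
    static Mail fromMap(Map<String, Object> map) {
        Mail mail = new Mail();
        mail.mailId = (String) map.get("mailId");
        mail.from = (String) map.get("from");
        mail.to = (String) map.get("to");
        mail.content = (String) map.get("content");
        return mail;
    }

    /** Builds the same Mail that the article's test sends. */
    static Mail sample() {
        Mail mail = new Mail();
        mail.mailId = "testId";
        mail.from = "cathy";
        mail.to = "david";
        mail.content = "Hello";
        return mail;
    }

    public static void main(String[] args) {
        Map<String, Object> map = toMap(sample());
        System.out.println(map);
        System.out.println(fromMap(map).content);
    }
}
```

    A real converter would implement Spring's MessageConverter interface and copy the same entries to and from a javax.jms.MapMessage.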
    
     
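    Wrap the delegate in a MessageListenerAdapter and register the adapter with a listener container (the listenerDelegate bean, an instance of ListenerDelegate, is assumed to be defined elsewhere in the configuration):

    <bean id="defaultListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    	<constructor-arg ref="listenerDelegate"/>
    </bean>

    <!-- Listener container for receiving messages asynchronously -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    	<property name="connectionFactory" ref="connectionFactory"/>
    	<property name="destination" ref="topic"/>
    	<!-- Note: the acknowledge mode is ignored while sessionTransacted is true -->
    	<property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"/>
    	<property name="messageListener" ref="defaultListener"/>
    	<property name="sessionTransacted" value="true"/>
    </bean>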

     Running the test method produces:
    sende->Mail[mailId=testId,from=cathy,to=david,content=Hello]
    on message:Mail[mailId=testId,from=cathy,to=david,content=Hello]
    receive  MapMessage->{mailId=testId, from=cathy, to=david, content=Hello}

     So how does MessageListenerAdapter process a message internally? Its onMessage method looks like this:
    public void onMessage(Message message, Session session) throws JMSException {
    		// Get the delegate object
    		Object delegate = getDelegate();
    		// If the delegate is not this adapter itself
    		if (delegate != this) {
    			// If the delegate implements SessionAwareMessageListener, hand the message and the session straight to it
    			if (delegate instanceof SessionAwareMessageListener) {
    				if (session != null) {
    					((SessionAwareMessageListener) delegate).onMessage(message, session);
    					return;
    				}
    				// The session is null and the delegate does not implement MessageListener: throw an exception
    				else if (!(delegate instanceof MessageListener)) {
    					throw new javax.jms.IllegalStateException("MessageListenerAdapter cannot handle a " +
    							"SessionAwareMessageListener delegate if it hasn't been invoked with a Session itself");
    				}
    			}
    			// If the delegate implements MessageListener, hand the message to it
    			if (delegate instanceof MessageListener) {
    				((MessageListener) delegate).onMessage(message);
    				return;
    			}
    		}
    
    		// Convert the message body into an object
    		Object convertedMessage = extractMessage(message);
    		// Determine the name of the listener method for this message
    		String methodName = getListenerMethodName(message, convertedMessage);
    		if (methodName == null) {
    			throw new javax.jms.IllegalStateException("No default listener method specified: " +
    					"Either specify a non-null value for the 'defaultListenerMethod' property or " +
    					"override the 'getListenerMethodName' method.");
    		}
    
    		// Invoke the listener method reflectively to handle the message
    		Object[] listenerArguments = buildListenerArguments(convertedMessage);
    		Object result = invokeListenerMethod(methodName, listenerArguments);
    		if (result != null) {
    			handleResult(result, message, session);
    		}
    		else {
    			logger.trace("No result object given - no result to handle");
    		}
    	}
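
    The last part of the method above (pick a method name, then invoke it reflectively with the converted payload) can be approximated with plain java.lang.reflect. This is a simplified sketch, not Spring's implementation: dispatch and the nested Delegate class are hypothetical, and the only fact borrowed from Spring is that "handleMessage" is the adapter's default listener method name.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

class ReflectiveDispatchSketch {

    /** Same shape as the article's ListenerDelegate, but returns the text so it can be checked. */
    static class Delegate {
        public String handleMessage(Map<String, Object> map) {
            return "receive MapMessage->" + map;
        }
    }

    /**
     * Finds a public one-argument method with the given name whose parameter
     * type accepts the payload, then invokes it on the delegate.
     */
    static Object dispatch(Object delegate, String methodName, Object payload) {
        Objects.requireNonNull(payload, "payload must not be null");
        for (Method method : delegate.getClass().getMethods()) {
            if (method.getName().equals(methodName)
                    && method.getParameterCount() == 1
                    && method.getParameterTypes()[0].isInstance(payload)) {
                try {
                    return method.invoke(delegate, payload);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new IllegalStateException(
                            "Listener method '" + methodName + "' failed", e);
                }
            }
        }
        throw new IllegalStateException("No method '" + methodName
                + "' accepting " + payload.getClass().getName());
    }

    public static void main(String[] args) {
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("mailId", "testId");
        // The payload type decides which overload is eligible, so the delegate
        // never sees a javax.jms.Message.
        System.out.println(dispatch(new Delegate(), "handleMessage", payload));
    }
}
```

    Because the lookup is driven by the payload's runtime type, a delegate could offer several handleMessage overloads (for example one for String and one for Map) and stay free of any JMS types.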
     
  • Original article: https://www.cnblogs.com/wei-zw/p/8797792.html