zoukankan      html  css  js  c++  java
  • ActiveMQ之spring集成消息转换器MessageConverter

    
    

    MessageConverter的作用主要有两方面,一方面它可以把我们的非标准化Message对象转换成我们的目标Message对象,这主要是用在发送消息的时候;另一方面它又可以把我们的Message对象转换成对应的目标对象,这主要是用在接收消息的时候。

    
    

    下面我们就拿发送一个对象消息来举例,传输User对象:


    package com.xinwei.order.entity; import java.io.Serializable; import java.util.Date;
    /**
     * Serializable user entity with an id, a name, an age and a birth date.
     */
    public class User implements Serializable {

        private static final long serialVersionUID = 1L;

        /** User identifier. */
        private String id;

        /** User name. */
        private String name;

        /** Age of the user. */
        private int age;

        /** Birth date of the user. */
        private Date birthDate;

        /**
         * @return the user identifier
         */
        public String getId() {
            return id;
        }

        /**
         * @param id the user identifier to store
         */
        public void setId(String id) {
            this.id = id;
        }

        /**
         * @return the user name
         */
        public String getName() {
            return name;
        }

        /**
         * @param name the user name to store
         */
        public void setName(String name) {
            this.name = name;
        }

        /**
         * @return the age
         */
        public int getAge() {
            return age;
        }

        /**
         * @param age the age to store
         */
        public void setAge(int age) {
            this.age = age;
        }

        /**
         * @return the stored birth date instance (not copied)
         */
        public Date getBirthDate() {
            return birthDate;
        }

        /**
         * @param birthDate the birth date to store (the reference is kept as is)
         */
        public void setBirthDate(Date birthDate) {
            this.birthDate = birthDate;
        }
    }

    定义转换器

    import java.io.Serializable;  
      
    import javax.jms.JMSException;  
    import javax.jms.Message;  
    import javax.jms.ObjectMessage;  
    import javax.jms.Session;  
      
    import org.springframework.jms.support.converter.MessageConversionException;  
    import org.springframework.jms.support.converter.MessageConverter;  
      
    /** 
     * spring 消息转换器 
     * @author slimina 
     * 
     */  
    public class UserMessageConverter implements MessageConverter {  
      
        public Message toMessage(Object object, Session session)    
                throws JMSException, MessageConversionException {    
            return session.createObjectMessage((Serializable) object);    
        }    
         
        public Object fromMessage(Message message) throws JMSException,    
                MessageConversionException {    
            ObjectMessage objMessage = (ObjectMessage) message;    
            return objMessage.getObject();    
        }  
    }  

    消息接收

    import com.google.gson.Gson;  
      
    /**
     * Listener delegate whose {@code receiveMessage} overloads print the payload they are
     * given to standard output.
     */
    public class UserMessageListener {

        /**
         * Prints the given user as JSON, serialized with Gson.
         *
         * @param user the user payload to print
         */
        public void receiveMessage(User user) {
            String json = new Gson().toJson(user);
            System.out.println(json);
        }

        /**
         * Prints the given text after a fixed prefix.
         *
         * @param message the text payload to print
         */
        public void receiveMessage(String message) {
            String line = "接收到一个纯文本消息,消息内容是:" + message;
            System.out.println(line);
        }
    }
    <?xml version="1.0" encoding="UTF-8"?>  
    <beans xmlns="http://www.springframework.org/schema/beans"  
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"  
        xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"  
        xmlns:jee="http://www.springframework.org/schema/jee" xmlns:jms="http://www.springframework.org/schema/jms"  
        xmlns:tx="http://www.springframework.org/schema/tx" xmlns:util="http://www.springframework.org/schema/util"  
        xmlns:lang="http://www.springframework.org/schema/lang"  
        xsi:schemaLocation="  
                http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd  
                http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd  
                http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd  
                http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd  
                http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd  
                http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd  
                http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd">  
      
         <!-- JMS connection factory: a caching wrapper around the ActiveMQ connection factory -->
        <bean id="connectionFactory"
            class="org.springframework.jms.connection.CachingConnectionFactory">
            <!-- Number of sessions to cache -->
            <property name="sessionCacheSize" value="10" />
            <property name="targetConnectionFactory">
                <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                    <!-- Broker address -->
                    <property name="brokerURL" value="tcp://192.168.233.128:61616" />
                    <!-- Whitelist of packages whose classes may be deserialized from
                         ObjectMessage payloads (replaces trustAllPackages=true, which
                         accepts any class). User lives in com.xinwei.order.entity and
                         has a java.util.Date field. -->
                    <property name="trustedPackages">
                        <list>
                            <value>com.xinwei.order.entity</value>
                            <value>java.util</value>
                            <value>java.lang</value>
                        </list>
                    </property>
                    <!-- Whether to send asynchronously -->
                    <property name="useAsyncSend" value="false" />
                </bean>
            </property>
        </bean>
      
        <!-- Queue definition: the destination messages are sent to -->
        <bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <property name="physicalName" value="spring.queue.converte" />
        </bean>

        <!-- JmsTemplate used to send messages; testQueue is its default destination -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="defaultDestination" ref="testQueue" />
            <property name="sessionTransacted" value="false" />
            <!-- receiveTimeout is the timeout applied when receiving a message -->
            <property name="receiveTimeout" value="30000" />
            <!-- Message converter used by the template -->
            <property name="messageConverter" ref="userMessageConverter"/>
        </bean>
           <!-- Converter bean referenced by jmsTemplate above -->
        <bean id="userMessageConverter" class="com.xinwei.order.controller.UserMessageConverter"/>
    </beans>  
    <?xml version="1.0" encoding="UTF-8"?>  
    <beans xmlns="http://www.springframework.org/schema/beans"  
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"  
        xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"  
        xmlns:jee="http://www.springframework.org/schema/jee" xmlns:jms="http://www.springframework.org/schema/jms"  
        xmlns:tx="http://www.springframework.org/schema/tx" xmlns:util="http://www.springframework.org/schema/util"  
        xmlns:lang="http://www.springframework.org/schema/lang"  
        xsi:schemaLocation="  
                http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd  
                http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd  
                http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd  
                http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd  
                http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd  
                http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd  
                http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd">  
      
        <!-- JMS connection factory: a caching wrapper around the ActiveMQ connection factory -->
        <bean id="connectionFactory"
            class="org.springframework.jms.connection.CachingConnectionFactory">
            <!-- Number of sessions to cache -->
            <property name="sessionCacheSize" value="10" />
            <property name="targetConnectionFactory">
                <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                    <!-- Broker address -->
                    <property name="brokerURL" value="tcp://192.168.233.128:61616" />
                    <!-- Whitelist of packages whose classes may be deserialized from
                         ObjectMessage payloads (replaces trustAllPackages=true, which
                         accepts any class sent to the queue). User lives in
                         com.xinwei.order.entity and has a java.util.Date field. -->
                    <property name="trustedPackages">
                        <list>
                            <value>com.xinwei.order.entity</value>
                            <value>java.util</value>
                            <value>java.lang</value>
                        </list>
                    </property>
                </bean>
            </property>
        </bean>
        
    <!--     <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>
    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfig"/>
    </bean> -->
      
        <!-- Queue definition: the destination the container listens on -->
        <bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <property name="physicalName" value="spring.queue.converte" />
        </bean>

        <!-- Container that receives queue messages asynchronously -->
        <bean id="queueContainer"
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="testQueue" />
            <!-- Use a MessageListenerAdapter as the message listener -->
            <property name="messageListener" ref="messageListenerAdapter" />
            <property name="receiveTimeout" value="30000" />
        </bean>

        <!-- Message listener adapter: delegates to userMessageListener's receiveMessage
             method and is configured with userMessageConverter -->
        <bean id="messageListenerAdapter"
            class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
            <property name="delegate" ref="userMessageListener" />
            <property name="defaultListenerMethod" value="receiveMessage" />
            <property name="messageConverter" ref="userMessageConverter" />
        </bean>

        <!-- Delegate object that handles the received payloads -->
        <bean id="userMessageListener"
            class="com.xinwei.order.controller.UserMessageListener"></bean>
        <!-- Converter bean referenced by messageListenerAdapter above -->
        <bean id="userMessageConverter"
            class="com.xinwei.order.controller.UserMessageConverter" />
    package com.xinwei.util.quartz;
    
    import java.util.Date;
    
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    import org.springframework.jms.core.JmsTemplate;
    
    import com.xinwei.order.entity.User;
      
    /**
     * Command-line producer: loads {@code msgconverter/producer.xml}, sends one sample
     * {@link User} through the context's {@link JmsTemplate}, then closes the context.
     */
    public class ProducerMain {

        /**
         * Sends a single sample user with {@code convertAndSend}.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            ClassPathXmlApplicationContext context =
                    new ClassPathXmlApplicationContext("classpath:msgconverter/producer.xml");
            try {
                JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);

                User user = new User();
                user.setId("1001");
                user.setName("jack");
                user.setAge(22);
                user.setBirthDate(new Date());

                jmsTemplate.convertAndSend(user);
            } finally {
                // Close the context even when sending fails, so its beans (including the
                // JMS connection factory) are destroyed instead of being leaked.
                context.close();
            }
        }
    }
    
    
    ----
    package com.xinwei.util.quartz;
    
    import org.springframework.context.support.ClassPathXmlApplicationContext;  
    
    /**
     * Command-line consumer: loads {@code msgconverter/consumer.xml} and leaves the context
     * open so the beans it defines keep running.
     */
    public class ConsumerMain {

        /**
         * Starts the consumer application context.
         *
         * @param args unused
         */
        @SuppressWarnings("resource")
        public static void main(String[] args) {
            ClassPathXmlApplicationContext context =
                    new ClassPathXmlApplicationContext("classpath:msgconverter/consumer.xml");
            // The context is deliberately not closed here; the hook closes it when the
            // JVM shuts down so the context's beans are destroyed cleanly.
            context.registerShutdownHook();
        }
    }
    
    
    ----
    
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/E:/maven_repository/org/slf4j/slf4j-log4j12/1.7.6/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/E:/maven_repository/org/apache/activemq/activemq-all/5.13.2/activemq-all-5.13.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    2017-12-04 09:51:54,246 [main] INFO  [org.springframework.context.support.ClassPathXmlApplicationContext] - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@11cf437c: startup date [Mon Dec 04 09:51:54 CST 2017]; root of context hierarchy
    2017-12-04 09:51:54,285 [main] INFO  [org.springframework.beans.factory.xml.XmlBeanDefinitionReader] - Loading XML bean definitions from class path resource [msgconverter/consumer.xml]
    2017-12-04 09:51:54,790 [main] INFO  [org.springframework.context.support.DefaultLifecycleProcessor] - Starting beans in phase 2147483647
    2017-12-04 09:51:54,973 [main] INFO  [org.springframework.jms.connection.CachingConnectionFactory] - Established shared JMS Connection: ActiveMQConnection {id=ID:fangping-54864-1512352314851-1:1,clientId=null,started=false}
    {"id":"1001","name":"jack","age":22,"birthDate":"Dec 4, 2017 9:51:44 AM"}
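    注意: 如果接收消息时在下面这一行抛出异常(提示该类不被信任,不能作为ObjectMessage的载荷进行反序列化):
     org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:206)
    解决方法: 在ActiveMQConnectionFactory上配置trustedPackages(或者像上面的配置那样设置trustAllPackages),详见 http://activemq.apache.org/objectmessage.html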




  • 相关阅读:
    C++---拷贝构造函数和赋值构造函数
    C++---类成员变量定义为引用
    从文件处理到文件的高级应用
    Jupyter的使用复习
    字符编码到python编辑器流程
    周四的小结
    中秋前的题目
    三段代码块带走今天的脚本
    今日份的随笔
    明天才能学的运算符号表格
  • 原文地址:https://www.cnblogs.com/alamps/p/7975847.html
Copyright © 2011-2022 走看看