zoukankan      html  css  js  c++  java
  • spring整合activemq发送MQ消息[queue模式]实例

    queue类型消息

    pom依赖

    <!-- JUnit 4: runs the @Test methods in QueueSender / QueueReceiver (test scope only) -->
    <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.11</version>
                <scope>test</scope>
            </dependency>
            <!-- ActiveMQ: the examples below use ActiveMQConnectionFactory and ActiveMQQueue from org.apache.activemq -->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.11.1</version>
            </dependency>
            <!-- Spring JMS: JmsTemplate, MessageCreator and DefaultMessageListenerContainer -->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
                <version>4.1.4.RELEASE</version>
            </dependency>
            <!-- Spring Test: SpringJUnit4ClassRunner and @ContextConfiguration -->
            <!-- NOTE(review): no <scope>test</scope> here, unlike junit; confirm whether that is intentional -->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-test</artifactId>
                <version>4.1.4.RELEASE</version>
            </dependency>

    1、配置文件

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd">
    
        <!-- JMS connection factory: connects to the ActiveMQ broker through the failover transport -->
        <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="failover:(tcp://192.168.147.131:61616)" />
        </bean>
    
        <!-- Message queue (Queue) definition -->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <!-- Name of the queue -->
            <constructor-arg>
                <value>testSpringQueue</value>
            </constructor-arg>
        </bean>
    
        <!-- JMS template (Queue): Spring's JMS helper class, used here to send and receive messages. -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFactory" />
            <!-- Destination used when the caller does not pass one explicitly -->
            <property name="defaultDestination" ref="queueDestination" />
            <!-- Receive timeout; JmsTemplate interprets this value as milliseconds -->
            <property name="receiveTimeout" value="10000" />
        </bean>
        
    </beans>

    2、生产者代码

    package com.mq.spring.queue;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    
    import javax.annotation.Resource;
    import javax.jms.*;
    
    /**
     * created on 2015/6/4
     * @author dennisit@163.com
     * @version 1.0
     */
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations={"classpath:spring-mq-queue.xml"})
    public class QueueSender {
    
        @Resource
        private JmsTemplate jmsTemplate;
    
        @Test
        public void send(){
            sendMqMessage(null,"spring activemq queue type message !");
        }
    
        /**
         * 说明:发送的时候如果这里没有显示的指定destination.将用spring xml中配置的destination
         * @param destination
         * @param message
         */
        public void sendMqMessage(Destination destination, final String message){
            if(null == destination){
                destination = jmsTemplate.getDefaultDestination();
            }
            jmsTemplate.send(destination, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage(message);
                }
            });
            System.out.println("spring send message...");
        }
    
        public void setJmsTemplate(JmsTemplate jmsTemplate) {
            this.jmsTemplate = jmsTemplate;
        }
    
    }

    3、消费者代码

    package com.mq.spring.queue;
    
    import org.junit.Test;
    import javax.jms.*;
    import org.junit.runner.RunWith;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    import javax.annotation.Resource;
    import javax.jms.Message;
    
    /**
     * JUnit-driven consumer: synchronously pulls one message from a JMS queue through the
     * {@link JmsTemplate} bean loaded from {@code spring-mq-queue.xml} and prints its text.
     *
     * created on 2015/6/4
     * @author dennisit@163.com
     * @version 1.0
     */
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations={"classpath:spring-mq-queue.xml"})
    public class QueueReceiver {

        /** Template injected from the Spring test context. */
        @Resource
        private JmsTemplate jmsTemplate;

        /** Receives one message from the template's default destination. */
        @Test
        public void receiveMqMessage(){
            Destination destination = jmsTemplate.getDefaultDestination();
            receive(destination);
        }

        /**
         * Receives a single message from {@code destination} and prints its text body.
         *
         * <p>{@code JmsTemplate.receive} returns {@code null} when the configured receive
         * timeout elapses without a message, and the queue may hold messages that are not
         * text messages; both cases are reported instead of failing with a
         * {@code NullPointerException} or {@code ClassCastException}.
         *
         * @param destination the destination to receive from
         */
        public void receive(Destination destination) {
            Message received = jmsTemplate.receive(destination);
            if (received == null) {
                // Timed out: nothing arrived within the template's receiveTimeout.
                System.out.println("no message received from " + destination + " before the receive timeout elapsed");
                return;
            }
            if (!(received instanceof TextMessage)) {
                System.out.println("ignored non-text message of type " + received.getClass().getName()
                        + " from " + destination);
                return;
            }
            TextMessage tm = (TextMessage) received;
            try {
                System.out.println("从队列" + destination.toString() + "收到了消息:	" + tm.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        public void setJmsTemplate(JmsTemplate jmsTemplate) {
            this.jmsTemplate = jmsTemplate;
        }
    }

    程序运行结果:

    说明:上面的生产者和消费者使用同一套配置文件,消费者需要运行独立的程序主动接收消息。Spring JMS 也提供了基于监听器的消息处理方式,接下来我们改为监听式消费。

    配置文件如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd">
    
        <!-- JMS connection factory: connects to the ActiveMQ broker through the failover transport -->
        <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="failover:(tcp://192.168.147.131:61616)" />
        </bean>
    
        <!-- Message queue (Queue) definition -->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <!-- Name of the queue -->
            <constructor-arg>
                <value>testSpringQueue</value>
            </constructor-arg>
        </bean>
    
        <!-- Message listener for the queue -->
        <bean id="consumerMessageListener" class="com.mq.spring.queue.ConsumerMessageListener" />
    
        <!-- Listener container (Queue): uses the connection factory above, listens on testSpringQueue,
             and dispatches to the listener bean defined above -->
        <!-- NOTE(review): this configuration declares no jmsTemplate bean, while QueueSender injects one;
             confirm the producer still loads a configuration that defines it -->
        <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="queueDestination" />
            <property name="messageListener" ref="consumerMessageListener" />
        </bean>
        
    </beans>

    这样消息就可以在监听器中消费了(监听器类 ConsumerMessageListener 需实现 javax.jms.MessageListener 或 Spring 的 SessionAwareMessageListener 接口)。生产者代码不变,只修改发送的消息体内容,然后执行生产程序,可以看到运行效果如下:



    关于springjms消息监听器,查看:http://haohaoxuexi.iteye.com/blog/1893676


    转载请注明出处:[http://www.cnblogs.com/dennisit/p/4551564.html]

  • 相关阅读:
    python scapy的用法之ARP主机扫描和ARP欺骗
    Python字典取键、值对
    python字典添加元素和删除元素
    Python删除列表元素
    获取本机的IP地址和mac地址
    Python查找电话号码归属地、邮编、运营商信息等
    Python的字符串格式化,%与format
    Python基础笔记一之字符转化、复数、位运算、除法运算、floor和ceil取整,round函数四舍五入
    FAILED: SemanticException Unable to determine if hdfs://tmaster:8020/user/root/words.db/test_t2 is encrypted
    Pyspark读取csv文件
  • 原文地址:https://www.cnblogs.com/dennisit/p/4551564.html
Copyright © 2011-2022 走看看