zoukankan      html  css  js  c++  java
  • ActiveMQ_点对点队列(二)

     
    一、本文章包含的内容
    1、列举了ActiveMQ中通过Queue方式发送、消费队列的代码(普通文本、json/xml字符串、对象数据)
    2、spring+activemq方式
     
    二、配置信息
    1、activemq的pom.xml信息
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    <!--activemq  Begin-->
           <dependency>
               <groupId>org.springframework</groupId>
               <artifactId>spring-jms</artifactId>
               <version>${spring.version}</version>
           </dependency>
           <!-- <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-messaging</artifactId>
                <version>${spring.version}</version>
            </dependency>-->
           <dependency>
               <groupId>org.apache.activemq</groupId>
               <artifactId>activemq-all</artifactId>
               <version>5.14.0</version>
           </dependency>
           <!--activemq  End-->

    2、activemq的配置文件:spring-jms.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    <!-- 启用spring mvc 注解 -->
       <context:component-scan base-package="org.soa.test.activemq"/>
     
       <!-- 配置JMS连接工厂 -->
       <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
           <property name="brokerURL" value="failover:(tcp://192.168.146.129:61616)" />
           <!--解决接收消息抛出异常:javax.jms.JMSException: Failed to build body from content. Serializable class not available to broke-->
           <property name="trustAllPackages" value="true"/>
           <!-- 是否异步发送 -->
           <property name="useAsyncSend" value="true" />
       </bean>
     
       <!--   Queue模式 Begin -->
     
       <!-- 定义消息队列(Queue) -->
       <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
           <!-- 设置消息队列的名字 -->
           <constructor-arg>
               <value>defaultQueueName</value>
           </constructor-arg>
       </bean>
     
       <!-- 配置JMS模板,Spring提供的JMS工具类,它发送、接收消息。(Queue) -->
       <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
           <property name="connectionFactory" ref="connectionFactory" />
           <property name="defaultDestination" ref="queueDestination" />
           <property name="pubSubDomain" value="false"/>
           <!--接收超时时间-->
           <!--<property name="receiveTimeout" value="10000" />-->
       </bean>
       <!--   Queue模式 End -->

    三、队列发送端及测试程序

    1、发送代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    package org.soa.test.activemq.queues;
     
    import org.soa.test.activemq.StudentInfo;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Component;
     
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    import java.util.List;
     
    /**
     * Created by JamesC on 16-9-22.
     */
    @Component
    public class ProduceMsg {
     
        @Autowired
        private JmsTemplate jmsTemplate;
     
        /**
         * 向指定队列发送消息
         */
        public void sendMessage(Destination destination, final String msg) {
            System.out.println("向队列" + destination.toString() + "发送了消息------------" + msg);
            jmsTemplate.send(destination, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage(msg);
                }
            });
        }
     
        /**
         * 向默认队列发送消息(默认队列名称在bean:queueDestination配置)
         */
        public void sendMessage(final String msg) {
            //queue://queue1
            String destination = jmsTemplate.getDefaultDestination().toString();
            System.out.println("向队列" + destination + "发送了消息------------" + msg);
            jmsTemplate.send(new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage(msg);
                }
            });
        }
     
        /**
         * 向默认队列发送消息
         */
        public void sendMessageConvertAndSend(final Object msg) {
     
            String destination = jmsTemplate.getDefaultDestination().toString();
            System.out.println("向队列" + destination + "发送了消息------------" + msg);
            //使用内嵌的MessageConverter进行数据类型转换,包括xml(JAXB)、json(Jackson)、普通文本、字节数组
            jmsTemplate.convertAndSend(destination, msg);
        }
     
        /**
         * 向指定队列发送消息
         */
        public void sendStudentInfo(Destination destination, final StudentInfo msg) {
            System.out.println("向队列" + destination.toString() + "发送了消息------------" + msg);
            jmsTemplate.send(destination, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    return session.createObjectMessage(msg);
                }
            });
        }
    }

    2、测试程序

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    package org.soa.test.activemq.queues;
     
    import com.alibaba.fastjson.JSON;
    import org.apache.activemq.command.ActiveMQQueue;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.soa.test.activemq.StudentInfo;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationContext;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
     
    import javax.jms.Destination;
    import java.util.Date;
     
     
    /**
     * Created by JamesC on 16-9-22.
     */
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration("/spring-jms.xml")
    public class ProduceMsgTest extends AbstractJUnit4SpringContextTests {
     
        @Autowired
        protected ApplicationContext ctx;
     
        /**
         * 队列名queue1  这里使用jms配置文件中的数据
         */
        @Autowired
        private Destination queueDestination;
     
        /**
         * 队列消息生产者
         */
        @Autowired
        private ProduceMsg produceMessage;
     
     
        //向默认队列发消息(文本)
        @Test
        public void produceMsg_DefaultQueue() {
            String msg = "这里是向默认队列发送的消息" + new Date().toString();
            produceMessage.sendMessage(msg);
        }
     
        //向默认队列发消息(Json字符串)
        @Test
        public void produceMsg_Json() {
            StudentInfo info = new StudentInfo();
            info.setId(1);
            info.setStdName("李磊");
            info.setStdNo("001");
            info.setEnterDate(new Date());  //队列存放的是时间戳
     
            String alibabaJson = JSON.toJSONString(info);
            produceMessage.sendMessage(alibabaJson);
        }
     
        //向默认队列发消息(使用convertAndSend发送对象)
        @Test
        public void produceMsg_ConvertAndSend() {
            StudentInfo info = new StudentInfo();
            info.setId(1);
            info.setStdName("李磊");
            info.setStdNo("001");
            info.setEnterDate(new Date());
     
            produceMessage.sendMessageConvertAndSend(info);
        }
     
        //向指定队列发消息(文本)
        @Test
        public void produceMsg_CustomQueue() {
            for (int i = 0; i < 20; i++) {
                ActiveMQQueue myDestination = new ActiveMQQueue("queueCustom");
                produceMessage.sendMessage(myDestination, "----发送消息给queueCustom");
            }
        }
     
        //向指定队列发消息(队列名称从XML读取)
        @Test
        public void produceMsg_XmlQueue() {
            for (int i = 0; i < 20; i++) {
                ActiveMQQueue destinationQueue = (ActiveMQQueue) applicationContext.getBean("queueDestination");
                produceMessage.sendMessage(destinationQueue, "----send my msg to queueXml");
            }
        }
     
        //向指定队列发消息(发送对象)
        @Test
        public void produceMsg_StudentInfo() {
     
            StudentInfo info = new StudentInfo();
            info.setId(1);
            info.setStdName("李磊");
            info.setStdNo("001");
            info.setEnterDate(new Date());
     
            ActiveMQQueue destination = new ActiveMQQueue("StudentInfo");
            produceMessage.sendStudentInfo(destination, info);
        }
    }

    四、队列消费端及测试程序

    1、消费代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    package org.soa.test.activemq.queues;
     
    import org.soa.test.activemq.StudentInfo;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.support.JmsUtils;
    import org.springframework.stereotype.Component;
     
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.ObjectMessage;
    import javax.jms.TextMessage;
     
    /**
     * Created by JamesC on 16-9-22.
     */
    @Component
    public class ConsumeMsg {
        @Autowired
        private JmsTemplate jmsTemplate;
     
        /**
         * 接受消息
         */
        public String receive(Destination destination) {
            TextMessage tm = (TextMessage) jmsTemplate.receive(destination);
            String msg = "";
            try {
                msg = tm.getText();
                System.out.println("从队列" + destination.toString() + "收到了消息: " + msg);
     
            } catch (JMSException e) {
                e.printStackTrace();
                return "";
            }
            return msg;
        }
     
        /**
         * 接受消息
         */
        public StudentInfo receiveStudentInfo() {
            try {
                String destination = jmsTemplate.getDefaultDestination().toString();
                ObjectMessage msg=(ObjectMessage)jmsTemplate.receive(destination);
                return (StudentInfo)msg.getObject();
     
            } catch (JMSException e) {
                //检查性异常转换为非检查性异常
                throw JmsUtils.convertJmsAccessException(e);
            }
        }
     
        /**
         * 接受消息
         */
        public Object receiveConvertAndReceive() {
            String destination = jmsTemplate.getDefaultDestination().toString();
            Object msg = jmsTemplate.receiveAndConvert(destination);
            return msg;
        }
    }

    2、测试程序

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    package org.soa.test.activemq.queues;
     
    import org.apache.activemq.command.ActiveMQQueue;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.soa.test.activemq.StudentInfo;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
     
    /**
     * Created by JamesC on 16-9-22.
     */
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration("/spring-jms.xml")
    public class ConsumeMsgTest {
     
        @Autowired
        private ConsumeMsg consumeMsg;
     
        //从指定队列接收消息(文本)
        @Test
        public void receiveMsg() {
            //没有消息阻塞一段时间后会抛异常
            //java.lang.NullPointerException
            ActiveMQQueue destination = new ActiveMQQueue("defaultQueueName");
            consumeMsg.receive(destination);
        }
     
        //从指定队列接收消息(StudentInfo对象消息)
        @Test
        public void receiveStudentInfo() {
            StudentInfo msg = consumeMsg.receiveStudentInfo();
            System.out.println(msg.getStdName());
        }
     
        //从指定队列接收消息(Json对象)
        @Test
        public void receiveConvertAndReceive() {
     
            StudentInfo msg =(StudentInfo) consumeMsg.receiveConvertAndReceive();
            System.out.println(msg.getStdName());
        }
    }





  • 相关阅读:
    PhysX3 User Guide 04 Rigid Body Dynamics
    PhysX3 User Guide 05 Scene Queries
    PhysX3 User Guide 01 基础
    error C2061: syntax error : identifier '__RPC__out_xcount_part'
    vim中替换字符串
    Linker Tools Warning LNK4098
    PhysX3 User Guide 03 Joint
    【转】C RunTime Library 暨 深入理解编译选项的含义 04
    不足
    tortoiseSVN Check out正常但Show Log时 Go Offline
  • 原文地址:https://www.cnblogs.com/gossip/p/5970090.html
Copyright © 2011-2022 走看看