  • Spring + ActiveMQ in practice: listening on multiple queues and consuming each queue's messages differently

    Excerpted from: https://my.oschina.net/u/3613230/blog/1457227

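    Summary: In a recent project I needed ActiveMQ and found that, within a single project using point-to-point mode with several queues configured, the message producer could only send to one queue or send the same message to all of them, and the listener could only listen on one queue, which makes configuring several queues pointless. What I wanted was: configure several queues, let the producer send different messages to different queues, and let the listener apply different business handling depending on which queue a message came from. The examples I found online all covered a single queue with a single listener, but after some investigation it turned out to be achievable. Here is the code: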

    Project structure (screenshot)

    Required Maven dependencies:

        <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
          <modelVersion>4.0.0</modelVersion>
          <groupId>com.gxf</groupId>
          <artifactId>springmq</artifactId>
          <packaging>war</packaging>
          <version>0.0.1-SNAPSHOT</version>
          <name>springmq Maven Webapp</name>
          <url>http://maven.apache.org</url>

          <!-- Version management -->
          <properties>
              <springframework>4.1.8.RELEASE</springframework>
              <javax.servlet>3.1.0</javax.servlet>
          </properties>

          <dependencies>

            <dependency>
              <groupId>junit</groupId>
              <artifactId>junit</artifactId>
              <version>4.10</version>
              <scope>test</scope>
            </dependency>

            <dependency>
                <groupId>jstl</groupId>
                <artifactId>jstl</artifactId>
                <version>1.2</version>
            </dependency>

            <dependency>
                <groupId>javax.servlet</groupId>
                <artifactId>javax.servlet-api</artifactId>
                <version>${javax.servlet}</version>
            </dependency>

            <!-- Spring -->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-core</artifactId>
                <version>${springframework}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context</artifactId>
                <version>${springframework}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-tx</artifactId>
                <version>${springframework}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-webmvc</artifactId>
                <version>${springframework}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
                <version>${springframework}</version>
            </dependency>

            <!-- xbean, needed for elements such as <amq:connectionFactory /> -->
            <dependency>
                <groupId>org.apache.xbean</groupId>
                <artifactId>xbean-spring</artifactId>
                <version>3.16</version>
            </dependency>

            <!-- ActiveMQ. Note: activemq-core (5.7.0) and activemq-pool (5.14.3)
                 come from different releases; keeping the ActiveMQ artifacts on
                 one version is generally safer. -->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-core</artifactId>
                <version>5.7.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-pool</artifactId>
                <version>5.14.3</version>
            </dependency>

          </dependencies>

          <build>
            <finalName>springmq</finalName>
          </build>
        </project>

    - ActiveMQ configuration file: activemq.xml

        <?xml version="1.0" encoding="UTF-8"?>
        <beans xmlns="http://www.springframework.org/schema/beans"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xmlns:amq="http://activemq.apache.org/schema/core"
            xmlns:jms="http://www.springframework.org/schema/jms"
            xmlns:context="http://www.springframework.org/schema/context"
            xmlns:mvc="http://www.springframework.org/schema/mvc"
            xsi:schemaLocation="
                http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
                http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context-4.1.xsd
                http://www.springframework.org/schema/mvc
                http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
                http://www.springframework.org/schema/jms
                http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
                http://activemq.apache.org/schema/core
                http://activemq.apache.org/schema/core/activemq-core-5.14.3.xsd">

            <context:component-scan base-package="com.gxf" />
            <mvc:annotation-driven />

            <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://192.168.0.112:61616" userName="admin" password="admin"></amq:connectionFactory>

            <!-- JMS connection factory -->
            <bean id="connectionFactory"
                class="org.springframework.jms.connection.CachingConnectionFactory">
                <constructor-arg ref="amqConnectionFactory" />
                <property name="sessionCacheSize" value="100" />
            </bean>

            <!-- Message queue (Queue) definition -->
            <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
                <!-- Configure two message queues: queue1 and queue2 -->
                <constructor-arg index="0" value="queue1,queue2" />
            </bean>

            <!-- JMS template (Queue): Spring's JMS helper class for sending and receiving messages -->
            <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
                <property name="connectionFactory" ref="connectionFactory" />
                <property name="defaultDestination" ref="queueDestination" />
                <property name="receiveTimeout" value="10000" />
                <!-- true means topic, false means queue; the default is false and is written out explicitly here -->
                <property name="pubSubDomain" value="false" />
            </bean>

            <!-- Message queue listener (Queue) -->
            <bean id="queueMessageListener" class="com.gxf.listener.QueueMessageListener" />
            <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
                <property name="connectionFactory" ref="connectionFactory" />
                <property name="destination" ref="queueDestination" />
                <property name="messageListener" ref="queueMessageListener" />
            </bean>

        </beans>
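The key line in this configuration is the `queue1,queue2` constructor argument: ActiveMQ reads a comma-separated name as a composite destination, so a send addressed to the composite reaches every member queue, while each member can still be addressed on its own. The following is a minimal plain-Java sketch of that behaviour using in-memory queues. It does not call any ActiveMQ or Spring API, and `CompositeQueueModel` and its methods are hypothetical names chosen for illustration only.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.stream.Collectors;

// Hypothetical in-memory model of a composite destination (not an ActiveMQ class).
class CompositeQueueModel {
    // One in-memory queue per member name, in declaration order.
    private final Map<String, Queue<String>> queues = new LinkedHashMap<>();

    CompositeQueueModel(String compositeName) {
        for (String name : split(compositeName)) {
            queues.put(name, new ArrayDeque<>());
        }
    }

    // Splits "queue1,queue2" into its member names, ignoring blanks.
    static List<String> split(String compositeName) {
        return Arrays.stream(compositeName.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    // Sending to the composite: every member queue receives a copy.
    void sendToAll(String body) {
        queues.values().forEach(q -> q.add(body));
    }

    // Sending to a single member: only that queue receives the message.
    void sendTo(String queueName, String body) {
        Queue<String> q = queues.get(queueName);
        if (q == null) {
            throw new IllegalArgumentException("Unknown queue: " + queueName);
        }
        q.add(body);
    }

    // Number of messages currently waiting in the named queue.
    int depth(String queueName) {
        Queue<String> q = queues.get(queueName);
        return q == null ? 0 : q.size();
    }
}
```

In the article's setup the same distinction applies: `jmsTemplate`'s default destination is the composite, so the controller picks an individual member before sending whenever only one queue should receive the message.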

    - Spring MVC configuration file: springmvc.xml

        <?xml version="1.0" encoding="UTF-8"?>
        <beans xmlns="http://www.springframework.org/schema/beans"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xmlns:context="http://www.springframework.org/schema/context"
            xmlns:mvc="http://www.springframework.org/schema/mvc"
            xsi:schemaLocation="http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context-4.1.xsd
                http://www.springframework.org/schema/mvc
                http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd">

            <context:component-scan base-package="com.gxf" />
            <mvc:annotation-driven />

            <bean id="viewResolver" class="org.springframework.web.servlet.view.UrlBasedViewResolver">
                <property name="viewClass"
                    value="org.springframework.web.servlet.view.JstlView" />
                <property name="prefix" value="/WEB-INF/views/" />
                <property name="suffix" value=".jsp" />
            </bean>
        </beans>

    - Controller layer, MainHandler.java:

        package com.gxf.handler;

        import javax.annotation.Resource;
        import javax.jms.Destination;

        import org.apache.activemq.command.ActiveMQDestination;
        import org.springframework.stereotype.Controller;
        import org.springframework.web.bind.annotation.RequestMapping;
        import org.springframework.web.bind.annotation.RequestMethod;
        import org.springframework.web.bind.annotation.RequestParam;

        import com.gxf.service.ProducerService;

        /**
         * @author stark2017
         */
        @Controller
        public class MainHandler {

            // Queue destination (the composite "queue1,queue2" bean)
            @Resource(name = "queueDestination")
            private Destination queueDestination;

            // Queue message producer
            @Resource(name = "producerService")
            private ProducerService producerService;

            @RequestMapping(value = "/main", method = RequestMethod.GET)
            public String producer() {
                return "main";
            }

            /**
             * Sends messages to queue1.
             *
             * @param message the message text
             * @return the view name
             */
            @RequestMapping(value = "/sendone", method = RequestMethod.POST)
            public String producer(@RequestParam("message") String message) {

                /*
                 * Cast the destination to ActiveMQDestination; its
                 * getCompositeDestinations() method returns the array of member
                 * queues: queue://queue1 and queue://queue2.
                 */
                ActiveMQDestination activeMQDestination = (ActiveMQDestination) queueDestination;
                ActiveMQDestination queue1 = activeMQDestination.getCompositeDestinations()[0];

                // Send a text message to queue1
                System.out.println("Sending a text message to queue " + queue1.getPhysicalName());
                producerService.sendTxtMessage(queue1, message);

                // Send a MapMessage to queue1
                System.out.println("Sending a MapMessage to queue " + queue1.getPhysicalName());
                producerService.sendMapMessage(queue1, message);

                // Other message types can be sent the same way,
                // e.g. producerService.sendBytesMessage(queue1, bytes);

                return "main";
            }

            /**
             * Sends messages to queue2.
             *
             * @param message the message text
             * @return the view name
             */
            @RequestMapping(value = "/sendtwo", method = RequestMethod.POST)
            public String producertwo(@RequestParam("message") String message) {

                // Same cast as above; index 1 is queue://queue2
                ActiveMQDestination activeMQDestination = (ActiveMQDestination) queueDestination;
                ActiveMQDestination queue2 = activeMQDestination.getCompositeDestinations()[1];

                // Send a text message to queue2
                System.out.println("Sending a text message to queue " + queue2.getPhysicalName());
                producerService.sendTxtMessage(queue2, message);

                // Send a MapMessage to queue2
                System.out.println("Sending a MapMessage to queue " + queue2.getPhysicalName());
                producerService.sendMapMessage(queue2, message);

                return "main";
            }
        }
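The controller selects its target queue by array position (`[0]` for queue1, `[1]` for queue2), which silently changes meaning if the order in `activemq.xml` is ever edited. One possible alternative is to index the member destinations by name once and look them up by name afterwards. The sketch below is generic plain Java so it can run without ActiveMQ on the classpath; `DestinationIndex` and `indexByName` are hypothetical names, not part of any library.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper: builds a name -> destination lookup table.
class DestinationIndex {
    // D is the destination type; nameOf extracts the lookup key from each element.
    static <D> Map<String, D> indexByName(D[] destinations, Function<D, String> nameOf) {
        Map<String, D> byName = new LinkedHashMap<>();
        for (D destination : destinations) {
            byName.put(nameOf.apply(destination), destination);
        }
        return byName;
    }
}
```

In the controller this could be called as `DestinationIndex.indexByName(activeMQDestination.getCompositeDestinations(), ActiveMQDestination::getPhysicalName)`, after which `get("queue2")` would replace the `[1]` lookup. That usage is an assumption about how the helper would be wired in, built only from the two ActiveMQ methods the article already calls.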

    - Producer, ProducerService.java:

        package com.gxf.service;

        import java.io.Serializable;

        import javax.annotation.Resource;
        import javax.jms.BytesMessage;
        import javax.jms.Destination;
        import javax.jms.JMSException;
        import javax.jms.MapMessage;
        import javax.jms.Message;
        import javax.jms.Session;
        import javax.jms.StreamMessage;

        import org.springframework.jms.core.JmsTemplate;
        import org.springframework.jms.core.MessageCreator;
        import org.springframework.stereotype.Service;

        @Service
        public class ProducerService {

            @Resource(name = "jmsTemplate")
            private JmsTemplate jmsTemplate;

            /**
             * Sends a text message to the given destination.
             *
             * @param destination target destination; the template default is used when null
             * @param message     the message text
             */
            public void sendTxtMessage(Destination destination, final String message) {
                if (null == destination) {
                    destination = jmsTemplate.getDefaultDestination();
                }
                jmsTemplate.send(destination, new MessageCreator() {
                    public Message createMessage(Session session) throws JMSException {
                        return session.createTextMessage(message);
                    }
                });
                System.out.println("springJMS send text message...");
            }

            /**
             * Sends a map message to the given destination.
             *
             * @param destination target destination; the template default is used when null
             * @param message     value stored under the "msgId" key
             */
            public void sendMapMessage(Destination destination, final String message) {
                if (null == destination) {
                    destination = jmsTemplate.getDefaultDestination();
                }
                jmsTemplate.send(destination, new MessageCreator() {
                    public Message createMessage(Session session) throws JMSException {
                        MapMessage mapMessage = session.createMapMessage();
                        mapMessage.setString("msgId", message);
                        return mapMessage;
                    }
                });
                System.out.println("springJMS send map message...");
            }

            /**
             * Sends a serialized object to the given destination.
             *
             * @param destination target destination; the template default is used when null
             * @param object      the payload; it must be serializable
             */
            public void sendObjectMessage(Destination destination, final Serializable object) {
                if (null == destination) {
                    destination = jmsTemplate.getDefaultDestination();
                }
                jmsTemplate.send(destination, new MessageCreator() {
                    public Message createMessage(Session session) throws JMSException {
                        return session.createObjectMessage(object);
                    }
                });
                System.out.println("springJMS send object message...");
            }

            /**
             * Sends a bytes message to the given destination.
             *
             * @param destination target destination; the template default is used when null
             * @param bytes       the payload
             */
            public void sendBytesMessage(Destination destination, final byte[] bytes) {
                if (null == destination) {
                    destination = jmsTemplate.getDefaultDestination();
                }
                jmsTemplate.send(destination, new MessageCreator() {
                    public Message createMessage(Session session) throws JMSException {
                        BytesMessage bytesMessage = session.createBytesMessage();
                        bytesMessage.writeBytes(bytes);
                        return bytesMessage;
                    }
                });
                System.out.println("springJMS send bytes message...");
            }

            /**
             * Sends a stream message to the given destination. The original version
             * ignored the parameter and always used the default queue; the default
             * is now used only when the destination is null.
             *
             * @param destination target destination; the template default is used when null
             */
            public void sendStreamMessage(Destination destination) {
                if (null == destination) {
                    destination = jmsTemplate.getDefaultDestination();
                }
                jmsTemplate.send(destination, new MessageCreator() {
                    public Message createMessage(Session session) throws JMSException {
                        StreamMessage message = session.createStreamMessage();
                        message.writeString("stream string");
                        message.writeInt(11111);
                        return message;
                    }
                });
                System.out.println("springJMS send stream message...");
            }

        }

    - Queue message listener, QueueMessageListener.java:

        package com.gxf.listener;

        import javax.jms.BytesMessage;
        import javax.jms.JMSException;
        import javax.jms.MapMessage;
        import javax.jms.Message;
        import javax.jms.MessageListener;
        import javax.jms.ObjectMessage;
        import javax.jms.StreamMessage;
        import javax.jms.TextMessage;

        import org.apache.activemq.command.ActiveMQDestination;

        public class QueueMessageListener implements MessageListener {

            // Called automatically whenever a message arrives
            @Override
            public void onMessage(Message message) {
                try {
                    ActiveMQDestination queue = (ActiveMQDestination) message.getJMSDestination();

                    // Messages that arrived on queue1
                    if (queue.getPhysicalName().equalsIgnoreCase("queue1")) {
                        System.out.println("Listener on queue " + queue.getPhysicalName() + " consumed a message:");
                        // Text message
                        if (message instanceof TextMessage) {
                            TextMessage tm = (TextMessage) message;
                            System.out.println("from get textMessage:\t" + tm.getText());
                        }
                        // Map message
                        if (message instanceof MapMessage) {
                            MapMessage mm = (MapMessage) message;
                            System.out.println("from get MapMessage:\t" + mm.getString("msgId"));
                        }
                    }

                    // Messages that arrived on queue2
                    if (queue.getPhysicalName().equalsIgnoreCase("queue2")) {
                        System.out.println("Listener on queue " + queue.getPhysicalName() + " consumed a message:");
                        // Text message
                        if (message instanceof TextMessage) {
                            TextMessage tm = (TextMessage) message;
                            System.out.println("from get textMessage:\t" + tm.getText());
                        }
                        // Map message
                        if (message instanceof MapMessage) {
                            MapMessage mm = (MapMessage) message;
                            System.out.println("from get MapMessage:\t" + mm.getString("msgId"));
                        }
                    }
                } catch (JMSException e1) {
                    e1.printStackTrace();
                }

                // Object message
                if (message instanceof ObjectMessage) {
                    System.out.println("from get ObjectMessage:\t");
                }

                // Bytes message
                if (message instanceof BytesMessage) {
                    System.out.println("from get BytesMessage:\t");
                    byte[] b = new byte[1024];
                    int len = -1;
                    BytesMessage bm = (BytesMessage) message;
                    try {
                        while ((len = bm.readBytes(b)) != -1) {
                            System.out.println(new String(b, 0, len));
                        }
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }

                // Stream message
                if (message instanceof StreamMessage) {
                    System.out.println("from get StreamMessage:\t");
                    StreamMessage sm = (StreamMessage) message;
                    try {
                        System.out.println(sm.readString());
                        System.out.println(sm.readInt());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }

        }
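The listener decides what to do by comparing the queue name in a chain of `if` blocks, one per queue. As the number of queues grows, a lookup table from queue name to handler may be easier to maintain. The sketch below shows that idea in plain Java with message bodies reduced to strings; `QueueDispatcher`, `register`, and `dispatch` are hypothetical names, and the case-insensitive matching simply mirrors the `equalsIgnoreCase` calls in the listener.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical dispatcher: routes a message body to the handler registered for its queue.
class QueueDispatcher {
    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    // Registers the business handler for one queue; returns this for chaining.
    QueueDispatcher register(String queueName, Consumer<String> handler) {
        handlers.put(queueName.toLowerCase(Locale.ROOT), handler);
        return this;
    }

    // Runs the handler for the queue; returns false when no handler is registered.
    boolean dispatch(String queueName, String body) {
        Consumer<String> handler = handlers.get(queueName.toLowerCase(Locale.ROOT));
        if (handler == null) {
            return false;
        }
        handler.accept(body);
        return true;
    }
}
```

Inside `onMessage`, the physical name obtained from `message.getJMSDestination()` would be passed as `queueName`, and a `false` return gives one place to log messages from unexpected queues.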

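    - Start the project, open /main, and send messages:

    The console prints the messages sent to each queue and the messages the listener consumed from each queue:

    Queue queue1 sent and consumed 14 messages; queue2 sent and consumed 10 messages:

    With that, the required functionality is in place.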

  • Original article: https://www.cnblogs.com/zhoading/p/12009116.html