zoukankan      html  css  js  c++  java
  • 【ActiveMQ入门-11】ActiveMQ学习-compositeDestination

    概要:
    前一章讲解了消费者如何通过通配符来匹配目的地,以实现一个消费者同时接收多个目的地的消息。
    对于生产者来讲,可能存在下面的需求:
    1. 同一条message可能要发送到多个Queue;
    2. 同一条message同时发送到Queue和Topic;等
    这时,我们可以使用composite Destination来解决。

    下面将介绍如何将message发送到多个Queue、以及将message同时发送到Queue和Topic。


    环境:

    1. JmsMessageListener.java
    2. Sender.java
    3. applicationContext-compositeDestination.xml



    方式1:同时向多个Queue中发送相同的消息

    源文件和配置文件:

    JmsMessageListener.java 异步接收消息

    1. package com.ll.compositeDestination;
    2. import javax.jms.JMSException;
    3. import javax.jms.Message;
    4. import javax.jms.MessageListener;
    5. import javax.jms.TextMessage;
    6. public class JmsMessageListener implements MessageListener {
    7. public void onMessage(Message message) {
    8. System.out.println("消息全部内容:" + message.toString());
    9. try {
    10. System.out.println("消息主题:" + message.getJMSDestination().toString());
    11. } catch (JMSException e1) {
    12. e1.printStackTrace();
    13. }
    14. TextMessage tm = (TextMessage) message;
    15. try {
    16. System.out.println("消息体:" + tm.getText());
    17. } catch (JMSException e) {
    18. e.printStackTrace();
    19. }
    20. System.out.println("------------------------------------");
    21. }
    22. }


    Sender.java

    1. package com.ll.compositeDestination;
    2. import javax.jms.Destination;
    3. import javax.jms.JMSException;
    4. import javax.jms.Message;
    5. import javax.jms.Queue;
    6. import javax.jms.Session;
    7. import org.apache.activemq.command.ActiveMQQueue;
    8. import org.springframework.context.ApplicationContext;
    9. import org.springframework.context.support.ClassPathXmlApplicationContext;
    10. import org.springframework.jms.core.JmsTemplate;
    11. import org.springframework.jms.core.MessageCreator;
    12. public class Sender {
    13. public static void main(String[] args) {
    14. ApplicationContext applicationContext = new ClassPathXmlApplicationContext(
    15. "applicationContext-compositeDestination.xml");
    16. JmsTemplate template = (JmsTemplate) applicationContext
    17. .getBean("jmsTemplate");
    18. Destination destination =(Destination) applicationContext
    19. .getBean("destinationProducer");
    20. // Queue queue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C");
    21. //发送消息
    22. template.send(destination, new MessageCreator() {
    23. public Message createMessage(Session session) throws JMSException {
    24. return session
    25. .createTextMessage("同时向三个Queue中发送相同的消息");
    26. }
    27. });
    28. System.out.println("同时向三个Queue中发送相同的消息-发送完成...");
    29. }
    30. }

    applicationContext-compositeDestination.xml

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <beans xmlns="http://www.springframework.org/schema/beans"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://www.springframework.org/schema/beans
    5. http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
    6. <!--创建连接工厂 -->
    7. <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    8. <property name="brokerURL" value="tcp://localhost:61616"></property>
    9. </bean>
    10. <!-- 通配符 供消费者使用 -->
    11. <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
    12. <constructor-arg index="0" value="FOO.*"></constructor-arg>
    13. </bean>
    14. <!-- composite destination 供生产者使用 -->
    15. <bean id="destinationProducer" class="org.apache.activemq.command.ActiveMQQueue">
    16. <constructor-arg index="0" value="FOO.A,FOO.B,FOO.C,FOO.D"></constructor-arg>
    17. </bean>
    18. <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    19. <property name="connectionFactory" ref="connectionFactory"></property>
    20. <property name="defaultDestination" ref="destinationProducer"></property>
    21. <property name="receiveTimeout" value="600"></property>
    22. </bean>
    23. <!-- 消息监听接口 -->
    24. <bean id="jmsMessageListener" class="com.ll.compositeDestination.JmsMessageListener">
    25. </bean>
    26. <!-- 消费者,通过消息侦听器实现 -->
    27. <bean id="consumer"
    28. class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    29. <property name="connectionFactory" ref="connectionFactory" />
    30. <property name="destination" ref="destination" />
    31. <property name="messageListener" ref="jmsMessageListener" />
    32. </bean>
    33. </beans>


    运行结果:



    方式2:同时向Queue中发送相同的消息

    环境、jar包和方式1 相同;
    总共3个文件:
    1. JmsMessageListener.java
    2. Sender.java
    3. applicationContext-compositeDestination.xml

    JmsMessageListener.java

    1. package com.ll.compositeDestination;
    2. import javax.jms.JMSException;
    3. import javax.jms.Message;
    4. import javax.jms.MessageListener;
    5. import javax.jms.TextMessage;
    6. public class JmsMessageListener implements MessageListener {
    7. public void onMessage(Message message) {
    8. System.out.println("消息全部内容:" + message.toString());
    9. try {
    10. System.out.println("消息主题:" + message.getJMSDestination().toString());
    11. } catch (JMSException e1) {
    12. e1.printStackTrace();
    13. }
    14. TextMessage tm = (TextMessage) message;
    15. try {
    16. System.out.println("消息体:" + tm.getText());
    17. } catch (JMSException e) {
    18. e.printStackTrace();
    19. }
    20. System.out.println("------------------------------------");
    21. }
    22. }


    Sender.java

    1. package com.ll.compositeDestination;
    2. import javax.jms.Destination;
    3. import javax.jms.JMSException;
    4. import javax.jms.Message;
    5. import javax.jms.Queue;
    6. import javax.jms.Session;
    7. import org.apache.activemq.command.ActiveMQQueue;
    8. import org.springframework.context.ApplicationContext;
    9. import org.springframework.context.support.ClassPathXmlApplicationContext;
    10. import org.springframework.jms.core.JmsTemplate;
    11. import org.springframework.jms.core.MessageCreator;
    12. public class Sender {
    13. public static void main(String[] args) {
    14. ApplicationContext applicationContext = new ClassPathXmlApplicationContext(
    15. "applicationContext-compositeDestination.xml");
    16. JmsTemplate template = (JmsTemplate) applicationContext
    17. .getBean("jmsTemplate");
    18. Destination destination = (Destination) applicationContext
    19. .getBean("destinationProducer");
    20. try {
    21. Thread.sleep(3000);
    22. } catch (InterruptedException e) {
    23. e.printStackTrace();
    24. }
    25. // 发送消息
    26. template.send(destination, new MessageCreator() {
    27. public Message createMessage(Session session) throws JMSException {
    28. return session.createTextMessage("同时向多个Queue、Topic中发送相同的消息");
    29. }
    30. });
    31. }
    32. }


    applicationContext-compositeDestination.xml

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <beans xmlns="http://www.springframework.org/schema/beans"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://www.springframework.org/schema/beans
    5. http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
    6. <!--创建连接工厂 -->
    7. <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    8. <property name="brokerURL" value="tcp://localhost:61616"></property>
    9. </bean>
    10. <!-- 通配符FOO.* 供消费者使用 -->
    11. <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
    12. <constructor-arg index="0" value="FOO.*"></constructor-arg>
    13. </bean>
    14. <!-- 通配符NOTIFY.FOO.* 供消费者使用 -->
    15. <bean id="destination2" class="org.apache.activemq.command.ActiveMQTopic">
    16. <constructor-arg index="0" value="NOTIFY.FOO.*"></constructor-arg>
    17. </bean>
    18. <!-- composite destination 供生产者使用 ,多个Queue和多个Topic -->
    19. <bean id="destinationProducer" class="org.apache.activemq.command.ActiveMQQueue">
    20. <constructor-arg index="0"
    21. value="FOO.1,FOO.2,FOO.3,FOO.4,topic://NOTIFY.FOO.D,topic://NOTIFY.FOO.E,topic://NOTIFY.FOO.F"></constructor-arg>
    22. </bean>
    23. <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    24. <property name="connectionFactory" ref="connectionFactory"></property>
    25. <property name="defaultDestination" ref="destinationProducer"></property>
    26. <property name="receiveTimeout" value="600"></property>
    27. </bean>
    28. <!-- 消息监听接口 -->
    29. <bean id="jmsMessageListener" class="com.ll.compositeDestination.JmsMessageListener">
    30. </bean>
    31. <!-- 消息侦听器容器,监听destination -->
    32. <bean id="consumer"
    33. class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    34. <property name="connectionFactory" ref="connectionFactory" />
    35. <property name="destination" ref="destination" />
    36. <property name="messageListener" ref="jmsMessageListener" />
    37. </bean>
    38. <!-- 消息侦听器容器,监听destination2 -->
    39. <bean id="consumer2"
    40. class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    41. <property name="connectionFactory" ref="connectionFactory" />
    42. <property name="destination" ref="destination2" />
    43. <property name="messageListener" ref="jmsMessageListener" />
    44. </bean>
    45. </beans>


    运行结果:


    从上面两张图可以看出,发送到多个Queue中的消息全部被消费了,但是发送到多个Topic中的消息,
    有且只有一个Topic的消息被消费了,其他的消息都没有被消费,为什么??
    个人理解:
    因为是同时发送到多个Queue和Topic中的(注意是同时,相同时刻),而消费者采用异步接收方式,
    当所有消息都到达时,onMessage函数处理不过来,最多只能处理一个。
    之所以Queue全部被消费了,而Topic只有1个被消费,是因为没有立即被消费的Queue消息,
    会一直保存在MQ服务器(Queue消息:生产者和消费者没有时间依赖性),而Topic消息:生产者和消费者有时间依赖性,
    没有被及时消费掉的消息,就再也没有机会消费了。






  • 相关阅读:
    WyBox 7620a 启用第二个串口
    简书上关于spring boot不错的文章
    Springboot quartz集群(3) — 多节点发送邮件
    使用Gradle构建多模块SpringBoot项目
    SpringCloud的Ribbon自定义负载均衡算法
    Quartz和Spring Task定时任务的简单应用和比较
    zuul超时及重试配置
    spring cloud服务器启动之后立刻通过zuul访问其中的实例报zuul连接超时的问题
    com.netflix.zuul.exception.ZuulException:Forwarding error
    Maven项目:@Override is not allowed when implement interface method
  • 原文地址:https://www.cnblogs.com/ssslinppp/p/4469337.html
Copyright © 2011-2022 走看看