zoukankan      html  css  js  c++  java
  • ActiveMQ学习第五篇:Destination的特性

    Wildcards(通配符)

    Wildcars用来支持名字分层体系,它不是JMS规范的一部分,是ActiveMQ的扩展。
      ActiveMQ支持以下三种通配符:

    • ".":用于作为路径上名字间的分隔符
    • ">":用于递归的匹配任何以这个名字开始的Destination(目的地)
    • "*":用于作为路径上任何名字。
      举例来说,如有以下两个Destination:
        PRICE.COMPUTER.JD.APPLE(苹果电脑在京东上的价格)
        PRICE.COMPUTER.TMALL.APPLE(苹果电脑在天猫上的价格)
    1. PRICE.> :匹配任何产品的价格变动
    2. PRICE.COMPUTER.> :匹配任何电脑产品的价格变动
    3. PRICE.COMPUTER.JD.*:匹配任何在京东上的电脑的价格变动
    4. PRICE.COMPUTER.*.APPLE:匹配苹果电脑京东或天猫上的价格变动
    // 实例化连接工厂
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, "failover:(tcp://localhost:61616,tcp://localhost:61626)?randomize=false");
    // 通过连接工厂获取连接
    Connection connection = connectionFactory.createConnection();
    // 启动连接
    connection.start();
    // 创建session
    Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
    // 创建队列
    Destination destination = session.createQueue("PRICE.COMPUTER.JD.APPLE");
    // 创建生产者
    MessageProducer messageProducer = session.createProducer(destination);
    for (int i = 1; i <= 10; i++) {
        TextMessage textMessage = session.createTextMessage(message);
        messageProducer.send("Mac Air价格:"  + i * 1000);
        System.out.println("发送消息 - " + textMessage.getText());
    }
    session.commit();
    
    // 实例化连接工厂
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, "failover:(tcp://localhost:61616,tcp://localhost:61626)?randomize=false");
    // 通过连接工厂获取连接
    Connection connection = connectionFactory.createConnection();
    // 启动连接
    connection.start();
    // 创建session
    Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
    // 创建队列
    Destination destination = session.createQueue("PRICE.COMPUTER.>");
    // 创建消费者
    MessageConsumer messageConsumer = session.createConsumer(destination);
    messageConsumer.setMessageListener(new MessageListener(){
      @Override
        public void onMessage(Message message) {
            try {
                System.out.println("收到的消息:" + ((TextMessage) message).getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    });
    

    通配符中是为消费者服务的。即:通配符只能配置在消费端。

    Composite Destinations(组合列队)

    组合队列允许用一个虚拟的destination代表多个destinations。这样就可以通过compositedestinations在一个操作中同时向多个queue发送消息。

    客户端实现的方式:

    ​ 在composite destinations中,多个destination之间采用“,”分割。例如:

            //创建一个队列
           // Destination destination = session.createQueue("test,test1");
            Queue queue = new ActiveMQQueue("test,test1");
            //创建生产者
    //        MessageProducer producer = session.createProducer(destination);
            MessageProducer producer = session.createProducer(queue);
    

    如果你希望使用不同类型的destination,那么需要加上前缀如queue:// 或topic://,例如:

    Queue queue = new ActiveMQQueue("test,topic://192.168.100.155::61616");
    

    在xml配置实现的方式:

    <destinationInterceptors>
      <virtualDestinationInterceptor>
        <virtualDestinations>
            <!-- 虚拟的queue的名字-->
          <compositeQueue name="MY.QUEUE">
            <forwardTo>
                <!-- 实际发送的名称 -->
              <queue physicalName="my-queue" />
              <queue physicalName="my-queue2" />
            </forwardTo>
          </compositeQueue>
        </virtualDestinations>
      </virtualDestinationInterceptor>
    </destinationInterceptors>
    

    使用filtered destinations,在xml配置如下:

    <destinationInterceptors>
           <virtualDestinationInterceptor>
                  <virtualDestinations>
                         <compositeQueue name="MY.QUEUE">
                                <forwardTo>
                                     <filteredDestination selector="odd='yes'" queue="FOO"/>
                                     <filteredDestination selector="i = 5" topic="BAR" />
                                </forwardTo>
                         </compositeQueue>
                  </virtualDestinations>
           </virtualDestinationInterceptor>
    </destinationInterceptors>
    

    避免在network连接到broker,出现重复消息:

    <networkConnectors>
    <networkConnector uri= "static://(tcp://localhost:61616) " >
    <excludedDestinations>
        <queue physicalName="Consumer.*VirtualTopic.> " />
    </ excludedDestinations>
    </ networkConnector>
    </ networkConnectors>
    

    在ActiveMQ启动时候就创建Destination

    <broker xmlns="http://activemq.apache.org/schema/core">
           <destinations>
                  <queue physicalName="FOO.BAR" />
                  <queue physicalName="SOME.TOPIC" />
           </destinations>
    </broker>
    

    Delete Inactive Destinations (删除无用的队列)

    可以通过web控制台或是JMX方式来删除掉,通过配置文件,自动探测无用的队列并删除掉,回收响应资源,配置如下:
    ​ SchedulePeriodForDestinationPurge:设置多长时间检查一次。
    ​ inactiveTimeoutBeforeGC:设置当destination为空后,多长时间被删除,这里是30s,默认为60
    ​ gcInactiveDestinations:设置删除掉不活动队列,默认为false

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulePeriodForDestinationPurge="10000">
           <destinationPolicy>
                  <policyMap>
                    <policyEntries>
                         <policyEntry queue=">" gcInactiveDestinations="true" inactiveTimeoutBeforeGC="30000" />
                    </policyEntries>
                  </policyMap>
           </destinationPolicy>
    </broker>
    
    

    Destination options (队列选项)

    队列选项是给consumer在JMS规范之外添加的功能特性,通过在队列名称后面使用类似URL的语法添加多个选项。包括:
    1:consumer.prefetchSize,consumer持有的未确认最大消息数量,默认值 variable。
    2:consumer.maximumPendingMessageLimit:用来控制非持久化的topic在存在慢消费者的情况下,丢弃的数量,默认0。
    3:consumer.noLocal :默认false。
    4:consumer.dispatchAsync :是否异步分发 ,默认true。
    5:consumer.retroactive:是否为回溯消费者 ,默认false。
    6:consumer.selector:Jms的Selector,默认null。
    7:consumer.exclusive:是否为独占消费者 ,默认false。
    8:consumer.priority:设置消费者的优先级,默认0。

    queue = new ActiveMQQueue("PRICE.COMPUTER.TMALL.APPLE?consumer.dispatchAsync=true&consumer.prefetchSize=20");
    consumer = session.createConsumer(queue);
    
    

    Visual Destinations

    前面也说到了两个虚拟主题,虚拟Destinations和组合Destinations
      ActiveMQ中,topic只有在持久订阅下才是持久化的。持久订阅时,每个持久订阅者,都相当于一个queue的客户端,它会收取所有消息。这种情况下存在两个问题:
      (1) 同一应用内consumer端负载均衡的问题:即同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。因为每个consumer都会获取所有消息。queue模式可以解决这个问题,但broker端又不能将消息发送到多个应用端。所以,既要发布订阅,又要让消费者分组,这个功能JMS规范本身是没有的。
    在这里插入图片描述
    (2)同一应用内consumer端failover的问题:由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高。
      Activemq可以实现虚拟的Topic来解决这两个问题。

    使用虚拟主题:

    对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.Mobile。示例:

    Topic destination = session.createTopic("VirtualTopic.Mobille");
    
    

      对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。

      如Consumer.A.VirtualTopic.Mobille,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.Mobille说明是一个名称为B的客户端。可以在同一个应用里使用多个consumer消费此Topic,则可以实现上面两个功能。

      又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。代码示例如下:

    public static void main(String[] args) throws JMSException {
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.100.155:61616");
            Connection connection = factory.createConnection();
            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            //创建虚拟主题,加前缀VirtualTopic
            Topic topic = session.createTopic("VirtualTopic.TestTopic");
            MessageProducer producer = session.createProducer(topic);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            connection.start();
            for (int i = 0; i < 30; i++) {
                TextMessage textMessage = session.createTextMessage("topic消息===" + i);
                producer.send(textMessage);
            }
            session.commit();
            connection.close();
        }
    
    
     public static void main(String[] args) throws JMSException {
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.100.155:61616");
            Connection connection = factory.createConnection();
            connection.start();
            final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            //创建队列
            Destination destination = session.createQueue("Consumer.A.VirtualTopic.TestTopic");
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("Consumer.A.接收到得消息:" + textMessage.getText());
                        session.commit();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    
    
     public static void main(String[] args) throws JMSException {
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.100.155:61616");
            Connection connection = factory.createConnection();
            connection.start();
            final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            //创建队列
            Destination destination = session.createQueue("Consumer.B.VirtualTopic.TestTopic");
            final MessageConsumer consumer = session.createConsumer(destination);
            final MessageConsumer messageConsumer = session.createConsumer(destination);
            //模拟多个consumer消费一个queue
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        consumer.setMessageListener(new MessageListener() {
                            @Override
                            public void onMessage(Message message) {
                                TextMessage textMessage = (TextMessage) message;
                                try {
                                    System.out.println("Consumer.B-->consumer接收到消息:" + textMessage.getText());
                                    session.commit();
                                } catch (JMSException e) {
                                    e.printStackTrace();
                                }
                            }
                        });
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        messageConsumer.setMessageListener(new MessageListener() {
                            @Override
                            public void onMessage(Message message) {
                                TextMessage textMessage = (TextMessage) message;
                                try {
                                    System.out.println("Consumer.B-->messageConsumer接收到消息:" + textMessage.getText());
                                    session.commit();
                                } catch (JMSException e) {
                                    e.printStackTrace();
                                }
                            }
                        });
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    
    

    在接收消息之前,应该先运行一下consumer客户端,将消费者注册到Broker中。
    xml配置:
    默认虚拟主题的前缀是: VirtualTopic.>
    自定义消费虚拟地址默认格式: Consumer.*.VirtualTopic.>
    自定义消费虚拟地址可以改,比如下面的配置就把它修改了。

    <destinationInterceptors>
            <virtualDestinationInterceptor>
                <virtualDestinations>
                    <virtualTopic name=">" prefix="自定义前缀.*." selectorAware="false"/>
                </virtualDestinations>
            </virtualDestinationInterceptor>
    </destinationInterceptors>
    
    

    Mirrored Queules(镜像队列)

      ActiveMQ中每个queue中的消息只能被一一个consumer消费。然而,有时候你可能希望能够监视生产者和消费者之间的消息流。你可以通过使用Virtual Destinations 来建立一个virtual queue来把消息转发到多个queues中。但是为系统中每个queue都进行如此的配置可能会很麻烦。
      ActiveMQ支持Mirrored Queues。 Broker 会把发送到某个queue的所有消息转发到一个名称类似的topic,因此监控程序只需要订阅这个mirrored queue topic。 为了启用Mirrored Queues,首先要将BrokerService的useMirroredQueues属性设置成true,然后可以通过destinationInterceptors设置其它属性,如mirrortopic的前缀, 缺省是:“VirtualTopic. Mirror."
    在这里插入图片描述

    <destinationInterceptors>
        <mirroredQueue copyMessage = "true" postfix=".qmirror" prefix=""/>
    </destinationInterceptors>
    
    

    在这里插入图片描述

    这样发送之后会自动存放到一个topic里面。需要定于那个topic就可以监听到消息了。
    在这里插入图片描述
    在这里插入图片描述

  • 相关阅读:
    ES6、ES7、ES8特性
    【react】XXX项目环境搭建
    map
    vector
    list
    米勒素数模板
    POJ-2421-Constructing Roads(最小生成树 普利姆)
    HDU1301 Jungle Roads(Kruskal)
    Truck History(prime)
    phpstorm快捷键和激活
  • 原文地址:https://www.cnblogs.com/yangk1996/p/12664353.html
Copyright © 2011-2022 走看看