zoukankan      html  css  js  c++  java
  • Activemq获取消息信息

    Activemq的公告消息

    下面是来自官网的翻译:http://activemq.apache.org/advisory-message.html

    ActiveMQ消息属性

    消息属性

    属性名 类型 默认值 描述
    JMSDestination javax.jms.Destination 生产者set进去 发送消息目的地
    JMSReplyTo javax.jms.Destination null 用户定义
    JMSType String “” 用户定义
    JMSDeliveryMode int DeliveryMode.PERSISTENT 消息是否要持久化
    JMSPriority int 4 0-9
    JMSMessageID String unique 消息唯一标示符
    JMSTimestamp long 消息发送时间 毫秒
    JMSCorrelationID String null 用户定义
    JMSExpiration long 0 0表示用不过气,消息过期时间
    JMSRedelivered boolean false 重新传递

    JMS定义属性

    属性名 类型 默认值 描述
    JMSXDeliveryCount int 0 尝试发送消息的次数
    JMSXGroupID String null 表示消息的分组
    JMSXGroupSeq int 0 消息的序列号
    JMSXProducerTXID String null 事务标示

    ActiveMQ定义属性

    属性名 类型 默认值 描述
    JMSActiveMQBrokerInTime long 0 消息到达broker的时间
    JMSActiveMQBrokerOutTime long 0 消息离开broker的时间

    公告消息(Advisory Message)

    ActiveMQ支持的公告消息(相当于监听器)

    • 消费者,生产者和connection开始和停止

    • 临时目的地的创建和销毁

    • 话题和队列上的消息过期

    • broker发送消息到没有消费者的目的地

    • connections开启和停止

      公告消息是一种管理JMS的渠道,我们可以接收到在broker上的生产者消费者目的地的变化。
      当我们通过JMX查看broker时候,可以看到公告话题以ActiveMQ.Advisory为前缀,每个公告拥有类型为Advisory的消息和一些预定义的消息属性

    属性名 类型 描述
    originBrokerId StringProperty 公告起源的broker的ID
    originBrokerName StringProperty 公告起源的broker的名称
    originBrokerURL StringProperty 公告起源的第一个broker url

    我们可以获得到消息的一些元数据

     Destination advisoryDestination = AdvisorySupport.getProducerAdvisoryTopic(destination)
        MessageConsumer consumer = session.createConsumer(advisoryDestination);
        consumer.setMessageListener(this);
    public void onMessage(Message msg){
        if (msg instanceof ActiveMQMessage){
            try {
                 ActiveMQMessage aMsg =  (ActiveMQMessage)msg;
                 ProducerInfo prod = (ProducerInfo) aMsg.getDataStructure();
            } catch (JMSException e) {
                log.error("Failed to process message: " + msg);
            }
        }
    }
    

    支持的公告话题

    客户端的公告消息
    我们可以用consumerCount头信息获取当前消费者的数量来得知当前的公告的消息是否已经发送。

    Advisory Topics Description properties Data Structure
    ActiveMQ.Advisory.Connection ActiveMQ.Advisory.Connection ConnectionInfo, RemoveInfo
    ActiveMQ.Advisory.Producer.Queue Producer start & stop messages on a Queue String=’producerCount’ - the number of producers ProducerInfo
    ActiveMQ.Advisory.Producer.Topic Producer start & stop messages on a Topic String=’producerCount’ - the number of producers ProducerInfo
    ActiveMQ.Advisory.Consumer.Queue Consumer start & stop messages on a Queue String=’consumerCount’ - the number of Consumers ConsumerInfo, RemoveInfo
    ActiveMQ.Advisory.Consumer.Topic Consumer start & stop messages on a Topic String=’consumerCount’ - the number of Consumers ConsumerInfo, RemoveInfo

    目的地和消息的公告

    Advisory Topics 描述 属性 数据结构 默认值 策略属性
    ActiveMQ.Advisory.Queue Queue create & destroy null DestinationInfo true none
    ActiveMQ.Advisory.Topic Topic create & destroy null DestinationInfo true none
    ActiveMQ.Advisory.TempQueue Temporary Queue create & destroy null DestinationInfo true none
    ActiveMQ.Advisory.TempTopic Temporary Topic create & destroy null DestinationInfo true none
    ActiveMQ.Advisory.Expired.Queue Expired messages on a Queue String=’orignalMessageId’ - the expired id Message true none
    ActiveMQ.Advisory.Expired.Topic Expired messages on a Topic String=’orignalMessageId’ - the expired id Message true none
    ActiveMQ.Advisory.NoConsumer.Queue No consumer is available to process messages being sent on a Queue null Message false sendAdvisoryIfNoConsumers
    ActiveMQ.Advisory.NoConsumer.Topic No consumer is available to process messages being sent on a Topic null Message false sendAdvisoryIfNoConsumers

    ActiveMQ5.2新的公告消息

    Advisory Topics 描述 属性 数据结构 默认值 策略属性
    ActiveMQ.Advisory.SlowConsumer.Queue Slow Queue Consumer String=’consumerId’ - the consumer id ConsumerInfo false advisoryForSlowConsumers
    ActiveMQ.Advisory.SlowConsumer.Topic Slow Topic Consumer String=’consumerId’ - the consumer id ConsumerInfo false advisoryForSlowConsumers
    ActiveMQ.Advisory.FastProducer.Queue Fast Queue producer String=’producerId’ - the producer id ProducerInfo false advisdoryForFastProducers
    ActiveMQ.Advisory. FastProducer.Topic Fast Topic producer String=’consumerId’ - the producer id ProducerInfo false advisdoryForFastProducers
    ActiveMQ.Advisory.MessageDiscarded.Queue Message discarded String=’orignalMessageId’ - the discarded id Message false advisoryForDiscardingMessages
    ActiveMQ.Advisory.MessageDiscarded.Topic Message discarded String=’orignalMessageId’ - the discarded id Message false advisoryForDiscardingMessages
    ActiveMQ.Advisory.MessageDelivered.Queue Message delivered to the broker String=’orignalMessageId’ - the delivered id Message false advisoryForDelivery
    ActiveMQ.Advisory.MessageDelivered.Topic Message delivered to the broker String=’orignalMessageId’ - the delivered id Message false advisoryForDelivery
    ActiveMQ.Advisory.MessageConsumed.Queue Message consumed by a client String=’orignalMessageId’ - the delivered id Message false advisoryForConsumed
    ActiveMQ.Advisory.MessageConsumed.Topic Message consumed by a client String=’orignalMessageId’ - the delivered id Message false advisoryForConsumed
    ActiveMQ.Advisory.FULL A Usage resource is at its limit String=’usageName’ - the name of Usage resource null false advisoryWhenFull
    ActiveMQ.Advisory.MasterBroker A broker is now the master in a master/slave configuration null null true none

    5.4后新公告

    Advisory Topics 描述 属性 数据结构 默认值 策略属性
    ActiveMQ.Advisory.MessageDLQd.Queue Message sent to DLQ String=’orignalMessageId’ - the delivered id Message Always on advisoryForConsumed
    ActiveMQ.Advisory.MessageDLQd.Topic Message sent to DLQ String=’orignalMessageId’ - the delivered id Message Always on advisoryForConsumed

    Network bridge advisories

    Advisory Topics 描述 属性 数据结构 默认值
    ActiveMQ.Advisory.NetworkBridge Network bridge being stopped or started Boolean=”started” - true if bridge is started, false if it is stopped Boolean=”createdByDuplex” - true if the bridge is created by remote network connector BrokerInfo - provides data of the remote broker Always on

    开启公告消息

    公告消息默认是不开启的,我们可以使用下面的配置开启公告配置

    <destinationPolicy>
       <policyMap><policyEntries> 
          <policyEntry topic=">" advisoryForConsumed="true" />
       </policyEntries></policyMap>
    </destinationPolicy>
    

    关闭公告消息

    xml配置如下

    <broker advisorySupport="false">
    

    java code 配置

    BrokerService broker = new BrokerService();
    broker.setAdvisorySupport(false);
    broker.start();
    

    url链接串

    "tcp://localhost:61616?jms.watchTopicAdvisories=false"
    

    设置ConnectionFactory的属性

    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setWatchTopicAdvisories(false);
    

    ActiveMQ一些静态方法

    AdvisorySupport.getConsumerAdvisoryTopic()
    AdvisorySupport.getProducerAdvisoryTopic()
    AdvisorySupport.getExpiredTopicMessageAdvisoryTopic()
    AdvisorySupport.getExpiredQueueMessageAdvisoryTopic()
    AdvisorySupport.getNoTopicConsumersAdvisoryTopic()
    AdvisorySupport.getNoQueueConsumersAdvisoryTopic()
    AdvisorySupport.getDestinationAdvisoryTopic()
    AdvisorySupport.getExpiredQueueMessageAdvisoryTopic()
    AdvisorySupport.getExpiredTopicMessageAdvisoryTopic()
    AdvisorySupport.getNoQueueConsumersAdvisoryTopic()
    AdvisorySupport.getNoTopicConsumersAdvisoryTopic()
    
    //Version 5.2 onwards
    
    
    AdvisorySupport.getSlowConsumerAdvisoryTopic()
    AdvisorySupport.getFastProducerAdvisoryTopic()
    AdvisorySupport.getMessageDiscardedAdvisoryTopic()
    AdvisorySupport.getMessageDeliveredAdvisoryTopic()
    AdvisorySupport.getMessageConsumedAdvisoryTopic()
    AdvisorySupport.getMasterBrokerAdvisoryTopic()
    AdvisorySupport.getFullAdvisoryTopic()
    

    具体可以看一下AdvisorySupport这个类就懂了

    image-20201015223216280

    应用场景:

    本来工作中用websocket当聊天的,结果经过F5后要做了一层代理,消息就不通了,于是就用的mq,需要捕获一个队列的消费者数量的变化,就用了Activemq的公告消息

        @Test
        public void test1(){
            Connection connection = null;
            try {
                connection = connectionFactory.createConnection();
                connection.start();
                Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
                // 创建 advisory topic : ActiveMQ.Advisory.Consumer.Topic.chat1,用于监控topic消费者的状态变化
                ActiveMQTopic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(new ActiveMQTopic("topic01"));
                MessageConsumer consumer = session.createConsumer(advisoryTopic);
                consumer.setMessageListener(new MessageListener() {
                    @Override
                    public void onMessage(Message message) {
                        if (message instanceof ActiveMQMessage) {
                            try {
                                ActiveMQMessage aMsg = (ActiveMQMessage) message;
                                System.out.println(aMsg.getStringProperty("consumerCount"));
                                System.out.println(aMsg.getStringProperty("producerCount"));
                                if (aMsg.getDataStructure() instanceof ConsumerInfo) {
                                    // Consumer start
                                    ConsumerInfo consumerInfo = (ConsumerInfo) aMsg.getDataStructure();
                                    System.out.println(consumerInfo);
                                } else if (aMsg.getDataStructure() instanceof RemoveInfo) {
                                    // Consumer stop
                                    RemoveInfo removeInfo = (RemoveInfo) aMsg.getDataStructure();
                                    System.out.println(removeInfo);
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                });
                Thread.sleep(2000000);
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                try {
                    if(null != connection){
                        connection.close();
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    

    测试:先上线后下线

    1
    null
    ConsumerInfo {commandId = 6, responseRequired = true, consumerId = ID:WGR-PC-6298-1602771436318-1:1:2:1, destination = topic://topic01, prefetchSize = 32767, maximumPendingMessageLimit = 0, browser = false, dispatchAsync = true, selector = null, clientId = wgr, subscriptionName = null, noLocal = false, exclusive = false, retroactive = false, priority = 0, brokerPath = null, optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate = null}
    
    
    0
    null
    RemoveInfo {commandId = 0, responseRequired = true, objectId = ID:WGR-PC-6298-1602771436318-1:1:2:1, lastDeliveredSequenceId = -2}
    
    

    其实除了JMX,咨询消息,还有统计插件,没有深入研究,官网说明如下:

    http://activemq.apache.org/statisticsplugin.html

    示例:https://segmentfault.com/a/1190000004522121?utm_source=debugrun&utm_medium=referral

    从ActiveMQ 5.3开始,其中包括一个统计信息插件,可用于从代理或其目的地检索统计信息。请注意,该消息必须包含replyTo标头(jmsReplyTo如果使用的是JMS,则为标头),否则该消息将被忽略。该replyTo标题必须包含从中要获取统计信息(一个或多个)目标的名称。统计消息中MapMessage填充了目标(即,代理或目标)的统计信息。

    要检索代理的统计信息,只需向标ActiveMQ.Statistics.BrokerreplyTo标题的目的地发送一条空消息。要检索目的地统计,只需发送一个空的消息载明的目的地ActiveMQ.Statistics.Destination.<destination-name>ActiveMQ.Statistics.Destination.<wildcard-expression>连同replyTo头。如果许多目的地与给定的通配符表达式匹配,则将向每个目的地发送一条状态消息replyTo

    要将ActiveMQ配置为使用统计信息插件,只需将以下内容添加到ActiveMQ XML配置中:

    <broker ...>
      <plugins>
        <statisticsBrokerPlugin/>
      </plugins>
    </broker>
    

    统计信息插件查找发送到特定目的地的消息。以下是使用统计信息插件从代理获取统计信息的示例:

    Queue replyTo = session.createTemporaryQueue();
    MessageConsumer consumer = session.createConsumer(replyTo);
    
    String queueName = "ActiveMQ.Statistics.Broker";
    Queue testQueue = session.createQueue(queueName);
    MessageProducer producer = session.createProducer(testQueue);
    Message msg = session.createMessage();
    msg.setJMSReplyTo(replyTo);
    producer.send(msg);
    
    MapMessage reply = (MapMessage) consumer.receive();
    assertNotNull(reply);
    
    for (Enumeration e = reply.getMapNames();e.hasMoreElements();) {
      String name = e.nextElement().toString();
      System.out.println(name + "=" + reply.getObject(name));
    }
    

    上面代码的输出如下所示:

    vm=vm://localhost  
    memoryUsage=0  
    storeUsage=3330  
    tempPercentUsage=0  
    ssl=  
    openwire=tcp://localhost:50059  
    brokerId=ID:bigmac-50057-1253605065511-0:0  
    consumerCount=2  
    brokerName=localhost  
    expiredCount=0  
    dispatchCount=1  
    maxEnqueueTime=5.0  
    storePercentUsage=0  
    dequeueCount=0  
    inflightCount=1  
    messagesCached=0  
    tempLimit=107374182400  
    averageEnqueueTime=5.0  
    stomp+ssl=  
    memoryPercentUsage=0  
    size=10  
    tempUsage=0  
    producerCount=1  
    minEnqueueTime=5.0  
    dataDirectory=/Users/rajdavies/dev/projects/activemq/activemq-core/activemq-data  
    enqueueCount=10  
    stomp=  
    storeLimit=107374182400  
    memoryLimit=67108864
    

    同样,要查询目标的统计信息,只需将消息发送到以开头的目标名称即可ActiveMQ.Statistics.Destination。例如,要检索名称为TEST.FOO的队列的统计信息,请将空消息发送到名为的队列ActiveMQ.Statistics.Destination.TEST.FOO。下面是一个示例:

    Queue replyTo = session.createTemporaryQueue();
    MessageConsumer consumer = session.createConsumer(replyTo);
    
    Queue testQueue = session.createQueue("TEST.FOO");
    MessageProducer producer = session.createProducer(null);
    
    String queueName = "ActiveMQ.Statistics.Destination." + testQueue.getQueueName()
    Queue query = session.createQueue(queueName);
    
    Message msg = session.createMessage();
    
    producer.send(testQueue, msg) 
    msg.setJMSReplyTo(replyTo);
    producer.send(query, msg);
    MapMessage reply = (MapMessage) consumer.receive();
    assertNotNull(reply);
    assertTrue(reply.getMapNames().hasMoreElements());
            
    for (Enumeration e = reply.getMapNames();e.hasMoreElements();) {
        String name = e.nextElement().toString();
        System.err.println(name + "=" + reply.getObject(name));
    }
    

    上面代码的输出如下所示:

    memoryUsage=0  
    dequeueCount=0  
    inflightCount=0  
    messagesCached=0  
    averageEnqueueTime=0.0  
    destinationName=queue://TEST.FOO  
    size=1  
    memoryPercentUsage=0  
    producerCount=0  
    consumerCount=0  
    minEnqueueTime=0.0  
    maxEnqueueTime=0.0  
    dispatchCount=0  
    expiredCount=0  
    enqueueCount=1  
    memoryLimit=67108864
    

    您也可以在队列名称中使用通配符。对于通配符匹配的每个目的地,这将导致单独的统计信息。确实非常方便。

    订阅统计

    从5.6.0开始,您还可以检索所有队列和主题订阅的统计信息。您所需要做的只是将空消息发送到ActiveMQ.Statistics. Subscription带有replyTo标题的目的地。响应将以一个或多个消息的形式出现,每个消息都包含关于Broker上恰好一个订阅的统计信息。

    以下是使用统计信息插件从代理获取统计信息的示例:

    Queue replyTo = session.createTemporaryQueue();
    MessageConsumer consumer = session.createConsumer(replyTo);
    
    String queueName = "ActiveMQ.Statistics.Subscription";
    Queue testQueue = session.createQueue(queueName);
    MessageProducer producer = session.createProducer(testQueue);
    Message msg = session.createMessage();
    msg.setJMSReplyTo(replyTo);
    producer.send(msg);
    
    MapMessage reply = (MapMessage) consumer.receive();
    assertNotNull(reply);
    
    for (Enumeration e = reply.getMapNames();e.hasMoreElements();) {
      String name = e.nextElement().toString();
      System.out.println(name + "=" + reply.getObject(name));
    }
    

    上面的代码示例输出如下所示:

    selector=null  
    dispatchedQueueSize=1  
    maximumPendingMessageLimit=0  
    exclusive=false  
    connectionId=ID:dejan-bosanacs-macbook-pro-2.local-64989-1335528942875-4:1  
    destinationName=Test.Queue  
    clientId=ID:dejan-bosanacs-macbook-pro-2.local-64989-1335528942875-3:1  
    slowConsumer=false  
    prefetchSize=1000  
    sessionId=1  
    dequeueCounter=0  
    enqueueCounter=1  
    retroactive=false  
    dispatchedCounter=1
    
  • 相关阅读:
    Maximum of lines in a DataBand
    "New page after" by code
    How to show out three rows from the same databand On A4?
    Asp.Net Core 第07局:路由
    Asp.Net Core 第06局:中间件
    Asp.Net Core 第05局:读取配置
    Asp.Net Core 第04局:依赖注入
    POJ-1003
    ORACLE 存储过程实例 [备忘录]
    关于操作有符号数的溢出问题
  • 原文地址:https://www.cnblogs.com/dalianpai/p/13823644.html
Copyright © 2011-2022 走看看