zoukankan      html  css  js  c++  java
  • Consumer高级特性

    一、Exclusive Consumer

    Broker会从多个consumers中挑选一个consumer来处理queue中所有的消息,从而保证了消息的有序处理。如果这个consumer失效,那么broker会自动切换到其它的consumer。

    Destination queue = session.createQueue("my-queue7?consumer.exclusive=true");
    
    MessageConsumer consumer = session.createConsumer(queue);

    二、Message Groups

    Message Groups就是对消息分组,它是Exclusive Consumer功能的增强:

    逻辑上,Message Groups 可以看成是一种并发的Exclusive Consumer。跟所有的消息都由唯一的consumer处理不同,JMS 消息属性JMSXGroupID 被用来区分message group。
    Message Groups特性保证所有具有相同JMSXGroupID 的消息会被分发到相同的consumer(只要这个consumer保持active)。

    //创建一个Message Groups,只需要在message对象上设置属性即可
    message.setStringProperty("JMSXGroupID", "GroupA");
                 
     //关闭一个Message Groups,只需要在message对象上设置属性即可
    message.setStringProperty("JMSXGroupID","GroupA");
    message.setIntProperty("JMSXGroupSeq", -1);

     

    三、Message Selectors

    consumer = session.createConsumer(destination, "JMSType = 'car' AND weight > 2500");

    1:JMS Selectors表达式中,可以使用IN、NOT IN、LIKE等
    2:需要注意的是,JMS Selectors表达式中的日期和时间需要使用标准的long型毫秒值
    3:表达式中的属性不会自动进行类型转换,例如:
      myMessage.setStringProperty("NumberOfOrders", "2"); 那么此时“NumberOfOrders > 1” 求值结果会是false
    4:Message Groups虽然可以保证具有相同message group的消息被唯一的consumer顺序处理,但是却不能确
      定被哪个consumer处理。在某些情况下,Message Groups可以和JMS Selector一起工作,
      例如:设想有三个consumers分别是A、B和C。你可以在producer中为消息设置三个message groups分别是“A”、“B”和“C”。然后令consumer A使用“JMXGroupID = ‘A’”作为selector。B
        和C也同理。这样就可以保证message group A的消息只被consumer A处理。需要注意的是,这种做法有
      以下缺点:
      (1)producer必须知道当前正在运行的consumers,也就是说producer和consumer被耦合到一起。
      (2)如果某个consumer失效,那么应该被这个consumer消费的消息将会一直被积压在broker上

    四、Slow Consumer Handling

    慢消费者会在非持久的topics上导致问题:一旦消息积压起来,会导致broker把大量消息保存在内存中,broker也会因此而变慢。目前ActiveMQ使用Pending Message Limit Strategy来解决这个问
    题。除了prefetch buffer之外,你还要配置缓存消息的上限,超过这个上限后,新消息到来时会丢弃旧消息。
    通过在配置文件的destination map中配置PendingMessageLimitStrategy,可以为不用的topic namespace配置不同的策略。
    Pending Message Limit Strategy(等待消息限制策略)目前有以下两种:
    1: Constant Pending Message Limit Strategy Limit可以设置0、>0、-1三种方式: 0表示:不额外的增加其预存大小。 >0表示:再额外的增
           加其预存大小。 -1表示:不增加预存也不丢弃旧的消息。 这个策略使用常量限制,配置如下:
            <constantPendingMessageLimitStrategy limit="50"/>
    2:Prefetch Rate Pending Message Limit Strategy
    这种策略是利用Consumer的之前的预存的大小乘以其倍数等于现在的预存大小。比如:
        <prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>
    3:说明:在以上两种方式中,如果设置0意味着除了prefetch之外不再缓存消息;如果设置-1意味着禁止丢弃消息

  • 相关阅读:
    带你玩转Flink流批一体分布式实时处理引擎
    都2022年了,你的前端工具集应该有vueuse
    云图说|图解DGC:基于华为智能数据湖解决方案的一体化数据治理平台
    面试官: Flink双流JOIN了解吗? 简单说说其实现原理
    4种Spring Boot 实现通用 Auth 认证方式
    这8个JS 新功能,你应该去尝试一下
    Scrum Master需要具备哪些能力和经验
    dart系列之:时间你慢点走,我要在dart中抓住你
    dart系列之:数学什么的就是小意思,看我dart如何玩转它
    dart系列之:还在为编码解码而烦恼吗?用dart试试
  • 原文地址:https://www.cnblogs.com/xiaoliangup/p/9357071.html
Copyright © 2011-2022 走看看