  • ActiveMQ Message Groups

    http://activemq.apache.org/message-groups.html

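    Compared with Exclusive Consumer, Message Groups partition messages at a finer granularity. Messages with the same groupId are delivered to the same consumer, unless that consumer goes down.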
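
To make the "same groupId, same consumer, unless that consumer goes down" rule concrete, here is a minimal toy model in plain Java. It is only a sketch of the idea, not ActiveMQ code: the class `StickyGroupRouter` and its methods `route` and `consumerGone` are hypothetical names invented for this illustration, and consumers are represented by plain strings.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of sticky group routing (hypothetical, not an ActiveMQ class). */
final class StickyGroupRouter {
    // groupId -> consumer that currently owns the group
    private final Map<String, String> owners = new HashMap<>();

    /**
     * Returns the consumer that should receive a message of the given group.
     * If the group has no owner yet, the candidate consumer becomes the owner.
     */
    String route(String groupId, String candidateConsumer) {
        return owners.computeIfAbsent(groupId, id -> candidateConsumer);
    }

    /** Called when a consumer goes away: every group it owned becomes free again. */
    void consumerGone(String consumer) {
        owners.values().removeIf(consumer::equals);
    }

    public static void main(String[] args) {
        StickyGroupRouter router = new StickyGroupRouter();
        System.out.println(router.route("IBM", "consumer-A")); // consumer-A claims the group
        System.out.println(router.route("IBM", "consumer-B")); // still consumer-A
        router.consumerGone("consumer-A");                     // owner goes down
        System.out.println(router.route("IBM", "consumer-B")); // consumer-B takes over
    }
}
```

An Exclusive Consumer would pin the whole queue to one consumer; in this model each groupId is pinned separately, so different groups can be owned by different consumers at the same time.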

    Code example:

    Message message = session.createTextMessage("<foo>hey</foo>");
    // Set the group ID
    message.setStringProperty("JMSXGroupID", "IBM_NASDAQ_20/4/05");
    // Set the sequence number; a negative value marks the end of the group
    message.setIntProperty("JMSXGroupSeq", -1);
    
    producer.send(message);

    The corresponding code is in org.apache.activemq.broker.region.Queue:

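    // Decides whether the message may be dispatched to the consumer; returns true if it may.
    // Subscription represents the consumer, QueueMessageReference represents the message.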
    protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) 
                throws Exception {
        // Defaults to true
        boolean result = true;
        // Keep message groups together.
        // Read the message's "JMSXGroupID" property
        String groupId = node.getGroupID();
        // Read the message's "JMSXGroupSeq" property
        int sequence = node.getGroupSequence();
        if (groupId != null) {
            // MessageGroupMap is a map whose key is the groupId and whose value is the owning consumer
            MessageGroupMap messageGroupOwners = getMessageGroupOwners();
            // If we can own the first, then no-one else should own the
            // rest.
            if (sequence == 1) {
                assignGroup(subscription, messageGroupOwners, node, groupId);
            } else {
    
                // Make sure that the previous owner is still valid, we may
                // need to become the new owner.
                ConsumerId groupOwner;
                // Look up the owning consumer by groupId
                groupOwner = messageGroupOwners.get(groupId);
                if (groupOwner == null) {
                    assignGroup(subscription, messageGroupOwners, node, groupId);
                } else {
                    if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
                        // A group sequence < 1 is an end of group signal.
                        if (sequence < 0) {
                            messageGroupOwners.removeGroup(groupId);
                            subscription.getConsumerInfo().
                            setLastDeliveredSequenceId(subscription.getConsumerInfo().getLastDeliveredSequenceId() - 1);
                        }
                    } else {
                        result = false;
                    }
                }
            }
        }
    
        return result;
    }
    
    // Inserts a (groupId, consumer) entry into the MessageGroupMap
    protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, 
                            MessageReference n, String groupId) throws IOException {
        messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
        Message message = n.getMessage();
        message.setJMSXGroupFirstForConsumer(true);
        subs.getConsumerInfo().
            setLastDeliveredSequenceId(subs.getConsumerInfo().getLastDeliveredSequenceId() + 1);
    }
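
The branches of `assignMessageGroup` are easier to follow with a concrete trace. The following is a simplified, stand-alone sketch of the same decision logic, under the assumption that a plain `HashMap<String, String>` can stand in for `MessageGroupMap` and that consumers are identified by strings. The class name `GroupAssignmentSketch` and the method `canDispatch` are hypothetical; the `lastDeliveredSequenceId` bookkeeping and the `JMSXGroupFirstForConsumer` flag from the original are left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-alone trace of the group-ownership decision (hypothetical, not the broker classes). */
final class GroupAssignmentSketch {
    // Stand-in for MessageGroupMap: groupId -> id of the owning consumer
    private final Map<String, String> groupOwners = new HashMap<>();

    /** Returns true if a message with (groupId, sequence) may be dispatched to consumerId. */
    boolean canDispatch(String consumerId, String groupId, int sequence) {
        if (groupId == null) {
            return true; // ungrouped messages may go to any consumer
        }
        if (sequence == 1) {
            // First message of a group: this consumer claims ownership unconditionally.
            groupOwners.put(groupId, consumerId);
            return true;
        }
        String owner = groupOwners.get(groupId);
        if (owner == null) {
            // No owner recorded (e.g. the group was closed): claim it.
            groupOwners.put(groupId, consumerId);
            return true;
        }
        if (!owner.equals(consumerId)) {
            return false; // the group belongs to another consumer
        }
        if (sequence < 0) {
            // Negative sequence: the owner receives this last message and the group is released.
            groupOwners.remove(groupId);
        }
        return true;
    }

    public static void main(String[] args) {
        GroupAssignmentSketch queue = new GroupAssignmentSketch();
        System.out.println(queue.canDispatch("A", "IBM", 1));  // true: A claims the group
        System.out.println(queue.canDispatch("B", "IBM", 2));  // false: A owns it
        System.out.println(queue.canDispatch("A", "IBM", -1)); // true: delivered, group closed
        System.out.println(queue.canDispatch("B", "IBM", 3));  // true: B becomes the new owner
    }
}
```

Note that the `sequence == 1` branch does not check for an existing owner, so in this sketch, as in the method above, a message with sequence 1 hands the group to whichever consumer is being considered.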
  • Original article: https://www.cnblogs.com/allenwas3/p/8671402.html