zoukankan      html  css  js  c++  java
  • ActiveMQ consumer按顺序处理消息

    http://activemq.apache.org/exclusive-consumer.html

    producer发送消息是有先后顺序的,这种顺序保持到了broker中。如果希望消息按顺序被消费掉,则应该把消息投送给单独一个consumer。如果队列只有一个consumer,那就很ok了,broker没有选择。但是,一旦唯一的consumer挂了,会造成服务不可用。因此出现了exclusive consumer,配置如下:

    new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");

    如果有2个consumer都是这样配置的,broker只会把队列消息发送给其中一个consumer,如果这个consumer挂掉了,broker会把消息推送给另外的consumer,这样就保证了按顺序消费消息。

    那么,ActiveMQ是怎样实现这种逻辑的呢?

    org.apache.activemq.broker.region.Queue中维持了一个consumer列表,分发消息的时候,会去遍历列表,在队列中靠前的consumer会优先被分发消息。

    // org.apache.activemq.broker.region.Queue
    // 该方法把消息分发给consumer,PendingList是消息列表
    private PendingList doActualDispatch(PendingList list) throws Exception {
        //消费者列表
        List<Subscription> consumers;
        consumersLock.writeLock().lock();
    
        try {
            if (this.consumers.isEmpty()) { // 消费者为空,直接返回
                // slave dispatch happens in processDispatchNotification
                return list;
            }
            consumers = new ArrayList<Subscription>(this.consumers);
        } finally {
            consumersLock.writeLock().unlock();
        }
    
        // 初始化fullConsumers
        Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
        // 遍历消息列表
        for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
            MessageReference node = iterator.next();
            Subscription target = null;
            // 遍历消费者
            for (Subscription s : consumers) {
                if (s instanceof QueueBrowserSubscription) {
                    continue;
                }
    
                if (!fullConsumers.contains(s)) {
                    if (!s.isFull()) { //消费者not full
                        //满足以下条件可以分发:
                        //1. 符合QueueDispatchSelector的规则
                        //2. 消息的group属性和消费者匹配
                        //3. 消息没有被应答
                        if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) {
                            // Dispatch it.
                            s.add(node);
                            LOG.trace("assigned {} to consumer {}", node.getMessageId(), s.getConsumerInfo().getConsumerId());
                            //从发送列表中删除,消息并不会真的删除
                            iterator.remove();
                            //设置消息的target
                            target = s;
                            break;
                        }
                    } else {
                        // no further dispatch of list to a full consumer to
                        // avoid out of order message receipt
                        fullConsumers.add(s);
                        LOG.trace("Subscription full {}", s);
                    }
                }
            }
    
            if (target == null && node.isDropped()) {
                iterator.remove();
            }
    
            // return if there are no consumers or all consumers are full
            if (target == null && consumers.size() == fullConsumers.size()) {
                return list;
            }
    
            // 在列表中调整consumer的顺序
            // 如果是exlusive consumer,则不会进分支,那么exlusive consumer的顺序不会变
            // 一旦进入这个分支,当前的consumer会被放到最后
            // If it got dispatched, rotate the consumer list to get round robin
            // distribution.
            if (target != null && !strictOrderDispatch && consumers.size() > 1
                    && !dispatchSelector.isExclusiveConsumer(target)) {
                consumersLock.writeLock().lock();
                try {
                    // 先从this.consumers中删除当前consumer
                    if (removeFromConsumerList(target)) {
                        // 然后把当前consumer添加到this.consumers中
                        addToConsumerList(target);
                        consumers = new ArrayList<Subscription>(this.consumers);
                    }
                } finally {
                    consumersLock.writeLock().unlock();
                }
            }
        }
    
        return list;
    }
    
    
    private void addToConsumerList(Subscription sub) {
        if (useConsumerPriority) {
            consumers.add(sub);
            Collections.sort(consumers, orderedCompare);
        } else {
            consumers.add(sub);
        }
    }

    QueueDispatchSelector保证:如果配置了 exclusive consumer,一定会把消息分发给 exclusive consumer。

    // org.apache.activemq.broker.region.QueueDispatchSelector
    public boolean canSelect(Subscription subscription,
            MessageReference m) throws Exception {
       
        boolean result =  super.canDispatch(subscription, m);
        if (result && !subscription.isBrowser()) {
            // 没有配置exclusiveConsumer,或者exclusiveConsumer就是当前消费者
            result = exclusiveConsumer == null || exclusiveConsumer == subscription;
        }
        return result;
    }

    在添加消费者的时候,设置exclusive consumer:

    //org.apache.activemq.broker.region.Queue
    public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
        LOG.debug("{} add sub: {}, dequeues: {}, dispatched: {}, inflight: {}", new Object[]{ getActiveMQDestination().getQualifiedName(), getDestinationStatistics().getDequeues().getCount(), getDestinationStatistics().getDispatched().getCount(), getDestinationStatistics().getInflight().getCount() });
    
        super.addSubscription(context, sub);
        // synchronize with dispatch method so that no new messages are sent
        // while setting up a subscription. avoid out of order messages,
        // duplicates, etc.
        pagedInPendingDispatchLock.writeLock().lock();
        try {
    
            sub.add(context, this);
    
            // needs to be synchronized - so no contention with dispatching
            // consumersLock.
            consumersLock.writeLock().lock();
            try {
                // set a flag if this is a first consumer
                if (consumers.size() == 0) {
                    firstConsumer = true;
                    if (consumersBeforeDispatchStarts != 0) {
                        consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1);
                    }
                } else {
                    if (consumersBeforeStartsLatch != null) {
                        consumersBeforeStartsLatch.countDown();
                    }
                }
    
                addToConsumerList(sub);
                //设置 exclusive consumer
                if (sub.getConsumerInfo().isExclusive() || isAllConsumersExclusiveByDefault()) {
                    Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
                    if (exclusiveConsumer == null) {
                        // exclusiveConsumer为空
                        exclusiveConsumer = sub;
                    } else if (sub.getConsumerInfo().getPriority() == Byte.MAX_VALUE ||
                        sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()) {
                        //如果当前订阅者的优先级比已有的exclusiveConsumer高
                        exclusiveConsumer = sub;
                    }
                    dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
                }
            } finally {
                consumersLock.writeLock().unlock();
            }
    
            if (sub instanceof QueueBrowserSubscription) {
                // tee up for dispatch in next iterate
                QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
                BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription);
                browserDispatches.add(browserDispatch);
            }
    
            if (!this.optimizedDispatch) {
                wakeup();
            }
        } finally {
            pagedInPendingDispatchLock.writeLock().unlock();
        }
        if (this.optimizedDispatch) {
            // Outside of dispatchLock() to maintain the lock hierarchy of
            // iteratingMutex -> dispatchLock. - see
            // https://issues.apache.org/activemq/browse/AMQ-1878
            wakeup();
        }
    }
  • 相关阅读:
    【[Offer收割]编程练习赛12 B】一面砖墙
    【[Offer收割]编程练习赛12 A】歌德巴赫猜想
    【codeforces 779E】Bitwise Formula
    Java Web整合开发(85)
    数字
    T2602 最短路径问题 codevs
    P3378 堆【模板】 洛谷
    T1013 求先序排列 codevs
    P1717 钓鱼 洛谷
    P2085 最小函数值 洛谷
  • 原文地址:https://www.cnblogs.com/allenwas3/p/8652484.html
Copyright © 2011-2022 走看看