zoukankan      html  css  js  c++  java
  • ActiveMQ 中 consumer 的优先级,message 的优先级

    http://activemq.apache.org/consumer-priority.html consumer 优先级

    http://activemq.apache.org/activemq-message-properties.html 消息优先级

    1、设置 consumer 的优先级:

    queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10");
    consumer = session.createConsumer(queue);

    priority 的取值从0到127。broker 按照 consumer 的优先级给 queue 的 consumers 排序,首先把消息分发给优先级最高的 consumer。一旦该 consumer 的 prefetch buffer 满了,broker 就把消息分发给优先级次高的,prefetch buffer 不满的 consumer。

    // org.apache.activemq.broker.region.Queue
    // consumer priority 的比较器
    private final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
    
        @Override
        public int compare(Subscription s1, Subscription s2) {
            // We want the list sorted in descending order
            // 倒序,即数值大的优先级高
            int val = s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority();
            if (val == 0 && messageGroupOwners != null) {
                // then ascending order of assigned message groups to favour less loaded consumers
                // Long.compare in jdk7
                long x = s1.getConsumerInfo().getLastDeliveredSequenceId();
                long y = s2.getConsumerInfo().getLastDeliveredSequenceId();
                val = (x < y) ? -1 : ((x == y) ? 0 : 1);
            }
            return val;
        }
    };
    
    // 添加 consumer 的时候,会触发排序
    // 在 consumers 列表中,靠前的 consumer,先分发消息
    private void addToConsumerList(Subscription sub) {
        if (useConsumerPriority) {
            consumers.add(sub);
            Collections.sort(consumers, orderedCompare);
        } else {
            consumers.add(sub);
        }
    }

    2、设置 message 的优先级需要在 broker 端和 producer 端配置:

    2.1 在 broker 端设置 TEST.BAT 队列为 prioritizedMessages = "true"

    <policyEntry queue="TEST.BAT" prioritizedMessages="true" producerFlowControl="true" memoryLimit="1mb">
        <deadLetterStrategy>
            <individualDeadLetterStrategy queuePrefix="TEST"/>
        </deadLetterStrategy>
        <pendingQueuePolicy>
            <storeCursor/>
        </pendingQueuePolicy>
    </policyEntry>

    2.2 producer 发送消息时,设置 message 的优先级

    TextMessage message = session.createTextMessage(text);
    producer.send(destination, message, DeliveryMode.NON_PERSISTENT, 1, 0);

    设置 message 的优先级,需要调用:

    void javax.jms.MessageProducer.send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
    throws JMSException

    而不能这么写:

    TextMessage message = session.createTextMessage(text);
    message.setJMSPriority(0);

    初步看是 ActiveMQ 的 bug。消息的 priority 值,从0到9。消息配置了优先级之后,消息存放在 PrioritizedPendingList 中。

    // 省略部分代码
    private class PrioritizedPendingListIterator implements Iterator<MessageReference> {
        private int index = 0;
        private int currentIndex = 0;
        List<PendingNode> list = new ArrayList<PendingNode>(size());
    
        PrioritizedPendingListIterator() {
            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
                // priority 值大的优先级高
                OrderedPendingList orderedPendingList = lists[i];
                if (!orderedPendingList.isEmpty()) {
                    list.addAll(orderedPendingList.getAsList());
                }
            }
        }
    }
  • 相关阅读:
    mbedtls安装与入门【转】
    SpringAop代理对象调用过程(八)
    SpringAOP理解-代理对象创建(七)
    NIO与零拷贝
    NIO实现群聊系统
    SpringAOP概述(六)
    NIO简介以及三大组件(BufferChannelSelector)基本使用
    BIO基本介绍以及使用
    Netty简介
    Spring循环依赖解决(五)
  • 原文地址:https://www.cnblogs.com/allenwas3/p/8854319.html
Copyright © 2011-2022 走看看