zoukankan      html  css  js  c++  java
  • ActiveMQ 到底是推还是拉?

    http://activemq.apache.org/destination-options.html 

    1. consumer 的配置参数如下图:

     配置consumer的示例:

    public void run() {
        try {
            // Create a ConnectionFactory
            ActiveMQConnectionFactory connectionFactory =
                    new ActiveMQConnectionFactory("tcp://localhost:61616");
    
            // Create a Connection
            Connection connection = connectionFactory.createConnection();
            connection.start();
            connection.setExceptionListener(this);
            // Create a Session
            ActiveMQSession session = 
              (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create the destination (Topic or Queue) // 此时加入参数 ActiveMQQueue destination =
              (ActiveMQQueue) session.createQueue("TEST.FOO?consumer.prefetchSize=10"); // Create a MessageConsumer from the Session to the Topic or Queue ActiveMQMessageConsumer consumer =
              (ActiveMQMessageConsumer) session.createConsumer(destination);
    // 打印出prefetchSize参数值 System.out.println("prefetchSize=" + consumer.getPrefetchNumber()); // Wait for a message Message message = consumer.receive(); if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println("Received: " + text); } else { System.out.println("Received: " + message); } consumer.close(); session.close(); connection.close(); } catch (Exception e) { System.out.println("Caught: " + e); e.printStackTrace(); } }

    在创建Queue的时候,配置以url形式跟在队列名后面:session.createQueue("TEST.FOO?consumer.prefetchSize=10")

    consumer的prefetchSize参数默认为1000。

    consumer 有推和拉2种方式获取消息:当 prefetchSize = 0 时,pull;当 prefetchSize > 0 时,push。

    2. broker分发消息的逻辑在org.apache.activemq.broker.region.Queue.doActualDispatch方法中:

    private PendingList doActualDispatch(PendingList list) throws Exception {
        List<Subscription> consumers;
        consumersLock.writeLock().lock();
    
        try {
            if (this.consumers.isEmpty()) {
                // slave dispatch happens in processDispatchNotification
                return list;
            }
            consumers = new ArrayList<Subscription>(this.consumers);
        } finally {
            consumersLock.writeLock().unlock();
        }
    
        Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
    
        for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
            MessageReference node = iterator.next();
            Subscription target = null;
            for (Subscription s : consumers) {
                if (s instanceof QueueBrowserSubscription) {
                    continue;
                }
                if (!fullConsumers.contains(s)) {
                    if (!s.isFull()) {
                        if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) {
                            // Dispatch it.
                            s.add(node);
                            LOG.trace("assigned {} to consumer {}", node.getMessageId(), s.getConsumerInfo().getConsumerId());
                            iterator.remove();
                            target = s;
                            break;
                        }
                    } else {
                        // no further dispatch of list to a full consumer to
                        // avoid out of order message receipt
                        fullConsumers.add(s);
                        LOG.trace("Subscription full {}", s);
                    }
                }
            }
    
            if (target == null && node.isDropped()) {
                iterator.remove();
            }
    
            // return if there are no consumers or all consumers are full
            if (target == null && consumers.size() == fullConsumers.size()) {
                return list;
            }
    
            // If it got dispatched, rotate the consumer list to get round robin
            // distribution.
            if (target != null && !strictOrderDispatch && consumers.size() > 1
                    && !dispatchSelector.isExclusiveConsumer(target)) {
                consumersLock.writeLock().lock();
                try {
                    if (removeFromConsumerList(target)) {
                        addToConsumerList(target);
                        consumers = new ArrayList<Subscription>(this.consumers);
                    }
                } finally {
                    consumersLock.writeLock().unlock();
                }
            }
        }
    
        return list;
    }

    2层for循环,外面是消息,里面是consumer,只要consumer没有饱和,broker一直会给consumer分发消息。

    对于一个consumer而言,未确认的消息数大于等于prefetchSize,则认为该consumer是饱的

    // PrefetchSubscription
    public boolean isFull() {
        // 未确认的消息数 = 已发送给该consumer的消息数 - 收到确认的消息数 
        return dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
    }

     因为 consumer 的 prefetchSize 参数默认为1000,所以 activeMQ 默认是推。而且是一条一条地推。

    3. consumer获取消息有同步和异步两种方式:consumer.receive() 或 consumer.setMessageListener(listener)

    对于 receive 方式,如果prefetchSize = 0 并且本地没有缓存消息,则发送一个pull 命令给broker;

    否则,则从本地缓存中取消息。

    // ActiveMQMessageConsumer
    public Message receive() throws JMSException {
        checkClosed();
        checkMessageListener();
    
        sendPullCommand(0);
        MessageDispatch md = dequeue(-1);
        if (md == null) {
            return null;
        }
    
        beforeMessageIsConsumed(md);
        afterMessageIsConsumed(md, false);
    
        return createActiveMQMessage(md);
    }
    
    protected void sendPullCommand(long timeout) throws JMSException {
        clearDeliveredList();
        if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
            MessagePull messagePull = new MessagePull();
            messagePull.configure(info);
            messagePull.setTimeout(timeout);
            session.asyncSendPacket(messagePull);
        }
    }

    consumer 本地消息缓存在

    // These are the messages waiting to be delivered to the client
    protected final MessageDispatchChannel unconsumedMessages;

    消息进入缓存有2条路线,调用栈分别如下:

    (1)

     (2)

     consumer.setMessageListener 异步获取消息的调用栈如下:

  • 相关阅读:
    aaronyang的百度地图API之LBS云与.NET开发 Javascript API 2.0【把数据存到LBS云2/2】
    aaronyang的百度地图API之LBS云与.NET开发 Javascript API 2.0【把数据存到LBS云1/2】
    aaronyang的百度地图API之LBS云与.NET开发 Javascript API 2.0【基本地图的操作】
    aaronyang的百度地图API之LBS云 笔记[位置数据 geotable]
    aaronyang的百度地图API之LBS云 笔记[开发准备]
    windows7 sqlserver2012 无法写入受保护的内存 解决办法
    [AaronYang风格]微软Unity2.X系统学习笔记,记录
    Javascript 原生Cookie使用用法
    注册asp.net 4.0 到iis
    茗洋Easy UI 1.3.5 部分问题解决系列专题[自定义alert关闭时间,自动关]
  • 原文地址:https://www.cnblogs.com/allenwas3/p/8574470.html
Copyright © 2011-2022 走看看