  • Will an ActiveMQ producer that keeps sending messages exhaust the broker's memory?

    http://activemq.apache.org/my-producer-blocks.html answers this question:

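    ActiveMQ 5.x supports Message Cursors, which by default move messages out of memory and onto disk. A problem therefore arises only once the disk space allocated to the message store is used up. The amount of disk space allocated is configurable.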

    http://activemq.apache.org/message-cursors.html has a diagram describing the store based cursor:

    The elements in that diagram correspond to the following data structures:

    public class Queue extends BaseDestination implements Task, UsageListener {
        // actual runtime type: StoreQueueCursor
        protected PendingMessageCursor messages;
    }
    public class StoreQueueCursor extends AbstractPendingMessageCursor {
    
        private static final Logger LOG = LoggerFactory.getLogger(StoreQueueCursor.class);
        private final Broker broker;
        private int pendingCount;
        private final Queue queue;
        // non-persistent pending cursor; actual runtime type is FilePendingMessageCursor
        private PendingMessageCursor nonPersistent;
        // persistent pending cursor; actual runtime type is QueueStorePrefetch
        private final QueueStorePrefetch persistent;
        private boolean started;
        private PendingMessageCursor currentCursor;
    }
    class QueueStorePrefetch extends AbstractStoreCursor {
        private static final Logger LOG = LoggerFactory.getLogger(QueueStorePrefetch.class);
        // Message Store
        private final MessageStore store;
        private final Broker broker;
    }

    While debugging, the message store's type was KahaDBTransactionStore$1.

    The broker's call stack after a producer sends a message:

    org.apache.activemq.broker.region.Queue.doMessageSend

    void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException,
            Exception {
        final ConnectionContext context = producerExchange.getConnectionContext();
        ListenableFuture<Object> result = null;
        boolean needsOrderingWithTransactions = context.isInTransaction();
    
        producerExchange.incrementSend();
        checkUsage(context, producerExchange, message);
        sendLock.lockInterruptibly();
        try {
            // store's type is KahaDBTransactionStore$1
            // persistent messages are written to KahaDB first, i.e. the message store in the diagram
            if (store != null && message.isPersistent()) {
                try {
                    message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
                    if (messages.isCacheEnabled()) {
                        result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
                        result.addListener(new PendingMarshalUsageTracker(message));
                    } else {
                        store.addMessage(context, message);
                    }
                    if (isReduceMemoryFootprint()) {
                        message.clearMarshalledState();
                    }
                } catch (Exception e) {
                    // we may have a store in inconsistent state, so reset the cursor
                    // before restarting normal broker operations
                    resetNeeded = true;
                    throw e;
                }
            }
            // did a transaction commit beat us to the index?
            synchronized (orderIndexUpdates) {
                needsOrderingWithTransactions |= !orderIndexUpdates.isEmpty();
            }
            if (needsOrderingWithTransactions ) {
                // If this is a transacted message.. increase the usage now so that
                // a big TX does not blow up
                // our memory. This increment is decremented once the tx finishes..
                message.incrementReferenceCount();
    
                registerSendSync(message, context);
            } else { // ordinary non-transactional message: add it to the pending list
                // Add to the pending list, this takes care of incrementing the
                // usage manager.
                sendMessage(message);
            }
        } finally {
            sendLock.unlock();
        }
        if (!needsOrderingWithTransactions) {
            messageSent(context, message);
        }
        if (result != null && message.isResponseRequired() && !result.isCancelled()) {
            try {
                result.get();
            } catch (CancellationException e) {
                // ignore - the task has been cancelled if the message
                // has already been deleted
            }
        }
    }
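
    The control flow above (write a persistent message to the store first, add it to the pending list while holding the send lock, then wait for the store write outside the lock) can be modeled with JDK classes alone. The following is a minimal sketch, not ActiveMQ code: `SendPathSketch`, `Msg`, and `asyncStore` are hypothetical names, and a `CompletableFuture` stands in for the `ListenableFuture` returned by `asyncAddQueueMessage`. Transactions and usage checks are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

// Simplified model of the send path; every name here is hypothetical.
class SendPathSketch {
    static final class Msg {
        final String id;
        final boolean persistent;
        Msg(String id, boolean persistent) {
            this.id = id;
            this.persistent = persistent;
        }
    }

    final List<String> store = new ArrayList<>();   // stands in for the message store
    final Deque<Msg> pending = new ArrayDeque<>();  // stands in for the pending cursor
    private final ReentrantLock sendLock = new ReentrantLock();

    // Stand-in for the asynchronous store write.
    private CompletableFuture<Void> asyncStore(Msg m) {
        return CompletableFuture.runAsync(() -> {
            synchronized (store) {
                store.add(m.id);
            }
        });
    }

    void send(Msg m) throws Exception {
        CompletableFuture<Void> result = null;
        sendLock.lockInterruptibly();
        try {
            if (m.persistent) {
                result = asyncStore(m);  // persistent messages go to the store first
            }
            pending.addLast(m);          // then every message joins the pending list
        } finally {
            sendLock.unlock();
        }
        if (result != null) {
            result.get();                // wait for the store write outside the lock
        }
    }

    public static void main(String[] args) throws Exception {
        SendPathSketch q = new SendPathSketch();
        q.send(new Msg("m1", true));
        q.send(new Msg("m2", false));
        System.out.println("stored=" + q.store + " pending=" + q.pending.size());
    }
}
```

    Waiting on the future only after the lock is released keeps a slow disk write from blocking other senders, which appears to be the reason `result.get()` sits after the `finally` block in the original method.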

    StoreQueueCursor.addMessageLast

    public synchronized void addMessageLast(MessageReference node) throws Exception {
        if (node != null) {
            Message msg = node.getMessage();
            if (started) {
                pendingCount++;
                if (!msg.isPersistent()) {
                    // corresponds to the non-persistent pending cursor in the diagram
                    nonPersistent.addMessageLast(node);
                }
            }
            if (msg.isPersistent()) {
                // corresponds to the persistent pending cursor in the diagram
                persistent.addMessageLast(node);
            }
        }
    }
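
    The routing performed by `addMessageLast` can be reproduced in a few lines. This is a sketch under simplifying assumptions: `RoutingCursorSketch` is a hypothetical class, messages are plain strings, and two in-memory deques stand in for the real sub-cursors.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of a cursor that delegates to two sub-cursors; names are hypothetical.
class RoutingCursorSketch {
    final Deque<String> nonPersistent = new ArrayDeque<>();
    final Deque<String> persistent = new ArrayDeque<>();
    int pendingCount;
    boolean started;

    synchronized void addMessageLast(String id, boolean isPersistent) {
        if (started) {
            pendingCount++;
            if (!isPersistent) {
                nonPersistent.addLast(id);  // non-persistent pending cursor
            }
        }
        if (isPersistent) {
            persistent.addLast(id);         // persistent pending cursor, backed by the message store
        }
    }

    public static void main(String[] args) {
        RoutingCursorSketch c = new RoutingCursorSketch();
        c.started = true;
        c.addMessageLast("a", true);
        c.addMessageLast("b", false);
        c.addMessageLast("c", false);
        System.out.println(c.persistent.size() + " persistent, "
                + c.nonPersistent.size() + " non-persistent, pendingCount=" + c.pendingCount);
    }
}
```

    As in the original method, a persistent message is handed to the persistent cursor even when the cursor has not been started, while a non-persistent message is accepted only after start.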

    That covers most of the data flow in the store based cursor diagram. What remains is the flow from the non-persistent pending cursor to the temporary files.

    //org.apache.activemq.broker.region.cursors.FilePendingMessageCursor
    @Override
    public synchronized void addMessageLast(MessageReference node) throws Exception {
        tryAddMessageLast(node, 0);
    }
    
    @Override
    public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
        if (!node.isExpired()) {
            try {
                regionDestination = (Destination) node.getMessage().getRegionDestination();
                if (isDiskListEmpty()) {
                    if (hasSpace() || this.store == null) {
                        memoryList.addMessageLast(node);
                        node.incrementReferenceCount();
                        setCacheEnabled(true);
                        return true;
                    }
                }
                if (!hasSpace()) {
                    if (isDiskListEmpty()) {
                        expireOldMessages();
                        if (hasSpace()) {
                            memoryList.addMessageLast(node);
                            node.incrementReferenceCount();
                            return true;
                        } else {
                            flushToDisk();
                        }
                    }
                }
                if (systemUsage.getTempUsage().waitForSpace(maxWaitTime)) {
                    ByteSequence bs = getByteSequence(node.getMessage());
                    // write the message to disk
                    getDiskList().addLast(node.getMessageId().toString(), bs);
                    return true;
                }
                return false;
    
            } catch (Exception e) {
                LOG.error("Caught an Exception adding a message: {} first to FilePendingMessageCursor ", node, e);
                throw new RuntimeException(e);
            }
        } else {
            discardExpiredMessage(node);
        }
        //message expired
        return true;
    }
    
    //org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor
    // Returns true while memory usage is below the high-water mark (70% by default)
    public boolean hasSpace() {
        return systemUsage != null ? (!systemUsage.getMemoryUsage().isFull(memoryUsageHighWaterMark)) : true;
    }
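
    The memory-first, spill-to-disk behavior of `tryAddMessageLast` can be illustrated with a small model. This is a sketch, not the ActiveMQ implementation: `SpillCursorSketch` is a hypothetical class, a plain list stands in for the temporary file store, message sizes are passed in explicitly, and message expiry and waiting for temp space are left out. It also assumes that "full" means the integer usage percentage has reached the high-water mark.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model of a memory-first cursor that spills to disk; names are hypothetical.
class SpillCursorSketch {
    final Deque<String> memoryList = new ArrayDeque<>();
    final List<String> diskList = new ArrayList<>();  // stands in for the temporary file store
    private final long memoryLimit;
    private final int highWaterMark;                   // percent, 70 in the source
    private long memoryUsed;

    SpillCursorSketch(long memoryLimit, int highWaterMark) {
        this.memoryLimit = memoryLimit;
        this.highWaterMark = highWaterMark;
    }

    // True while the usage percentage is below the high-water mark.
    boolean hasSpace() {
        return memoryUsed * 100 / memoryLimit < highWaterMark;
    }

    // Moves the whole in-memory backlog to disk, oldest message first.
    void flushToDisk() {
        diskList.addAll(memoryList);
        memoryList.clear();
        memoryUsed = 0;
    }

    void addMessageLast(String msg, long size) {
        if (diskList.isEmpty() && hasSpace()) {
            memoryList.addLast(msg);  // fast path: keep the message in memory
            memoryUsed += size;
            return;
        }
        if (diskList.isEmpty()) {
            flushToDisk();            // first overflow: move the backlog out of memory
        }
        diskList.add(msg);            // once spilled, new messages go straight to disk
    }

    public static void main(String[] args) {
        SpillCursorSketch c = new SpillCursorSketch(100, 70);
        for (int i = 1; i <= 4; i++) {
            c.addMessageLast("m" + i, 30);
        }
        System.out.println("memory=" + c.memoryList.size() + " disk=" + c.diskList);
    }
}
```

    With a limit of 100 and messages of size 30, the first three messages fit in memory (usage 0%, 30%, 60% at the time of each check). The fourth sees 90% usage, so the backlog is flushed and all four messages end up on disk in their original order.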

    A flush is also triggered when memory usage changes:

    FilePendingMessageCursor.onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage)

    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
        // once memory usage reaches 70%, messages are flushed to disk; hasSpace() checks the same threshold
        if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
            synchronized (this) {
                if (!flushRequired && size() != 0) {
                    flushRequired =true;
                    if (!iterating) {
                        expireOldMessages();
                        if (!hasSpace()) {
                            flushToDisk();
                            flushRequired = false;
                        }
                    }
                }
            }
        }
    }
    public abstract class AbstractPendingMessageCursor implements PendingMessageCursor {
        protected int memoryUsageHighWaterMark = 70;
    }
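
    The listener mechanism behind `onUsageChanged` can be sketched as follows. All names (`UsageListenerSketch`, `Usage`, `Listener`, `demo`) are hypothetical, the tracker is a deliberately small stand-in for ActiveMQ's usage classes, and the callback only records the percentage where the real cursor would flush to disk.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a usage tracker with change listeners; names are hypothetical.
class UsageListenerSketch {
    interface Listener {
        void onUsageChanged(int oldPercent, int newPercent);
    }

    static final class Usage {
        private final long limit;
        private long used;
        private final List<Listener> listeners = new ArrayList<>();

        Usage(long limit) {
            this.limit = limit;
        }

        void addListener(Listener l) {
            listeners.add(l);
        }

        int percent() {
            return (int) (used * 100 / limit);
        }

        // Notifies listeners only when the integer percentage actually changes.
        void increase(long bytes) {
            int oldPercent = percent();
            used += bytes;
            int newPercent = percent();
            if (oldPercent != newPercent) {
                for (Listener l : listeners) {
                    l.onUsageChanged(oldPercent, newPercent);
                }
            }
        }
    }

    // Runs a small scenario and returns the percentages at which a flush was requested.
    static List<Integer> demo() {
        final int highWaterMark = 70;
        List<Integer> flushes = new ArrayList<>();
        Usage usage = new Usage(1000);
        usage.addListener((oldPercent, newPercent) -> {
            if (newPercent >= highWaterMark) {
                flushes.add(newPercent);  // the real cursor would flush to disk here
            }
        });
        usage.increase(500);  // 50%: below the mark, nothing happens
        usage.increase(250);  // 75%: at or above the mark, flush requested
        return flushes;
    }

    public static void main(String[] args) {
        System.out.println("flush requested at: " + demo());
    }
}
```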
  • Original post: https://www.cnblogs.com/allenwas3/p/8601704.html