zoukankan      html  css  js  c++  java
  • ActiveMQ异步分发消息

    org.apache.activemq.ActiveMQConnection 类中有个参数:

    protected boolean dispatchAsync=true;

    这个参数的含义到底是什么?

    使用这个参数的调用栈如下:

    org.apache.activemq.broker.region.PrefetchSubscription.dispatch

    protected boolean dispatch(final MessageReference node) throws IOException {
        final Message message = node.getMessage();
        if (message == null) {
            return false;
        }
    
        okForAckAsDispatchDone.countDown();
    
        // No reentrant lock - Patch needed to IndirectMessageReference on method lock
        MessageDispatch md = createMessageDispatch(node, message);
        // NULL messages don't count... they don't get Acked.
        if (node != QueueMessageReference.NULL_MESSAGE) {
            dispatchCounter++;
            dispatched.add(node);
        } else {
            while (true) {
                int currentExtension = prefetchExtension.get();
                int newExtension = Math.max(0, currentExtension - 1);
                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
                    break;
                }
            }
        }
        if (info.isDispatchAsync()) {
            md.setTransmitCallback(new TransmitCallback() {
    
                @Override
                public void onSuccess() {
                    // Since the message gets queued up in async dispatch, we don't want to
                    // decrease the reference count until it gets put on the wire.
                    onDispatch(node, message);
                }
    
                @Override
                public void onFailure() {
                    Destination nodeDest = (Destination) node.getRegionDestination();
                    if (nodeDest != null) {
                        if (node != QueueMessageReference.NULL_MESSAGE) {
                            nodeDest.getDestinationStatistics().getDispatched().increment();
                            nodeDest.getDestinationStatistics().getInflight().increment();
                            LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), dispatchCounter, dispatched.size() });
                        }
                    }
                }
            });
            context.getConnection().dispatchAsync(md);
        } else {
            context.getConnection().dispatchSync(md);
            onDispatch(node, message);
        }
        return true;
    }

    异步和同步分别对应 TransportConnection 类的2个方法:dispatchAsync,dispatchSync

    先分析同步代码:

    public void dispatchSync(Command message) {
        try {
            processDispatch(message);
        } catch (IOException e) {
            serviceExceptionAsync(e);
        }
    }

    很干脆,直接调用 processDispatch 方法。

    再分析异步发送:

    public void dispatchAsync(Command message) {
        if (!stopping.get()) {
            if (taskRunner == null) {
                dispatchSync(message);
            } else {
                synchronized (dispatchQueue) {
                    dispatchQueue.add(message);
                }
                try {
                    taskRunner.wakeup();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        } else {
            if (message.isMessageDispatch()) {
                MessageDispatch md = (MessageDispatch) message;
                TransmitCallback sub = md.getTransmitCallback();
                broker.postProcessDispatch(md);
                if (sub != null) {
                    sub.onFailure();
                }
            }
        }
    }

    先把消息加入到dispatchQueue中,然后唤醒taskRunner。

    taskRunner线程的调用栈如下:

    public boolean iterate() {
        try {
            if (pendingStop || stopping.get()) {
                if (dispatchStopped.compareAndSet(false, true)) {
                    if (transportException.get() == null) {
                        try {
                            dispatch(new ShutdownInfo());
                        } catch (Throwable ignore) {
                        }
                    }
                    dispatchStoppedLatch.countDown();
                }
                return false;
            }
            if (!dispatchStopped.get()) {
                Command command = null;
                synchronized (dispatchQueue) {
                    if (dispatchQueue.isEmpty()) {
                        return false;
                    }
                    command = dispatchQueue.remove(0);
                }
                processDispatch(command);
                return true;
            }
            return false;
        } catch (IOException e) {
            if (dispatchStopped.compareAndSet(false, true)) {
                dispatchStoppedLatch.countDown();
            }
            serviceExceptionAsync(e);
            return false;
        }
    }

    最终调用的也是 processDispatch 方法。

  • 相关阅读:
    BZOJ 3668: [Noi2014]起床困难综合症【贪心】
    浅谈错排公式的推导及应用
    Python爬虫笔记(一):爬虫基本入门
    机器理解大数据秘密:聚类算法深度剖析
    想了解概率图模型?你要先理解图论的基本定义与形式
    MATLAB命令大全+注释小结
    【批处理学习笔记】第二十九课:ASCII码
    【批处理学习笔记】第二十八课:声音和控制
    【批处理学习笔记】第二十七课:视窗
    【批处理学习笔记】第二十六课:返回值
  • 原文地址:https://www.cnblogs.com/allenwas3/p/8650689.html
Copyright © 2011-2022 走看看