zoukankan      html  css  js  c++  java
  • ActiveMQ topic 普通订阅和持久订阅

    直观的结果:当生产者向 topic 发送消息,

    1. 若不存在持久订阅者和在线的普通订阅者,这个消息不会保存,当普通订阅者上线后,它是收不到消息的。

    2. 若存在离线的持久订阅者,broker 会为该持久订阅者保存消息,当该持久订阅者上线后,会收到消息。

    本质:producer 发送消息给 topic,broker 接收消息并且分发给 consumers。consumers 包括持久订阅者和在线的普通订阅者,对于持久订阅者,broker 把消息添加到它的 message cursor 中;对于普通订阅者,broker 直接分发消息。

    如果,1个 topic 有2个持久订阅者,并且这2个持久订阅者都不在线,这时 producer 向 topic 发送1条消息,则 broker 会保存2条消息。因此,如果1个 topic 有很多不在线的持久订阅者,会导致 broker 消耗过多存储。

    对于持久化的消息,这很好验证:为方便查看消息,将 broker 持久化方式配置为jdbc,则可以在 ACTIVEMQ_MSGS 表中看到持久化消息。

    持久订阅者1:

    new ActiveMQConnectionFactory("tcp://localhost:61616?jms.clientID=10086");
    ...
    TopicSubscriber consumer = session.createDurableSubscriber(destination, "subscriber_zhang");

    持久订阅者2:

    new ActiveMQConnectionFactory("tcp://localhost:61616?jms.clientID=10087");
    ...
    TopicSubscriber consumer = session.createDurableSubscriber(destination, "subscriber_zhang");

    对于持久消息,验证 broker 为每个持久订阅者保存1条消息:
    1. 启动 broker;
    2. 分别启动2个持久订阅者,然后关闭它们,这样 broker 有了2个离线的持久订阅者;
    3. 启动 producer 向 topic 发送1条消息;
    4. 查看 ACTIVEMQ_MSGS 表

    对于非持久消息,直接跟代码了,这里不说明。

    下图是 Topic 接收消息并分发的调用栈:

    // org.apache.activemq.broker.region.policy.SimpleDispatchPolicy
    // 分发策略很简单,就是遍历consumers
    public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List<Subscription> consumers)
            throws Exception {
        int count = 0;
        for (Subscription sub : consumers) {
            // Don't deliver to browsers
            if (sub.getConsumerInfo().isBrowser()) {
                continue;
            }
            // Only dispatch to interested subscriptions
            if (!sub.matches(node, msgContext)) {
                sub.unmatched(node);
                continue;
            }
    
            //持久化订阅是 DurableTopicSubscription
            //普通订阅是 TopicSubscription
            sub.add(node);
            count++;
        }
    
        return count > 0;
    }

    持久化订阅:

    // org.apache.activemq.broker.region.DurableTopicSubscription
    public void add(MessageReference node) throws Exception {
        if (!active.get() && !keepDurableSubsActive) {
            return;
        }
        // 调用 PrefetchSubscription.add
        super.add(node);
    }
    
    //  org.apache.activemq.broker.region.PrefetchSubscription
    public void add(MessageReference node) throws Exception {
        synchronized (pendingLock) {
            // The destination may have just been removed...
            if( !destinations.contains(node.getRegionDestination()) && node!=QueueMessageReference.NULL_MESSAGE) {
                // perhaps we should inform the caller that we are no longer valid to dispatch to?
                return;
            }
    
            // Don't increment for the pullTimeout control message.
            if (!node.equals(QueueMessageReference.NULL_MESSAGE)) {
                enqueueCounter++;
            }
            //首先加入到 message cursor 中,pending 类型为 StoreDurableSubscriberCursor
            pending.addMessageLast(node);
        }
        dispatchPending();
    }

    持久化订阅者上线后,也会触发消息分发动作即 dispatchPending,调用栈如下图:

    持久化订阅者使用的 message cursor 是 StoreDurableSubscriberCursor。

    普通订阅:

    org.apache.activemq.broker.region.TopicSubscription.add(MessageReference node)

  • 相关阅读:
    start tag, end tag issues in IE7, particularly in xslt transformation
    用SandCastle为注释生成chm文档
    Firebug
    架构的重点
    Linux Shell常用技巧(十) 管道组合
    Linux JDK升级
    Linux Shell常用技巧(十二) Shell编程
    Packet Tracer 5.0实验(一) 交换机的基本配置与管理
    Linux Shell常用技巧(六) sort uniq tar split
    Linux Shell常用技巧(二) grep
  • 原文地址:https://www.cnblogs.com/allenwas3/p/8807641.html
Copyright © 2011-2022 走看看