zoukankan      html  css  js  c++  java
  • ActiveMQ topic 普通订阅和持久订阅

    直观的结果:当生产者向 topic 发送消息,

    1. 若不存在持久订阅者和在线的普通订阅者,这个消息不会保存,当普通订阅者上线后,它是收不到消息的。

    2. 若存在离线的持久订阅者,broker 会为该持久订阅者保存消息,当该持久订阅者上线后,会收到消息。

    本质:producer 发送消息给 topic,broker 接收消息并且分发给 consumers。consumers 包括持久订阅者和在线的普通订阅者,对于持久订阅者,broker 把消息添加到它的 message cursor 中;对于普通订阅者,broker 直接分发消息。

    如果,1个 topic 有2个持久订阅者,并且这2个持久订阅者都不在线,这时 producer 向 topic 发送1条消息,则 broker 会保存2条消息。因此,如果1个 topic 有很多不在线的持久订阅者,会导致 broker 消耗过多存储。

    对于持久化的消息,这很好验证:为方便查看消息,将 broker 持久化方式配置为jdbc,则可以在 ACTIVEMQ_MSGS 表中看到持久化消息。

    持久订阅者1:

    new ActiveMQConnectionFactory("tcp://localhost:61616?jms.clientID=10086");
    ...
    TopicSubscriber consumer = session.createDurableSubscriber(destination, "subscriber_zhang");

    持久订阅者2:

    new ActiveMQConnectionFactory("tcp://localhost:61616?jms.clientID=10087");
    ...
    TopicSubscriber consumer = session.createDurableSubscriber(destination, "subscriber_zhang");

    对于持久消息,验证 broker 为每个持久订阅者保存1条消息:
    1. 启动 broker;
    2. 分别启动2个持久订阅者,然后关闭它们,这样 broker 有了2个离线的持久订阅者;
    3. 启动 producer 向 topic 发送1条消息;
    4. 查看 ACTIVEMQ_MSGS 表

    对于非持久消息,直接跟代码了,这里不说明。

    下图是 Topic 接收消息并分发的调用栈:

    // org.apache.activemq.broker.region.policy.SimpleDispatchPolicy
    // 分发策略很简单,就是遍历consumers
    public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List<Subscription> consumers)
            throws Exception {
        int count = 0;
        for (Subscription sub : consumers) {
            // Don't deliver to browsers
            if (sub.getConsumerInfo().isBrowser()) {
                continue;
            }
            // Only dispatch to interested subscriptions
            if (!sub.matches(node, msgContext)) {
                sub.unmatched(node);
                continue;
            }
    
            //持久化订阅是 DurableTopicSubscription
            //普通订阅是 TopicSubscription
            sub.add(node);
            count++;
        }
    
        return count > 0;
    }

    持久化订阅:

    // org.apache.activemq.broker.region.DurableTopicSubscription
    public void add(MessageReference node) throws Exception {
        if (!active.get() && !keepDurableSubsActive) {
            return;
        }
        // 调用 PrefetchSubscription.add
        super.add(node);
    }
    
    //  org.apache.activemq.broker.region.PrefetchSubscription
    public void add(MessageReference node) throws Exception {
        synchronized (pendingLock) {
            // The destination may have just been removed...
            if( !destinations.contains(node.getRegionDestination()) && node!=QueueMessageReference.NULL_MESSAGE) {
                // perhaps we should inform the caller that we are no longer valid to dispatch to?
                return;
            }
    
            // Don't increment for the pullTimeout control message.
            if (!node.equals(QueueMessageReference.NULL_MESSAGE)) {
                enqueueCounter++;
            }
            //首先加入到 message cursor 中,pending 类型为 StoreDurableSubscriberCursor
            pending.addMessageLast(node);
        }
        dispatchPending();
    }

    持久化订阅者上线后,也会触发消息分发动作即 dispatchPending,调用栈如下图:

    持久化订阅者使用的 message cursor 是 StoreDurableSubscriberCursor。

    普通订阅:

    org.apache.activemq.broker.region.TopicSubscription.add(MessageReference node)

  • 相关阅读:
    服务器时间不准导致 com.sun.facelets.impl.DefaultFacelet refresh
    推荐10款来自极客标签的超棒前端特效[第五期] java程序员
    IE10的市场占有率扩充了一倍 java程序员
    固定背景实现的背景滚动特效 java程序员
    支持触摸设备的响应式HTML5音频播放器 AudioPlayer.js java程序员
    WebRTC与Ace在线代码编辑器合作,实现实时协作编程 java程序员
    最流行的JavaScript库,jQuery不再支持IE旧版本 java程序员
    Jquery实现鼠标移上弹出提示框,移出消失 java程序员
    xxx.c: Error: C3065E: type of input file 'xxxx' unknown java程序员
    35+多用途WordPress主题 java程序员
  • 原文地址:https://www.cnblogs.com/allenwas3/p/8807641.html
Copyright © 2011-2022 走看看