zoukankan      html  css  js  c++  java
  • ActiveMQ broker 集群, 静态发现和动态发现

    下载 activemq 压缩包解压后,conf 目录下有各种示例配置文件,红线标出的是静态发现和动态发现的配置。

    1. 静态配置

    启动3个 broker,端口分别为61616,61618,61620,配置如下:

    <networkConnectors></networkConnectors>
    <transportConnectors>
        <transportConnector name="openwire" uri="tcp://localhost:61616"/>
    </transportConnectors>
    <networkConnectors>
        <networkConnector uri="static:(tcp://localhost:61616)" duplex="true"/>
    </networkConnectors>
    <transportConnectors>
        <transportConnector name="openwire" uri="tcp://localhost:61618"/>
    </transportConnectors>
    <networkConnectors>
        <networkConnector uri="static:(tcp://localhost:61616,tcp://localhost:61618)" duplex="true"/>
    </networkConnectors>
    <transportConnectors>
        <transportConnector name="openwire" uri="tcp://localhost:61620"/>
    </transportConnectors>

    3个 broker 组成了一张网,当 producer 发送消息给 broker:61616 后,broker:61618 的消费者可以收到该消息。消息从 broker:61616 流动到 broker:61618,底层原理是 broker:61618 是 broker:61616 的一个消费者。

    2. 动态配置

    同样地, 启动3个 broker,端口分别为61616,61618,61620,配置如下:

    <networkConnectors>
        <networkConnector uri="multicast://default"/>
    </networkConnectors>
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" discoveryUri="multicast://default" />
    </transportConnectors>
    <networkConnectors>
        <networkConnector uri="multicast://default"/>
    </networkConnectors>
    <transportConnectors>
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61618" discoveryUri="multicast://default" />
    </transportConnectors>
    <networkConnectors>
        <networkConnector uri="multicast://default"/>
    </networkConnectors>
    <transportConnectors>
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61620" discoveryUri="multicast://default" />
    </transportConnectors>

    使用多播协议,把 3 个 broker 动态地组成了一张网。

    // 省略其他代码
    public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
    
        public static final String DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
        public static final String DEFAULT_HOST_STR = "default"; 
        public static final String DEFAULT_HOST_IP  = System.getProperty("activemq.partition.discovery", "239.255.2.3"); 
        public static final int    DEFAULT_PORT  = 6155; 
            
        private static final Logger LOG = LoggerFactory.getLogger(MulticastDiscoveryAgent.class);
        private static final String TYPE_SUFFIX = "ActiveMQ-4.";
        private static final String ALIVE = "alive.";
        private static final String DEAD = "dead.";
        private static final String DELIMITER = "%";
        private static final int BUFF_SIZE = 8192;
        private static final int DEFAULT_IDLE_TIME = 500;
        private static final int HEARTBEAT_MISS_BEFORE_DEATH = 10;
        
        public void run() {
            byte[] buf = new byte[BUFF_SIZE];
            DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
            while (started.get()) {
                // 发送多播数据
                doTimeKeepingServices();
                try {
                    // 接收多播数据
                    mcast.receive(packet);
                    if (packet.getLength() > 0) {
                        String str = new String(packet.getData(), packet.getOffset(), packet.getLength());
                        processData(str);
                    }
                } catch (SocketTimeoutException se) {
                    // ignore
                } catch (IOException e) {
                    if (started.get()) {
                        LOG.error("failed to process packet: " + e);
                    }
                }
            }
        }
    }

    3. broker 集群的原理

    如上图,61616和61618组成集群,当61618加入一个consumer时,61618向61616发送一条ConsumerInfo消息,这样61618就成为了61616的consumer。

    ConsumerInfo 示例:

    ConsumerInfo {
        commandId = 4, 
        responseRequired = false, 
        consumerId = dynamic-broker1->dynamic-broker2-1872-1524494145961-2:1:1:1, 
        destination = queue://TEST.BAT, 
        prefetchSize = 1, 
        maximumPendingMessageLimit = 0, 
        browser = false, 
        dispatchAsync = true, 
        selector = null, 
        clientId = ID:USER-20140617MT-1882-1524494166015-0:1, 
        subscriptionName = null, 
        noLocal = false, 
        exclusive = true, 
        retroactive = false, 
        priority = -5, 
        brokerPath = [ID:USER-20140617MT-1877-1524494148767-0:1], 
        optimizedAcknowledge = false, 
        noRangeAcks = false, 
        additionalPredicate = org.apache.activemq.command.NetworkBridgeFilter@413249b
    }
  • 相关阅读:
    Navicat
    Eclipse 代码质量管理插件
    oracle sql 逻辑处理
    view视图 | 索引
    LIKE模糊查询
    启动tomcat报找不到或无法加载主类
    oracle:decode
    oracle:case when then else end
    ssh 公共秘钥
    ip 和数字之间的转换
  • 原文地址:https://www.cnblogs.com/allenwas3/p/8872822.html
Copyright © 2011-2022 走看看