下载 activemq 压缩包解压后,conf 目录下有各种示例配置文件,红线标出的是静态发现和动态发现的配置。
1. 静态配置
启动3个 broker,端口分别为61616,61618,61620,配置如下:
<networkConnectors></networkConnectors> <transportConnectors> <transportConnector name="openwire" uri="tcp://localhost:61616"/> </transportConnectors>
<networkConnectors> <networkConnector uri="static:(tcp://localhost:61616)" duplex="true"/> </networkConnectors> <transportConnectors> <transportConnector name="openwire" uri="tcp://localhost:61618"/> </transportConnectors>
<networkConnectors> <networkConnector uri="static:(tcp://localhost:61616,tcp://localhost:61618)" duplex="true"/> </networkConnectors> <transportConnectors> <transportConnector name="openwire" uri="tcp://localhost:61620"/> </transportConnectors>
3个 broker 组成了一张网,当 producer 发送消息给 broker:61616 后,broker:61618 的消费者可以收到该消息。消息从 broker:61616 流动到 broker:61618,底层原理是 broker:61618 是 broker:61616 的一个消费者。
2. 动态配置
同样地, 启动3个 broker,端口分别为61616,61618,61620,配置如下:
<networkConnectors> <networkConnector uri="multicast://default"/> </networkConnectors> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" discoveryUri="multicast://default" /> </transportConnectors>
<networkConnectors> <networkConnector uri="multicast://default"/> </networkConnectors> <transportConnectors> <transportConnector name="openwire" uri="tcp://0.0.0.0:61618" discoveryUri="multicast://default" /> </transportConnectors>
<networkConnectors> <networkConnector uri="multicast://default"/> </networkConnectors> <transportConnectors> <transportConnector name="openwire" uri="tcp://0.0.0.0:61620" discoveryUri="multicast://default" /> </transportConnectors>
使用多播协议,把 3 个 broker 动态地组成了一张网。
// 省略其他代码 public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { public static final String DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155"; public static final String DEFAULT_HOST_STR = "default"; public static final String DEFAULT_HOST_IP = System.getProperty("activemq.partition.discovery", "239.255.2.3"); public static final int DEFAULT_PORT = 6155; private static final Logger LOG = LoggerFactory.getLogger(MulticastDiscoveryAgent.class); private static final String TYPE_SUFFIX = "ActiveMQ-4."; private static final String ALIVE = "alive."; private static final String DEAD = "dead."; private static final String DELIMITER = "%"; private static final int BUFF_SIZE = 8192; private static final int DEFAULT_IDLE_TIME = 500; private static final int HEARTBEAT_MISS_BEFORE_DEATH = 10; public void run() { byte[] buf = new byte[BUFF_SIZE]; DatagramPacket packet = new DatagramPacket(buf, 0, buf.length); while (started.get()) { // 发送多播数据 doTimeKeepingServices(); try { // 接收多播数据 mcast.receive(packet); if (packet.getLength() > 0) { String str = new String(packet.getData(), packet.getOffset(), packet.getLength()); processData(str); } } catch (SocketTimeoutException se) { // ignore } catch (IOException e) { if (started.get()) { LOG.error("failed to process packet: " + e); } } } } }
3. broker 集群的原理
如上图,61616和61618组成集群,当61618加入一个consumer时,61618向61616发送一条ConsumerInfo消息,这样61618就成为了61616的consumer。
ConsumerInfo 示例:
ConsumerInfo { commandId = 4, responseRequired = false, consumerId = dynamic-broker1->dynamic-broker2-1872-1524494145961-2:1:1:1, destination = queue://TEST.BAT, prefetchSize = 1, maximumPendingMessageLimit = 0, browser = false, dispatchAsync = true, selector = null, clientId = ID:USER-20140617MT-1882-1524494166015-0:1, subscriptionName = null, noLocal = false, exclusive = true, retroactive = false, priority = -5, brokerPath = [ID:USER-20140617MT-1877-1524494148767-0:1], optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate = org.apache.activemq.command.NetworkBridgeFilter@413249b }