1. 启动多个Broker
在win10下同一台服务器启动多个Broker,
步骤如下:
1. 复制安装目录下的conf文件夹命名为conf2
2. 修改activemq.xml中的brokerName不能跟之前的一样。如:
3.修改数据存放的名称,如下:
4. 修改所有的transportConnectors端口,都要跟之前的不一样
修改jetty.xml下的
5. 复制bin目录重命名为bin2
修改activemq中的
ACTIVEMQ_CONF="$ACTIVEMQ_BASE/conf"
改为
ACTIVEMQ_CONF="$ACTIVEMQ_BASE/conf2"
修改程序的pid,不能跟前面的重复
修改所有activemq.bat中的/conf替换成/conf2
bin2/win64下的ctivemq.bat中也一样,修改bin2/win64wrapper.conf为
set.default.ACTIVEMQ_CONF=%ACTIVEMQ_BASE%/conf2
分别启动两个不同的broker,结果如下
2. ActiveMQ静态网络连接
2.1 ActiveMQ的networkConnector是什么?
在某些场景下,需要多个ActiveMQ的Broker做集群,那么就涉及到了Broker到Broker的通信,这个被称为ActiveMQ的networkConnector。
ActiveMQ的networkConnector默认是单向的,一个Broker在一端发送消息,另一个Broker在另一端接收消息。这就是所谓的“桥接”.ActiveMQ也支持双向链接,创建一个双向的通道对于两个Broker,不仅发送消息而且也能从相同的通道来接收消息,通常作为duplex connector来映射,如下:
2.2 “Discovery"的概念
一般情况下,discovery是被用来发现远程服务的,客户端通常想去发现所有可利用的brokers;另一层意思是,它是基于现有的网络Broker去发现其他可用的Brokers。
有两种配置Client到Broker的连接方式,一种方式是:Client通过Statically配置的方式去连接Broker,另一种方式是:Client通过discovery agents来dynamically的发现Brokers
2.3 Static networks
Static networkConnector是用于创建一个静态配置对于网络中的多个Broker,这种协议用于复合url,一个复合url包括多个url地址,格式如下:
static://(uri1,uri2,uri3....)?key=value
1. 配置实例如下
<networkConnectors> <networkConnector name="local network" uri="static://(tcp://localhost:61616)"/> </networkConnectors
2. Static networks基本原理示意图
测试:使用生产者从端口为61716端口发送消息,使用消费者从端口61616接收消息,可能看到能够成功的接收消息。但是从使用生产者从端口为61616端口发送消息,使用消费者从端口61716接收消息,是不能成功的接收消息的,这就说明了static network默认是单向的。
那么加入BrokerA中有一个消费者,BrokerB有一个消费者,当生产者想BrokerB中发送消息的时候,是哪个消费者先收到呢?
这里做了一个测试,将static network,设置成双向连接的,然后使用两个不同的消费者分别连接BrokerA和BrokerB,当想BrokerB中发送消息时,使用多线程同时启动两个不同的消费者,观察到的现象是消息几乎被连接BrokerA的消费者消费完了。再发送到BrokerA,也同时启动两个不同的消费者,发现消息几乎被连接BrokerB的消费者消费了,所以在默认不采取负载均衡的情况下,网络中的消费者消费消息的能力是优于本机消费者的。
2.4 networkConnector配置的可用属性
1. name: 默认是bridge
2. dynamicOnly: 默认是false,如果为true,持久订阅被激活时才会创建对应的网络持久订阅,默认是启动时激活
3. decreaseNetworkConsumerPriority: 默认是false,设定消费者优先权,如果为true,网络的消费者优先降低为-5,如果为false,则默认跟本地消费者一样为0.
4. networkTTL:默认是1,网络中用于消息和订阅消费者的broker数量。
5. messageTTL:默认是1,网络中用于消息的broker数量。
6. consumerTTL:默认是1,网络中用于消费的broker数量。
7. dynamicallyIncludeDestinaitons: 默认为空,要包括的动态消息地址,类似于excludedDestinations.如:
<dynamicallyIncludeDestinaitons> <queue physicalName="include.test.foo"/> <topic physicalName="include.test.bar"/> </dynamicallyIncludeDestinaitons>
8. staticallyIncludedDestinations: 默认为空,要包括的静态消息地址,类似于excludedDestations.如:
<staticallyIncludeDestinaitons> <queue physicalName="aways.include.queue"/> </staticallyIncludeDestinaitons>
9. excludeDestinations: 默认为空,指定排除的地址,示例如下:
<excludedDestinaitons> <queue physicalName="include.test.foo"/> <topic physicalName="include.test.bar"/> </excludedDestinaitons>
7.8.9的完整配置如下:
<networkConnectors> <networkConnector name="local network" uri="static://(tcp://localhost:61616,tcp://localhost:61716)"/> <dynamicallyIncludeDestinaitons> <queue physicalName="include.test.foo"/> <topic physicalName="include.test.bar"/> </dynamicallyIncludeDestinaitons> <staticallyIncludeDestinaitons> <queue physicalName="aways.include.queue"/> </staticallyIncludeDestinaitons> <excludedDestinaitons> <queue physicalName="include.test.foo"/> <topic physicalName="include.test.bar"/> </excludedDestinaitons> </networkConnectors>
10. duplex:默认false,设置是否能双向通信
配置如下:
<networkConnectors> <networkConnector duplex="true" name="local network" uri="static://(tcp://localhost:61616,tcp://localhost:61716)"/> </networkConnectors>
配置之后即可从61616端口生产消息之后,61716端口能够接收到消息。
11. prefetchSize: 默认是1000,持有的未确认的最大消息数量,必须大于0,因为网络消费者不能自己轮询消息。’
12. suppressDuplicateQueueSubscriptions: 默认false,如果为true,重复的订阅关系一生产即被组织。
13. bridgeTempDestinations: 默认true,是否广播advisory message来创建临时destination
14. alwaysSyncSend: 默认false,如果为true,非持久化消息也将使用request/reply方式代替oneway方式发送到远程broker
15. staticBridge: 默认false,如果为true,只有staticallyIncludedDestinations中配置的destination可以被处理。
16.conduitSubscriptions: 默认true,是否把同一个broker的多个consumer当做一个来处理(在做集群的时候如果有多个consumer,需要设置为false)
3. ”丢失“的消息--消息回流
有这样的场景,broker1和broker2通过networkConnector连接,一些consumers连接到broker1,一些消费broker2上的消息。消息先被broker1从broker2上消费掉,然后转发给这些consumers,不幸的是转发部分消息的时候broker1重启了,这些consumers发现broker1连接失败,通过failover连接到broker2上去了,但是一部分他们还没有消费的消息被broker2已经分发到broker1上去了,这些消息就好像消失了,除非有消费者重新连接到broker1上来消费,这怎么办呢?
从5.6版开始,在destinationPolicy上新增选项replayWhenNoConsumers。这个选项使得broker1上有需要转发的消息但是没有被消费时,把消息回流到它原始的broker.同时把enableAudit设置为false,为了防止消息回流后被当作重复消息而不被分发,示例如下:
<destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">" enableAudit="false"> <networkBridgeFilterFactory> <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/> </networkBridgeFilterFactory> </policyEntry> </policyEntries> </policyMap> </destinationPolicy>