zoukankan      html  css  js  c++  java
  • ActiveMQ学习笔记(9)----ActiveMQ静态网络连接

    1. 启动多个Broker

      在win10下同一台服务器启动多个Broker,

      步骤如下:

      1. 复制安装目录下的conf文件夹命名为conf2

      2. 修改activemq.xml中的brokerName不能跟之前的一样。如:

     

      3.修改数据存放的名称,如下:

      

      4. 修改所有的transportConnectors端口,都要跟之前的不一样

      修改jetty.xml下的

      

      

      5. 复制bin目录重命名为bin2

      修改activemq中的

    ACTIVEMQ_CONF="$ACTIVEMQ_BASE/conf"
    改为
    ACTIVEMQ_CONF="$ACTIVEMQ_BASE/conf2"

      修改程序的pid,不能跟前面的重复

      

      修改所有activemq.bat中的/conf替换成/conf2

      bin2/win64下的ctivemq.bat中也一样,修改bin2/win64wrapper.conf为

      set.default.ACTIVEMQ_CONF=%ACTIVEMQ_BASE%/conf2

    分别启动两个不同的broker,结果如下

      

      

    2. ActiveMQ静态网络连接

      2.1 ActiveMQ的networkConnector是什么?

        在某些场景下,需要多个ActiveMQ的Broker做集群,那么就涉及到了Broker到Broker的通信,这个被称为ActiveMQ的networkConnector。

        ActiveMQ的networkConnector默认是单向的,一个Broker在一端发送消息,另一个Broker在另一端接收消息。这就是所谓的“桥接”.ActiveMQ也支持双向链接,创建一个双向的通道对于两个Broker,不仅发送消息而且也能从相同的通道来接收消息,通常作为duplex connector来映射,如下:

      

      2.2 “Discovery"的概念

      一般情况下,discovery是被用来发现远程服务的,客户端通常想去发现所有可利用的brokers;另一层意思是,它是基于现有的网络Broker去发现其他可用的Brokers。

      有两种配置Client到Broker的连接方式,一种方式是:Client通过Statically配置的方式去连接Broker,另一种方式是:Client通过discovery agents来dynamically的发现Brokers

      2.3 Static networks

      Static networkConnector是用于创建一个静态配置对于网络中的多个Broker,这种协议用于复合url,一个复合url包括多个url地址,格式如下:

      static://(uri1,uri2,uri3....)?key=value

      1. 配置实例如下 

    <networkConnectors>
        <networkConnector name="local network" uri="static://(tcp://localhost:61616)"/>
    </networkConnectors

      2. Static networks基本原理示意图

      

      测试:使用生产者从端口为61716端口发送消息,使用消费者从端口61616接收消息,可能看到能够成功的接收消息。但是从使用生产者从端口为61616端口发送消息,使用消费者从端口61716接收消息,是不能成功的接收消息的,这就说明了static network默认是单向的。

      那么加入BrokerA中有一个消费者,BrokerB有一个消费者,当生产者想BrokerB中发送消息的时候,是哪个消费者先收到呢?

      这里做了一个测试,将static network,设置成双向连接的,然后使用两个不同的消费者分别连接BrokerA和BrokerB,当想BrokerB中发送消息时,使用多线程同时启动两个不同的消费者,观察到的现象是消息几乎被连接BrokerA的消费者消费完了。再发送到BrokerA,也同时启动两个不同的消费者,发现消息几乎被连接BrokerB的消费者消费了,所以在默认不采取负载均衡的情况下,网络中的消费者消费消息的能力是优于本机消费者的。

      2.4 networkConnector配置的可用属性

        1. name: 默认是bridge

        2. dynamicOnly: 默认是false,如果为true,持久订阅被激活时才会创建对应的网络持久订阅,默认是启动时激活

        3. decreaseNetworkConsumerPriority: 默认是false,设定消费者优先权,如果为true,网络的消费者优先降低为-5,如果为false,则默认跟本地消费者一样为0.

        4. networkTTL:默认是1,网络中用于消息和订阅消费者的broker数量。

        5.  messageTTL:默认是1,网络中用于消息的broker数量。

        6. consumerTTL:默认是1,网络中用于消费的broker数量。

        7. dynamicallyIncludeDestinaitons: 默认为空,要包括的动态消息地址,类似于excludedDestinations.如:

     <dynamicallyIncludeDestinaitons>
            <queue physicalName="include.test.foo"/>
            <topic physicalName="include.test.bar"/>
     </dynamicallyIncludeDestinaitons>

        8. staticallyIncludedDestinations: 默认为空,要包括的静态消息地址,类似于excludedDestations.如:

     <staticallyIncludeDestinaitons>
            <queue physicalName="aways.include.queue"/>
     </staticallyIncludeDestinaitons>

        9. excludeDestinations: 默认为空,指定排除的地址,示例如下:

    <excludedDestinaitons>
            <queue physicalName="include.test.foo"/>
            <topic physicalName="include.test.bar"/>
     </excludedDestinaitons>

      7.8.9的完整配置如下: 

     
    <networkConnectors>
                <networkConnector name="local network" uri="static://(tcp://localhost:61616,tcp://localhost:61716)"/>
                <dynamicallyIncludeDestinaitons>
                    <queue physicalName="include.test.foo"/>
                    <topic physicalName="include.test.bar"/>
                 </dynamicallyIncludeDestinaitons>
                <staticallyIncludeDestinaitons>
                    <queue physicalName="aways.include.queue"/>
                 </staticallyIncludeDestinaitons>
                <excludedDestinaitons>
                    <queue physicalName="include.test.foo"/>
                    <topic physicalName="include.test.bar"/>
                 </excludedDestinaitons>
            </networkConnectors>
     

      10. duplex:默认false,设置是否能双向通信

        配置如下:

    <networkConnectors>
                <networkConnector duplex="true" name="local network" uri="static://(tcp://localhost:61616,tcp://localhost:61716)"/>
            </networkConnectors>

      配置之后即可从61616端口生产消息之后,61716端口能够接收到消息。

      11. prefetchSize: 默认是1000,持有的未确认的最大消息数量,必须大于0,因为网络消费者不能自己轮询消息。’

      12. suppressDuplicateQueueSubscriptions: 默认false,如果为true,重复的订阅关系一生产即被组织。

      13. bridgeTempDestinations: 默认true,是否广播advisory message来创建临时destination

      14. alwaysSyncSend: 默认false,如果为true,非持久化消息也将使用request/reply方式代替oneway方式发送到远程broker

      15. staticBridge: 默认false,如果为true,只有staticallyIncludedDestinations中配置的destination可以被处理。

      16.conduitSubscriptions: 默认true,是否把同一个broker的多个consumer当做一个来处理(在做集群的时候如果有多个consumer,需要设置为false)

    3. ”丢失“的消息--消息回流

      有这样的场景,broker1和broker2通过networkConnector连接,一些consumers连接到broker1,一些消费broker2上的消息。消息先被broker1从broker2上消费掉,然后转发给这些consumers,不幸的是转发部分消息的时候broker1重启了,这些consumers发现broker1连接失败,通过failover连接到broker2上去了,但是一部分他们还没有消费的消息被broker2已经分发到broker1上去了,这些消息就好像消失了,除非有消费者重新连接到broker1上来消费,这怎么办呢?

      从5.6版开始,在destinationPolicy上新增选项replayWhenNoConsumers。这个选项使得broker1上有需要转发的消息但是没有被消费时,把消息回流到它原始的broker.同时把enableAudit设置为false,为了防止消息回流后被当作重复消息而不被分发,示例如下:

     
         <destinationPolicy>
                <policyMap>
                  <policyEntries>
                    <policyEntry queue=">" enableAudit="false">
                        <networkBridgeFilterFactory>
                            <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
                        </networkBridgeFilterFactory>
                    </policyEntry>
                  </policyEntries>
                </policyMap>
            </destinationPolicy>
     

    原文 ActiveMQ学习笔记(9)----ActiveMQ静态网络连接

  • 相关阅读:
    Python—设计模式
    Python—操作系统和多线程
    thin mission 2021 11 3
    搜索
    c++ 调试
    Lecture--words families
    高数--积分
    thin mission 2021.11.2
    tiny mission 2021.11.1
    zlib使用心得
  • 原文地址:https://www.cnblogs.com/xiaoshen666/p/10854693.html
Copyright © 2011-2022 走看看