zoukankan      html  css  js  c++  java
  • ActiveMQ静态网络链接(broker-to-broker)

      ActiveMQ的网络连接分为静态连接和动态连接。本章研究静态连接。

    1.ActiveMQ的networkConnector是什么 

      在某些情况下,需要多个ActiveMQ的Broker做集群,那么就涉及到Broker到Broker的通信,这个就称为ActiveMQ的networkConnector。也就是broker-to-broker

      ActiveMQ的networkConnector默认是单向的,一个Broker在一端发送消息,另一个Broker在另一端接收消息,这就是所谓的"桥接"

      ActiveMQ也支持双向链接,创建一个双向的通道对于两个Broker不仅发送消息而且也能从相同的通道接收消息,通常作为duplex connector来映射。

    2.有两种配置Broker到Broker的链接方式

      第一种: Broker通过Staticlly配置的方式去连接Broker(静态链接)

      第二种:    Broker通过discover agent来dynamically的发现Brokers(动态链接)===后面学习

    3.Static networks:

      (在前面集群环境的搭建过程实际就是静态连接:https://www.cnblogs.com/qlqwjy/p/9728425.html)

      Static networkConnector是用于创建一个静态的配置对于网络中的多个Broker,这种协议用于复合url,一个复合url包括多个url地址,格式如下: 

    static:(uri1,uri2,uri3, ...)?key=value

     3.1搭建两个broker

       关于创建多个broker的方式参考:https://www.cnblogs.com/qlqwjy/p/10463660.html

      这里创建两个broker,端口分别是:tcp://0.0.0.0:61616和tcp://0.0.0.0:61617,对应的jetty的http端口是8161和8162。

    现在在61616的那个activemq.xml中增加静态连接:

            <transportConnectors>
                <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
                <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            </transportConnectors>
    
            <networkConnectors>
                <networkConnector name="local_network" uri="static:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617)"></networkConnector>
            </networkConnectors>

      接下来正常启动两个broker,并且从http验证正常启动。

     3.2测试:向61616发送消息,从61617接收消息

    package cn.qlq.activemq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 生产消息
     * 
     * @author QiaoLiQiang
     * @time 2018年9月18日下午11:04:41
     */
    public class MsgProducer {
    
        // 默认端口61616
        private static final String url = "tcp://localhost:61616/";
        private static final String queueName = "myQueue";
        private static Session session = null;
    
        public static void main(String[] args) throws JMSException {
            // 1创建ConnectionFactory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            // 2.由connectionFactory创建connection
            Connection connection = connectionFactory.createConnection();
            // 3.启动connection
            connection.start();
            // 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            // 5.创建Destination(Queue继承Queue)
            Queue destination = session.createQueue(queueName);
            // 6.创建生产者producer
            MessageProducer producer = session.createProducer(destination);
            for (int i = 0; i < 10; i++) {
                // 7.创建Message,有好多类型,这里用最简单的TextMessage
                TextMessage tms = session.createTextMessage("textMessage:" + i);
    
                // 设置附加属性
                tms.setStringProperty("str", "stringProperties" + i);
    
                // 8.生产者发送消息
                producer.send(tms);
            }
    
            // 9.提交事务
            session.commit();
    
            // 10.关闭connection
            session.close();
            connection.close();
        }
    
    }
    package cn.qlq.activemq;
    
    import java.util.Enumeration;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 消费消息
     * 
     * @author QiaoLiQiang
     * @time 2018年9月18日下午11:26:41
     */
    public class MsgConsumer {
    
        // 默认端口61616
        private static final String url = "tcp://localhost:61617/";
        private static final String queueName = "myQueue";
    
        public static void main(String[] args) throws JMSException {
            // 1创建ConnectionFactory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            // 2.由connectionFactory创建connection
            Connection connection = connectionFactory.createConnection();
            Enumeration jmsxPropertyNames = connection.getMetaData().getJMSXPropertyNames();
            while (jmsxPropertyNames.hasMoreElements()) {
                String nextElement = (String) jmsxPropertyNames.nextElement();
                System.out.println("JMSX name ===" + nextElement);
            }
            // 3.启动connection
            connection.start();
            // 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            // 5.创建Destination(Queue继承Queue)
            Queue destination = session.createQueue(queueName);
            // 6.创建消费者consumer
            MessageConsumer consumer = session.createConsumer(destination);
    
            int i = 0;
            while (i < 5) {
                TextMessage textMessage = (TextMessage) consumer.receive();
                System.out.println("接收消息:" + textMessage.getText() + ";属性" + textMessage.getStringProperty("str"));
                i++;
    
                if (i == 5) {// 确保消费完所有的消息再进行确认
                    textMessage.acknowledge();
                }
            }
    
            // 提交事务,进行确认收到消息
            session.commit();
    
            session.close();
            connection.close();
        }
    }

    结果:

     总结:

      可以看做61617是61616的一个消费者,也就是61616上的消息也可以从61617进行消费,但是61617上生产的消息不能从61616上消费。因为只在61616配置了静态连接,61616可以通过network连接到61616和61617,也就是61617可以作为61616的消费者。

      如果想实现在双方都可以生产并消费在61617的activemq.xml配置相同的静态连接就可以了。或者参考下面的第11条配置设置为双向通信。

      如果想实现双方的消息同步,那么实现数据共享就可以了,也就是kahaDB的目录共享就可以实现共享数据。

    3.3  networkConnector配置的可用属性

     1.name: 默认为bridge

    2.dynamicOnly: 默认是false,如果为true,持久订阅被激活时才创建对应的网络持久订阅。默认是启动时激活

    3.decreaseNetworkConsumerPriority: 默认是false。设定消费者优先权,如果为true,网络的消费者优先级降低为-5。如果为false,则默认跟本地消费者一样为0。(其实这个进一步的理解就是消费者同时从61617和                 61616消费消息,通常情况下61617的先获取到消息,也就是静态连接的另一方发而可以获得更多的消息)

    4.networkTTL: 默认是1,网络中用于消息和订阅消费的broker数量

    5.messageTTL: 默认是1,网络中用于消息的broker数量

    6.consumerTTL: 默认是1,网络中用于消费的broker数量
    7.conduitSubscriptions: 默认true,是否把同一个broker的多个consumer当做一个来处理(在做集群的时候如果有多个consumer,需要设置为false)

    8.dynamicallyIncludedDestinations:默认为空,要包括的动态消息地址,类适于excludedDestinations,如:类似于一个消息的过滤,只向61617传输满足条件的消息===这个用在动态连接

            <dynamicallyIncludedDestinations>
             <queue physicalName="include.test.foo"/>
             <topic physicalName="include.test.bar"/>
            </dynamicallyIncludedDestinations> 

    9.staticallyIncludedDestinations:默认为空,要包括的静态消息地址。类似于excludedDestinations,如:类似于一个消息的过滤,只向61617传输满足条件的消息===这个用在静态连接

    <staticallyIncludedDestinations>
      <queue physicalName="always.include.queue"/>
    </staticallyIncludedDestinations>

    10.excludedDestinations: 默认为空,指定排除的地址,示例如下:(这个是排除的消息)

    <networkConnectors>
      <networkConnector uri="static://(tcp://localhost:61617)" name="bridge" dynamicOnly="false" conduitSubscriptions="true" decreaseNetworkConsumerPriority="false">
      <excludedDestinations>
        <queue physicalName="exclude.test.foo">
        <topic physicalName="exclude.test.bar">
      </excludedDestinations>
      <dynamicallyIncludedDestinations>
        <queue physicalName="include.test.foo"/>
        <topic physicalName="include.test.bar"/>
      </dynamicallyIncludedDestinations>
      <staticallyIncludedDestinations>
        <queue physicalName="always.include.queue"/>
      </staticallyIncludedDestinations>
      </networkConnector>
    </networkConnectors>

     11.duplex:默认是false,设置是否能双向通信。也就是双方都可以作为对方的消费者。

            <networkConnectors>
                <networkConnector duplex="true" name="local_network" uri="static:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617)"></networkConnector>
            </networkConnectors>

    12. prefetchSize: 默认是1000,持有的未确认的最大消息数量,必须大于0,因为网络消费者不能自己轮询消息

    13. suppressDuplicateQueueSubscriptions: 默认false,如果为true,重复的订阅关系一产生即被阻止

    14. bridgeTempDestinations: 默认true,是否广播advisory messages来创建临时的destination
    15. alwaysSyncSend: 默认false,如果为true,非持久化消息也将使用request/reply方式代替oneway方式发送到远程broker
    16. staticBridge: 默认false,如果为true,只有staticallyIncludedDestinations中配置的destination可以被处理,这个参数配置是否是静态桥

    4.多线程consumer访问集群

      为了理解上面配置的第3条:decreaseNetworkConsumerPriority。我们开启多线程进行访问。也就是多个线程同时访问61616和61617,然后单线程向61616传送消息,查看从61616消费的消息多还是从61617消费的消息多。

    消费者:

    package cn.qlq.activemq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Consumer61616 {
    
        // 默认端口61616
        private static final String url = "tcp://localhost:61616/";
    
        public static void main(String[] args) {
            // 1创建ConnectionFactory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            for (int i = 0; i < 30; i++) {
                new Thread2(connectionFactory, "myQueue").start();
            }
        }
    }
    
    class Thread2 extends Thread {
    
        private ConnectionFactory connectionFactory;
        private String queueName;
    
        public Thread2(ConnectionFactory connectionFactory, String queueName) {
            super();
            this.connectionFactory = connectionFactory;
            this.queueName = queueName;
        }
    
        @Override
        public void run() {
            try {
                // 2.由connectionFactory创建connection
                Connection connection = connectionFactory.createConnection();
                // 3.启动connection
                connection.start();
                // 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // 5.创建Destination(Queue继承Queue)
                Queue destination = session.createQueue(queueName);
                // 6.创建消费者consumer
                MessageConsumer consumer = session.createConsumer(destination);
                // 7.receive是一个阻塞方法
                Message message = consumer.receive();
                TextMessage txtMsg = (TextMessage) message;
                System.out.println("接收消息(61616):" + txtMsg.getText());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }
    package cn.qlq.activemq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Consumer61617 {
    
        // 默认端口61617
        private static final String url = "tcp://localhost:61617/";
    
        public static void main(String[] args) {
            // 1创建ConnectionFactory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            for (int i = 0; i < 30; i++) {
                new Thread1(connectionFactory, "myQueue").start();
            }
        }
    }
    
    class Thread1 extends Thread {
    
        private ConnectionFactory connectionFactory;
        private String queueName;
    
        public Thread1(ConnectionFactory connectionFactory, String queueName) {
            super();
            this.connectionFactory = connectionFactory;
            this.queueName = queueName;
        }
    
        @Override
        public void run() {
            try {
                // 2.由connectionFactory创建connection
                Connection connection = connectionFactory.createConnection();
                // 3.启动connection
                connection.start();
                // 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // 5.创建Destination(Queue继承Queue)
                Queue destination = session.createQueue(queueName);
                // 6.创建消费者consumer
                MessageConsumer consumer = session.createConsumer(destination);
                // 7.receive是一个阻塞方法
                Message message = consumer.receive();
                TextMessage txtMsg = (TextMessage) message;
                System.out.println("接收消息(61617):" + txtMsg.getText());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }

    生产者:

    package cn.qlq.activemq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 生产消息
     * 
     * @author QiaoLiQiang
     * @time 2018年9月18日下午11:04:41
     */
    public class MsgProducer {
    
        // 默认端口61616
        private static final String url = "tcp://localhost:61616/";
        private static final String queueName = "myQueue";
        private static Session session = null;
    
        public static void main(String[] args) throws JMSException {
            // 1创建ConnectionFactory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            // 2.由connectionFactory创建connection
            Connection connection = connectionFactory.createConnection();
            // 3.启动connection
            connection.start();
            // 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            // 5.创建Destination(Queue继承Queue)
            Queue destination = session.createQueue(queueName);
            // 6.创建生产者producer
            MessageProducer producer = session.createProducer(destination);
            for (int i = 0; i < 60; i++) {
                // 7.创建Message,有好多类型,这里用最简单的TextMessage
                TextMessage tms = session.createTextMessage("textMessage:" + i);
    
                // 设置附加属性
                tms.setStringProperty("str", "stringProperties" + i);
    
                // 8.生产者发送消息
                producer.send(tms);
            }
    
            // 9.提交事务
            session.commit();
    
            // 10.关闭connection
            session.close();
            connection.close();
        }
    
    }

    结果一:如果所有线程在等待的情况下,本机的速度快于static的对方。也就是61616的消费者获得的多。

      我们人为的认为是本机快于static静态网络连接的消费者;实际自己的测试结果也是这样。不知道是不是自己的多线程写的有问题。。。。。。这个也没有多大影响

    5.集群下的消息回流

      消息回流也就是允许broker1将从broker2拉取的消息回传给broker2。如果要允许broker2向broker1回传需要在broker2的配置文件做相同的配置。

    消息丢失 

      如果有broker1和broker2通过networkConnector连接,有一个consumer1连接到broker1,一个consumer2连接到broker2,程序往broker2上面发送10条消息,这时consumer1连接到broker1消费消息,当consumer1消费了5条消息时,broker1挂掉了。 但是还剩下5条消息在broker1上面,这些消息就好像消息了,除非broker1重启了,然后有消费者连接到broker1来消费消息,遇到这样的情况该怎么办呢?

      从5.6版本起,在destinationPolicy上新增的选择replayWhenNoConsumers,这个选项使得broker1上有需要转发的消息但是没有消费者时,把消息回流到它原来的broker2上,同时需要把enableAudit设置为false,为了防止消息回流后被当做重复消息而不被分发,activemq.xml配置如下:

      注意 下面 policyEntry  的配置。 下面是61616的配置,也就是61616允许回流到61617

            <destinationPolicy>
                <policyMap>
                  <policyEntries>
                    <policyEntry queue=">" enableAudit="false">
                        <networkBridgeFilterFactory>
                            <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
                        </networkBridgeFilterFactory>
                    </policyEntry>
                    <policyEntry topic=">" >
                        <!-- The constantPendingMessageLimitStrategy is used to prevent
                             slow topic consumers to block producers and affect other consumers
                             by limiting the number of messages that are retained
                             For more information, see:
    
                             http://activemq.apache.org/slow-consumer-handling.html
    
                        -->
                      <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="1000"/>
                      </pendingMessageLimitStrategy>
                    </policyEntry>
                  </policyEntries>
                </policyMap>
            </destinationPolicy>
    ...
            <networkConnectors>
                <networkConnector duplex="true" name="local_network" uri="static:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617)"></networkConnector>
            </networkConnectors>

    测试如下:

    1.向61617发送10条消息:

    61617对应的后台的信息汇总:

     61616对应的后台的信息汇总:

     

    2.从61616消费5条消息

      通过这个也可以看出,我们通过broker2去消费broker1的消息,broker2会一下把所有满足条件的消息都拉倒自己目录存储起来。对于broker1来说是所有消息被消费了,对于broker2来说是生产了从broker1获取的消息。

     61617对应的后台的信息汇总:

    61616对应的后台的信息汇总:

     3.从61617消费五条消息

      61617对应的后台的信息汇总:

     61616对应的后台的信息汇总:

    6.  ActiveMQ失效转移和容错性

    允许当其中一台消息服务器宕机时,客户端在传输层上重新连接其他消息服务器。

      语法:failover:(uri1,...uriN)?transportOptions

      transportOptions参数说明:  randomize:默认为true,表示在URI列表中选择URI连接时是否采用随机策略

                    initialReconnectDelay默认为10,默认10毫秒,表示第一次尝试重连之间等待的时间。

                    maxReconnectDelay:默认30000,单位毫秒,最长重连时间的间隔

    Java程序中只需要改对于的URL即可:

    private static final String url = "failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=true";

       默认情况下,这种协议用于随机的去选择一个链接去链接,如果链接失败了,那么会链接到其他的Broker上。默认的配置定义了延迟重新链接,意味着传输将会在10秒后自动的去重新链接可用的broker。当然所有的重新链接参数都可以根据应用的需要而配置。

  • 相关阅读:
    安装VMtools vim编辑器的使用 压缩包命令 Linux下的用户管理 (第三天)
    VM虚拟机安装 常用Linux命令 网卡配置 (第二天)
    数据库的交互模式 常用的dos命令 (第一天)
    Validate US Telephone Numbers FreeCodeCamp
    Arguments Optional FreeCodeCamp
    Everything Be True FreeCodeCamp
    Binary Agents FreeCodeCamp
    Steamroller FreeCodeCamp
    Drop it FreeCodeCamp
    Smallest Common Multiple FreeCodeCamp
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/10486552.html
Copyright © 2011-2022 走看看