zoukankan      html  css  js  c++  java
  • 关于ActiveMQ的几种集群配置

    ActiveMQ的几种集群配置。

    Queue consumer clusters

    此集群让多个消费者同时消费一个队列,若某个消费者出问题无法消费信息,则未消费掉的消息将被发给其他正常的消费者,结构图如下:

    Broker clusters

    此种配置是一个消费者连接到多个broker集群的中的一个broker,当该broker出问题时,消费者自动连接到其他一个正常的broker。消费者使用 failover:// 协议来连接broker。

    failover:(tcp://localhost:61616,tcp://localhost:61617)

    failover官网介绍 http://activemq.apache.org/failover-transport-reference.html

    broker之间的通过静态发现(static discovery)和动态发现(dynamic discovery)来维持彼此发现,下面来介绍静态发现和动态发现的机制:

    静态发现:

    静态发现通过配置固定的broker uri来发现彼此,配置语法如下:

    static:(uri1,uri2,uri3,...)?options

    例如:

    static:(tcp://localhost:61616,tcp://remotehost:61617?trace=false,vm://localbroker)?initialReconnectDelay=100
    

      

    更多静态发现介绍,见ActiveMQ官网 http://activemq.apache.org/static-transport-reference.html

    动态发现:

    动态发现机制是在各个broker启动时通过Fanout transport来发现彼此,配置举例如下:

     <broker name="foo">
       <transportConnectors>
         <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
       </transportConnectors>
       ...
     </broker>

     更多动态发现机制介绍,见官网 http://activemq.apache.org/discovery-transport-reference.html

    Networks of brokers

    多个broker组成集群,当其中一个broker的消费者出问题导致消息堆积无法消费掉时,通过ActiveMQ支持的Network of Broker方案可将该broker堆积的消息转发到其他有消费者的broker。

    该方案主要有以下两种配置方式:

    1、为broker配置文件配置networkConnector元素

    2、使用发现机制互相探测broker

    Here is an example of using the fixed list of URIs:

    <?xml version="1.0" encoding="UTF-8"?>
     
    <beans xmlns="http://activemq.org/config/1.0">
     
      <broker brokerName="receiver" persistent="false" useJmx="false"> 
        <networkConnectors>
          <!-- Static discovery -->
          <networkConnector uri="static:(tcp://localhost:62001)"/>
          <!-- MasterSlave Discovery -->
          <!--<networkConnector uri="masterslave:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/> -->
        </networkConnectors>
     
        <persistenceAdapter>
          <memoryPersistenceAdapter/>
        </persistenceAdapter>
     
       <transportConnectors>
          <transportConnector uri="tcp://localhost:62002"/>
        </transportConnectors>
      </broker>
     
    </beans>
    

      

    This example uses multicast discovery:

    <?xml version="1.0" encoding="UTF-8"?>
     
    <beans xmlns="http://activemq.org/config/1.0">
     
      <broker name="sender" persistent="false" useJmx="false"> 
        <networkConnectors>
          <networkConnector uri="multicast://default"/>
        </networkConnectors>
     
        <persistenceAdapter>
          <memoryPersistenceAdapter/>
        </persistenceAdapter>
     
      <transportConnectors>
          <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
        </transportConnectors>
      </broker>
     
    </beans>
    

      

    Master Slave

    通过部署多个broker实例,一个master和多个slave关系的broker来达到高可用性,有三种方案:

    1、Master-Slave
    2、SharedFile System Master Slave
    3、JDBCMaster Slave

    第一种方案由于只可以由两个AMQ实例组件,实际应用场景并不广泛;
    第三种方案支持N个AMQ实例组网,但他的性能会受限于数据库;
    第二种方案同样支持N个AMQ实例组网,基于kahadb存储策略,亦可以部署在分布式文件系统上,应用灵活、高效且安全。

    Master Slave方案当其中一个broker启动并拿到独占锁时自动成为master,其他后续的broker则一直等待锁,当master宕机释放锁时其他slave拿到独占锁则自动成为master,部署结构如下:

    第二种方案的配置只需修改config文件夹下activemq.xml文件,修改消息持久化使用的方案:

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="D:/Platform/mq_share_file">
      ...
        <persistenceAdapter>
                <kahaDB directory="D:/Platform/mq_share_file/kahadb" enableIndexWriteAsync="true" enableJournalDiskSyncs="false"/>
        </persistenceAdapter>
        ...
    </broker>
    

    消息生产者代码:

    public class P2PSender {
        private static final String QUEUE = "client1-to-client2";
    
        public static void main(String[] args) {
            // ConnectionFactory :连接工厂,JMS用它创建连接
            ConnectionFactory connectionFactory;
            // Connection :JMS客户端到JMS Provider的连接
            Connection connection = null;
            // Session:一个发送或接收消息的线程
            Session session;
            // Destination :消息的目的地;消息发送给谁.
            Destination destination;
            // MessageProducer:消息发送者
            MessageProducer producer;
            // TextMessage message;
            // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现
            connectionFactory = new ActiveMQConnectionFactory(
                    "failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)");
            try {
                // 构造从工厂得到连接对象
                connection = connectionFactory.createConnection();
                // 启动
                connection.start();
                // 获取操作连接
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                destination = session.createQueue(QUEUE);
                // 获取session,FirstQueue是一个服务器的queue destination = session.createQueue("FirstQueue");
                // 得到消息生成者【发送者】
                producer = session.createProducer(destination);
                // 设置不持久化
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                // 构造消息
                sendMessage(session, producer);
                // session.commit();
                connection.close();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (null != connection) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        public static void sendMessage(Session session, MessageProducer producer) throws Exception {
            for (int i = 1; i <= 1; i++) {
                Date d = new Date();
                TextMessage message = session.createTextMessage("ActiveMQ发送消息" + i + "  " + new Date());
                System.out.println("发送消息:ActiveMQ发送的消息" + i + "  " + new Date());
                producer.send(message);
            }
        }
    }

    消息消费者代码:

    public class P2PReceiver {
        private static final String QUEUE = "client1-to-client2";
        
        public static void main(String[] args) {
            // ConnectionFactory :连接工厂,JMS用它创建连接
            ConnectionFactory connectionFactory;
            // Connection :JMS客户端到JMS Provider的连接
            Connection connection = null;
            // Session:一个发送或接收消息的线程
            Session session;
            // Destination :消息的目的地;消息发送给谁.
            Destination destination;
            // 消费者,消息接收者
            MessageConsumer consumer;
            connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)");
            try {
                // 得到连接对象
                connection = connectionFactory.createConnection();
                // 启动
                connection.start();
                // 获取操作连接
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // 创建Queue
                destination = session.createQueue(QUEUE);
                consumer = session.createConsumer(destination);
                while (true) {
                    TextMessage message = (TextMessage) consumer.receive();
                    if (null != message) {
                        System.out.println("收到消息" + message.getText());
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (null != connection)
                        connection.close();
                } catch (Throwable ignore) {
                }
            }
        }
    }
    

      

    转自 ActiveMQ的几种集群配置

  • 相关阅读:
    Java Sping 第一章——初识 Spring
    C++设计模式——状态模式 State
    线性代数思维导图(3)——向量组
    基于Servlet实现简单系统登录
    优秀博客汇总
    整理一些开源项目
    Android UI性能优化详解
    (原创)如何在spannableString中使用自定义字体
    (原创)用讯飞语音实现人机交互的功能
    (原创)speex与wav格式音频文件的互相转换(二)
  • 原文地址:https://www.cnblogs.com/binyue/p/5325945.html
Copyright © 2011-2022 走看看