zoukankan      html  css  js  c++  java
  • ActiveMQ的几种集群配置

    ActiveMQ是一款功能强大的消息服务器,它支持许多种开发语言,例如Java, C, C++, C#等等。企业级消息服务器无论对服务器稳定性还是速度,要求都很高,而ActiveMQ的分布式集群则能很好的满足这一需求,下面说说ActiveMQ的几种集群配置。

    Queue consumer clusters

    此集群让多个消费者同时消费一个队列,若某个消费者出问题无法消费信息,则未消费掉的消息将被发给其他正常的消费者,结构图如下:

    Broker clusters

    此种配置是一个消费者连接到多个broker集群的中的一个broker,当该broker出问题时,消费者自动连接到其他一个正常的broker。消费者使用 failover:// 协议来连接broker。

    failover:(tcp://localhost:61616,tcp://localhost:61617)

    failover官网介绍 http://activemq.apache.org/failover-transport-reference.html

    broker之间的通过静态发现(static discovery)和动态发现(dynamic discovery)来维持彼此发现,下面来介绍静态发现和动态发现的机制:

    静态发现:

    静态发现通过配置固定的broker uri来发现彼此,配置语法如下:

    static:(uri1,uri2,uri3,...)?options

    例如:

    static:(tcp://localhost:61616,tcp://remotehost:61617?trace=false,vm://localbroker)?initialReconnectDelay=100

     更多静态发现介绍,见ActiveMQ官网 http://activemq.apache.org/static-transport-reference.html

    动态发现:

    动态发现机制是在各个broker启动时通过Fanout transport来发现彼此,配置举例如下:

    1 <broker name="foo">
    2   <transportConnectors>
    3     <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
    4   </transportConnectors>
    5   ...
    6 </broker>

     更多动态发现机制介绍,见官网http://activemq.apache.org/discovery-transport-reference.html

    Networks of brokers

    多个broker组成集群,当其中一个broker的消费者出问题导致消息堆积无法消费掉时,通过ActiveMQ支持的Network of Broker方案可将该broker堆积的消息转发到其他有消费者的broker。该方案主要有以下两种配置方式:

    1、为broker配置文件配置networkConnector元素

    2、使用发现机制互相探测broker

    Here is an example of using the fixed list of URIs:

     1 <?xml version="1.0" encoding="UTF-8"?>
     2  
     3 <beans xmlns="http://activemq.org/config/1.0">
     4  
     5   <broker brokerName="receiver" persistent="false" useJmx="false"> 
     6     <networkConnectors>
     7       <!-- Static discovery -->
     8       <networkConnector uri="static:(tcp://localhost:62001)"/>
     9       <!-- MasterSlave Discovery -->
    10       <!--<networkConnector uri="masterslave:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/> -->
    11     </networkConnectors>
    12  
    13     <persistenceAdapter>
    14       <memoryPersistenceAdapter/>
    15     </persistenceAdapter>
    16  
    17    <transportConnectors>
    18       <transportConnector uri="tcp://localhost:62002"/>
    19     </transportConnectors>
    20   </broker>
    21  
    22 </beans>

    This example uses multicast discovery:

     1 <?xml version="1.0" encoding="UTF-8"?>
     2  
     3 <beans xmlns="http://activemq.org/config/1.0">
     4  
     5   <broker name="sender" persistent="false" useJmx="false"> 
     6     <networkConnectors>
     7       <networkConnector uri="multicast://default"/>
     8     </networkConnectors>
     9  
    10     <persistenceAdapter>
    11       <memoryPersistenceAdapter/>
    12     </persistenceAdapter>
    13  
    14   <transportConnectors>
    15       <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
    16     </transportConnectors>
    17   </broker>
    18  
    19 </beans>

    Master Slave

    通过部署多个broker实例,一个master和多个slave关系的broker来达到高可用性,有三种方案:

    1、Master-Slave
    2、SharedFile System Master Slave
    3、JDBCMaster Slave

    第一种方案由于只可以由两个AMQ实例组件,实际应用场景并不广泛;
    第三种方案支持N个AMQ实例组网,但他的性能会受限于数据库;
    第二种方案同样支持N个AMQ实例组网,基于kahadb存储策略,亦可以部署在分布式文件系统上,应用灵活、高效且安全。

    Master Slave方案当其中一个broker启动并拿到独占锁时自动成为master,其他后续的broker则一直等待锁,当master宕机释放锁时其他slave拿到独占锁则自动成为master,部署结构如下:

    第二种方案的配置只需修改config文件夹下activemq.xml文件,修改消息持久化使用的方案:

    1 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="D:/Platform/mq_share_file">
    2   ...
    3     <persistenceAdapter>
    4             <kahaDB directory="D:/Platform/mq_share_file/kahadb" enableIndexWriteAsync="true" enableJournalDiskSyncs="false"/>
    5     </persistenceAdapter>
    6     ...
    7 </broker>

    消息生产者代码:

     1 public class P2PSender {
     2     private static final String QUEUE = "client1-to-client2";
     3 
     4     public static void main(String[] args) {
     5         // ConnectionFactory :连接工厂,JMS用它创建连接
     6         ConnectionFactory connectionFactory;
     7         // Connection :JMS客户端到JMS Provider的连接
     8         Connection connection = null;
     9         // Session:一个发送或接收消息的线程
    10         Session session;
    11         // Destination :消息的目的地;消息发送给谁.
    12         Destination destination;
    13         // MessageProducer:消息发送者
    14         MessageProducer producer;
    15         // TextMessage message;
    16         // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现
    17         connectionFactory = new ActiveMQConnectionFactory(
    18                 "failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)");
    19         try {
    20             // 构造从工厂得到连接对象
    21             connection = connectionFactory.createConnection();
    22             // 启动
    23             connection.start();
    24             // 获取操作连接
    25             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    26             destination = session.createQueue(QUEUE);
    27             // 获取session,FirstQueue是一个服务器的queue destination = session.createQueue("FirstQueue");
    28             // 得到消息生成者【发送者】
    29             producer = session.createProducer(destination);
    30             // 设置不持久化
    31             producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    32             // 构造消息
    33             sendMessage(session, producer);
    34             // session.commit();
    35             connection.close();
    36         } catch (Exception e) {
    37             e.printStackTrace();
    38         } finally {
    39             if (null != connection) {
    40                 try {
    41                     connection.close();
    42                 } catch (JMSException e) {
    43                     e.printStackTrace();
    44                 }
    45             }
    46         }
    47     }
    48 
    49     public static void sendMessage(Session session, MessageProducer producer) throws Exception {
    50         for (int i = 1; i <= 1; i++) {
    51             Date d = new Date();
    52             TextMessage message = session.createTextMessage("ActiveMQ发送消息" + i + "  " + new Date());
    53             System.out.println("发送消息:ActiveMQ发送的消息" + i + "  " + new Date());
    54             producer.send(message);
    55         }
    56     }
    57 }

    消息消费者代码:

     1 public class P2PReceiver {
     2     private static final String QUEUE = "client1-to-client2";
     3     
     4     public static void main(String[] args) {
     5         // ConnectionFactory :连接工厂,JMS用它创建连接
     6         ConnectionFactory connectionFactory;
     7         // Connection :JMS客户端到JMS Provider的连接
     8         Connection connection = null;
     9         // Session:一个发送或接收消息的线程
    10         Session session;
    11         // Destination :消息的目的地;消息发送给谁.
    12         Destination destination;
    13         // 消费者,消息接收者
    14         MessageConsumer consumer;
    15         connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)");
    16         try {
    17             // 得到连接对象
    18             connection = connectionFactory.createConnection();
    19             // 启动
    20             connection.start();
    21             // 获取操作连接
    22             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    23             // 创建Queue
    24             destination = session.createQueue(QUEUE);
    25             consumer = session.createConsumer(destination);
    26             while (true) {
    27                 TextMessage message = (TextMessage) consumer.receive();
    28                 if (null != message) {
    29                     System.out.println("收到消息" + message.getText());
    30                 }
    31             }
    32         } catch (Exception e) {
    33             e.printStackTrace();
    34         } finally {
    35             try {
    36                 if (null != connection)
    37                     connection.close();
    38             } catch (Throwable ignore) {
    39             }
    40         }
    41     }
    42 }


    
    

  • 相关阅读:
    清明节实现所有网页变灰色
    点击按钮,复制文本
    Matlab笔记
    spring框架中配置mysql8.0需要注意的地方(转载)
    移动吉比特H2-2光猫超级用户与密码
    JS关闭chorme页面
    MATLAB利用solve函数解多元一次方程组
    微信聊天记录导出为csv,并生成词云图
    fmt.Sprintf(格式化输出)
    iris,context源码分析
  • 原文地址:https://www.cnblogs.com/forchase/p/4317752.html
Copyright © 2011-2022 走看看