zoukankan      html  css  js  c++  java
  • ActiveMQ集群搭建

    一、 Activemq主备搭建

      Shared Filesystem Master-Slave方式
      shared filesystem Master-Slave部署方式主要是通过共享存储目录来实现master和slave的热备,所有的ActiveMQ应用都在不断地获取共享目录的控制权,哪个应用抢到了控制权,它就成为master。
      多个共享存储目录的应用,谁先启动,谁就可以最早取得共享目录的控制权成为master,其他的应用就只能作为slave。

    1  搭建配置步骤搭建 master-slave (一主 一备份)

    准备mq的1节点 activemq-1
    准备mq的2节点 activemq-2

      特点:
        只能本地不能分布式 和 集群。

    针对每一个activemq的节点进行配置:

    1.1 配置节点1:
    首先创建共享目录,并创建两个节点
    如图:

    •  配置activemq-1,需要修改持久数据库位置,修改:activemq-1/conf/activemq.xml 

    • 配置activemq-1,需要修改activemq-1/conf/activemq.xml

      如图:修改成61617

    • 配置activemq-1 ,需要修改activemq-1/conf/jetty.xml

    2.2 配置节点2:

    • 配置activemq-2,需要修改 持久目录文件,修改:activemq-2/conf/activemq.xml 

    • 配置activemq-2,需要修改activemq-2/conf/activemq.xml

      如图:修改成61618

    • 配置activemq-1 ,需要修改activemq-2/conf/jetty.xml

    2 测试master-slave
    2.1 生产者

     1 public class ProduceQueue {


     2     @Test

     3     public void sendMessage() throws Exception{

     4         //1.创建一个连接工厂 connectionfactory 参数:就是要连接的服务器的地址

     5         ConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://192.168.25.130:61617,tcp://192.168.25.130:61618)");

     6         //2.通过工厂获取连接对象 创建连接 7         Connection connection = factory.createConnection();

     8         //3.开启连接 9         connection.start();

    10         //4.创建一个session对象  提供发送消息等方法

    11         // 第一个参数:表示是否开启分布式事务(JTA)  一般是false 不开启。

    12         // 第二个参数:就是设置消息的应答模式
    13         // 如果 第一个参数为false时,第二个参数设置才有意义。用的是自动应答14         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    15         //5.创建目的地 (destination)  queue 参数:目的地的名称

    16         Queue queue = session.createQueue("queue-test-cluster");

    17         //6.创建个生产者18         MessageProducer producer = session.createProducer(queue);

    19         //7.构建消息的内容20         TextMessage textMessage = session.createTextMessage("queue测试发送的消息");
    21         // 8.发送消息22         producer.send(textMessage);

    23         //9.关闭资源24         producer.close();

    25         session.close();

    26         connection.close();

    27     }
    28 
}

    2.2 消费者

     1 public class ConsumerQueue {


     2     @Test

     3     public void consumer() throws Exception{

     4         //1.创建连接的工厂 5         ConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://192.168.25.130:61617,tcp://192.168.25.130:61618)");

     6         //2.创建连接 7         Connection connection = factory.createConnection();

     8         //3.开启连接 9         connection.start();

    10         //4.创建session

    11         // 第一个参数:表示是否开启分布式事务(JTA)  一般是false 不开启。

    12         // 第二个参数:就是设置消息的应答模式   如果 第一个参数为false时,第二个参数设置才有意义。用的是自动应答13         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    14         // 5.创建接收消息的一个目的地15         Queue queue = session.createQueue("queue-test-cluster");

    16         // 6.创建消费者17         MessageConsumer consumer = session.createConsumer(queue);

    18         // 7.接收消息 打印

    19         // 第一种20         /*while(true){

    21             Message message = consumer.receive(1000000);//设置接收消息的超时时间

    22             //没有接收到消息就跳出循环

    23             if(message==null){

    24                 break;

    25             }

    26             if(message instanceof TextMessage){

    27                 TextMessage message2 = (TextMessage) message;

    28                 System.out.println("接收的消息为"+message2.getText());

    29                 }

    30          }*/31         //第二种


    32         // 设置一个监听器

    33         // System.out.println("start");

    34         // 这里其实开辟了一个新的线程35         consumer.setMessageListener(new MessageListener() {


    36             //当有消息的时候会执行以下的逻辑37             @Override

    38             public void onMessage(Message message) {

    39                 if(message instanceof TextMessage){

    40                     TextMessage message2 = (TextMessage) message;

    41                     try {

    42                         System.out.println("接收的消息为"+message2.getText());

    43                     } catch (JMSException e) {

    44                         e.printStackTrace();

    45                     }

    46                 }

    47             }

    48         });

    49         //System.out.println("end");50         Thread.sleep(199999);

    51         // 8.关闭资源52         consumer.close();

    53         session.close();

    54         connection.close();

    55     }


    56 }

    先启动的成为主节点,平常主节点工作,slave不工作但是一直做监听。当主节点挂掉,slave接手工作。

    二、 基于zookeeper的activemq集群搭建(推荐)

    2.1 基于可复制的 LevelDB
      LevelDB 是 Google 开发的一套用于持久化数据的高性能类库。 LevelDB 并不是一种服务,用户需要自行实现 Server。 是单进程的服务,能够处理十亿级别规模 Key-Value 型数据,占用内存小。
      http://activemq.apache.org/replicated-leveldb-store.html

     

      高可用的原理:使用 ZooKeeper(集群)注册所有的 ActiveMQ Broker。只有其中的一个 Broker 可以提供服务,被视为 Master,其他的 Broker 处于待机状态,被视为 Slave。如果 Master 因故障而不能提供服务,ZooKeeper 会从 Slave 中选举出一个 Broker 充当 Master。Slave 连接 Master 并同步他们的存储状态, Slave 不接受客户端连接。所有的存储操作都将被复制到连接至 Master 的 Slaves。 如果 Master 宕了,得到了最新更新的 Slave 会成为 Master。 故障节点在恢复后会重新加入到集群中并连接 Master 进入 Slave 模式。所有需要同步的 disk 的消息操作都将等待存储状态被复制到其他法定节点的操作完成才能完成。所以,如果你配置了 replicas=3,那么法定大小是(3/2)+1=2。 Master 将会存储并更新然后等待 (2-1)=1 个Slave 存储和更新完成,才汇报 success。 至于为什么是 2-1,熟悉 Zookeeper 的应该知道,有一个 node要作为观擦者存在。当一个新的 Master 被选中,你需要至少保障一个法定 node 在线以能够找到拥有最新状态的 node。这个 node 可以成为新的 Master。因此,推荐运行至少 3 个 replica nodes,以防止一个 node失败了,服务中断。(原理与 ZooKeeper 集群的高可用实现方式类似)。

    2.2 集群单机环境规划
      定义环境:3个activemq节点(node01 node02 node03)

    Ip 集群通信端口 节点消息连接端口 Jetty后台运行端口
    192.168.25.130 63631 51515 8361
    192.168.25.130 63632 51516 8362
    192.168.25.130 63633 51517 8363

    2.3 配置zookeeper集群

    见相关教程;

    2.4 节点1的配置

      在activemq.xml中配置:

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerCluster" dataDirectory="${activemq.data}">

      brokerName指定一个名字:任意即可。但是整个集群的所有的配置项名称都应该是一致的。

      broker标签下的配置:

    <persistenceAdapter> 
      <!-- kahaDB directory="${activemq.data}/kahadb"/ --> 
      <replicatedLevelDB 
      directory="${activemq.data}/leveldb" 
      replicas="3" 
      bind="tcp://0.0.0.0:63631" 
      zkAddress="192.168.25.130:2181,192.168.25.130:2182,192.168.25.130:2183" 
      hostname="localhost" 
      zkPath="/activemq2/leveldb-stores"/> 
    </persistenceAdapter>

      jetty.xml:配置项:

    <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
      <!-- the default port number for the web console -->
      <property name="host" value="0.0.0.0"/>
      <property name="port" value="8361"/>
    </bean>

    2.5 节点2的配置
      参考节点1搭建即可,注意:端口不要一样。

      搭建后的截图:


      node01 -node03 是activemq的3个节点。

  • 相关阅读:
    package的使用
    package的使用
    访问控制符详解
    访问控制符详解
    继承
    Java 重写 & 重载 & super 关键字
    继承和权限控制
    错误: 程序包com.bjsxt.java140不存在
    package和import语句
    static关键字
  • 原文地址:https://www.cnblogs.com/gdwkong/p/9021156.html
Copyright © 2011-2022 走看看