zoukankan      html  css  js  c++  java
  • ActiveMQ集群

    1.ActiveMQ集群介绍

    1.为什么要集群?   

      实现高可用,以排除单点故障引起的服务中断

      实现负载均衡,以提升效率为更多客户提供服务

    2.集群方式

      客户端集群:让多个消费者消费同一个队列

      Broker Cluster:多个Broker之间同步消息(做不了高可用,可以实现负载均衡)

      Master-Slave:高可用(做不了负载均衡)

    3.ActiveMq失效转移

      允许当其中一台消息服务器宕机时,客户端在传输层上重新连接其他消息服务器。

      语法:failover:(uri1,...uriN)?transportOptions

        transportOptions参数说明:randomize:默认为true,表示在URI列表中选择URI连接时是否采用随机策略

                    initialReconnectDelay默认为10,默认10毫秒,表示第一次尝试重连之间等待的时间。

                    maxReconnectDelay:默认30000,单位毫秒,最长重连时间的间隔

     4.Master/Slave集群配置

      Share nothing storage master/slave(已经过时,5.8之后移除)

      shared storage master/slave 共享存储(实际上是共享同一文件夹,只不过采用排他锁,所以只有一个master节点可以访问,当此服务宕机,另一台slave可以快速强占排他锁,所以不会造成数据丢失,使用同一文件夹下的东西。如果多态服务器的话需要搭建文件共享服务器)

      Replicated LevelDB Store 基于复制的LevelDB Store

    1.共享存储原理:(获取排他锁才可以提供消息服务)--简单方式

        

     2.Replicated LevelDB Store 基于复制的LevelDB Store原理(基于zookeper)

    3.两种方式对比:

      高可用 负载均衡
    Master/Slave
    Broker Cluster

    4.三台机器的完美集群方案:(实现高可用和负载均衡)

    2.ActiveMQ集群配置

    1.方案介绍

     方案如下: B与C采用master-slave共享文件夹存储(任一时刻只有一个可以占有排他锁,也就是只有一个可以提供服务),当A宕机之后B会获取资源锁提供服务---实现高可用

           A与B、A与C都采用Broker 集群,可以同时提供服务,不管B与C谁获取锁,A都可以提供服务(A只能提供服务消费消息,不生成消息)---实现负载均衡

      

    配置如下:(同一个电脑不同端口模拟集群)

        

     2.配置文件 (修改的配置文件都是在apache-activemq-5.15.6conf文件夹下)

       

    (1)activemq-a下面的配置:---A服务器

    activemq.xml:(注释掉其他协议,服务端口使用61616端口;增加静态网络连接器,同时连接B与C)

     jetty.xml:  采用默认的8161端口

    (2)activemq-b下面的配置:---B服务器

     activemq.xml:(增加与A的静态连接器,修改服务端口采用61617,修改共享文件夹的地址)

     

    jitty.xml:修改端口采用8162

    (3)activemq-c下面的配置:---C服务器

     activemq.xml:(增加与A的静态连接器,修改服务端口采用61618,修改共享文件夹的地址)

     

    jitty.xml:修改端口采用8163

    3.依次启动ActiveMq进行测试 

      启动顺序是:A->B->C,下面通过端口信息验证集群:

    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 8161
      TCP    0.0.0.0:8161           0.0.0.0:0              LISTENING       2423496
      TCP    [::]:8161              [::]:0                 LISTENING       2423496
    
    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 8162
      TCP    0.0.0.0:8162           0.0.0.0:0              LISTENING       2423756
      TCP    [::]:8162              [::]:0                 LISTENING       2423756
    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 8163
    
    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 61616
      TCP    0.0.0.0:61616          0.0.0.0:0              LISTENING       2423496
      TCP    127.0.0.1:55338        127.0.0.1:61616        ESTABLISHED     2423756
      TCP    127.0.0.1:61616        127.0.0.1:55338        ESTABLISHED     2423496
      TCP    [::]:61616             [::]:0                 LISTENING       2423496
    
    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 61617
      TCP    0.0.0.0:61617          0.0.0.0:0              LISTENING       2423756
      TCP    127.0.0.1:55345        127.0.0.1:61617        ESTABLISHED     2423496
      TCP    127.0.0.1:61617        127.0.0.1:55345        ESTABLISHED     2423756
      TCP    [::]:61617             [::]:0                 LISTENING       2423756
    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 61618

      由于先启动的B,B占有排斥锁,所以B(8162-61617)处于监听状态,而C与B采用共享文件排他锁集群,所以C处于阻塞状态,也就是没有监听端口,被阻塞。

     停掉B服务器,模拟B服务器宕机的情况,再次查看端口:

    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 8161
      TCP    0.0.0.0:8161           0.0.0.0:0              LISTENING       2423496
      TCP    [::]:8161              [::]:0                 LISTENING       2423496
    
    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 8162
    
    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 8163
      TCP    0.0.0.0:8163           0.0.0.0:0              LISTENING       2423848
      TCP    [::]:8163              [::]:0                 LISTENING       2423848
    
    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 61616
      TCP    0.0.0.0:61616          0.0.0.0:0              LISTENING       2423496
      TCP    127.0.0.1:56704        127.0.0.1:61616        ESTABLISHED     2423848
      TCP    127.0.0.1:61616        127.0.0.1:56704        ESTABLISHED     2423496
      TCP    [::]:61616             [::]:0                 LISTENING       2423496
    
    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 61617
    
    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 61618
      TCP    0.0.0.0:61618          0.0.0.0:0              LISTENING       2423848
      TCP    127.0.0.1:56791        127.0.0.1:61618        ESTABLISHED     2423496
      TCP    127.0.0.1:61618        127.0.0.1:56791        ESTABLISHED     2423848
      TCP    [::]:61618             [::]:0                 LISTENING       2423848
    
    liqiang@root MINGW64 ~/Desktop

      由于B宕机,所以C会强占排他锁,也就是会监听端口,所以可以看到8163与61618端口的监听状态。

      现在模拟C也宕机,将C服务器也停掉。再次查看端口信息:

    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 8161
      TCP    0.0.0.0:8161           0.0.0.0:0              LISTENING       2423496
      TCP    [::]:8161              [::]:0                 LISTENING       2423496
    
    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 8162
    
    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 8163
    
    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 61616
      TCP    0.0.0.0:61616          0.0.0.0:0              LISTENING       2423496
      TCP    [::]:61616             [::]:0                 LISTENING       2423496
    
    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 61617
      TCP    127.0.0.1:57242        127.0.0.1:61617        SYN_SENT        2423496
    
    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 61618
    
    liqiang@root MINGW64 ~/Desktop

      此时A服务器仍然在监听端口。

      上面证明集群搭建成功,下面重新开启A->B->C服务器开始程序验证。

    4.程序验证集群

    服务器开启顺序性:A->B->C(此时B占有排他锁,B监听端口,A一直监听)

    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 61617
    
    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 61616
      TCP    0.0.0.0:61616          0.0.0.0:0              LISTENING       2428232
      TCP    127.0.0.1:60520        127.0.0.1:61616        ESTABLISHED     2427192
      TCP    127.0.0.1:61616        127.0.0.1:60520        ESTABLISHED     2428232
      TCP    [::]:61616             [::]:0                 LISTENING       2428232
    
    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 61617
      TCP    0.0.0.0:61617          0.0.0.0:0              LISTENING       2427192
      TCP    127.0.0.1:60513        127.0.0.1:61617        ESTABLISHED     2428232
      TCP    127.0.0.1:61617        127.0.0.1:60513        ESTABLISHED     2427192
      TCP    [::]:61617             [::]:0                 LISTENING       2427192
    
    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 61618
    
    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 8161
      TCP    0.0.0.0:8161           0.0.0.0:0              LISTENING       2428232
      TCP    [::]:8161              [::]:0                 LISTENING       2428232
    
    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 8162
      TCP    0.0.0.0:8162           0.0.0.0:0              LISTENING       2427192
      TCP    [::]:8162              [::]:0                 LISTENING       2427192
    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 8163

    代码测试:

    生产者:  url加了失效策略,而且采用随机选取,生产消息的地址只有B与C服务器地址

    package cn.qlq.activemq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 生产消息
     * 
     * @author QiaoLiQiang
     * @time 2018年9月18日下午11:04:41
     */
    public class MsgProducer {
    
        private static final String url = "failover:(tcp://localhost:61617,tcp://localhost:61618)?randomize=true";
        private static final String queueName = "myQueue";
    
        public static void main(String[] args) throws JMSException {
            // 1创建ConnectionFactory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            // 2.由connectionFactory创建connection
            Connection connection = connectionFactory.createConnection();
            // 3.启动connection
            connection.start();
            // 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5.创建Destination(Queue继承Queue)
            Queue destination = session.createQueue(queueName);
            // 6.创建生产者producer
            MessageProducer producer = session.createProducer(destination);
            for (int i = 0; i < 100; i++) {
                // 7.创建Message,有好多类型,这里用最简单的TextMessage
                TextMessage tms = session.createTextMessage("textMessage:" + i);
                // 8.生产者发送消息
                producer.send(tms);
    
                System.out.println("send:" + tms.getText());
            }
            // 9.关闭connection
            connection.close();
        }
    
    }

    消费者:url加了失效策略,而且采用随机选取,生产消息的地址有ABC服务器

    package cn.qlq.activemq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 消费消息
     * 
     * @author QiaoLiQiang
     * @time 2018年9月18日下午11:26:41
     */
    public class MsgConsumer {
    
        private static final String url = "failover:(tcp://localhost:61616,tcp://localhost:61617,tcp://localhost:61618)?randomize=true";
        private static final String queueName = "myQueue";
    
        public static void main(String[] args) throws JMSException {
            // 1创建ConnectionFactory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            // 2.由connectionFactory创建connection
            Connection connection = connectionFactory.createConnection();
            // 3.启动connection
            connection.start();
            // 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5.创建Destination(Queue继承Queue)
            Queue destination = session.createQueue(queueName);
            // 6.创建消费者consumer
            MessageConsumer consumer = session.createConsumer(destination);
            // 7.给消费者绑定监听器(消息的监听是一个异步的过程,不可以关闭连接,绑定监听器线程是一直开启的,处于阻塞状态,所以可以在程序退出关闭)
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    // 7.1由于消费者接受的是TextMessage,所以强转一下
                    TextMessage tms = (TextMessage) message;
                    try {
                        System.out.println("接收消息:" + tms.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    
    }

    启动生产者发布100条消息:

    管理界面查看:(消息被发布在B服务器,B占有排他锁,C处于阻塞)

     A服务不生产消息所有A不会消费消息:(通过A查看网络连接器)

    关掉B服务器之后查看C服务器:(验证B与C共享同一文件夹,且强占排他锁)

    查看共享文件夹:

     此时A与C处于监听状态,启动消费者:

    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 8161
      TCP    0.0.0.0:8161           0.0.0.0:0              LISTENING       2431676
      TCP    [::]:8161              [::]:0                 LISTENING       2431676
    
    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 8162
    
    liqiang@root MINGW64 ~/Desktop
    $ netstat -ano |findstr 8163
      TCP    0.0.0.0:8163           0.0.0.0:0              LISTENING       2432188
      TCP    [::]:8163              [::]:0                 LISTENING       2432188
      TCP    [::1]:8163             [::1]:63600            TIME_WAIT       0
      TCP    [::1]:8163             [::1]:63648            TIME_WAIT       0
      TCP    [::1]:8163             [::1]:63649            TIME_WAIT       0
      TCP    [::1]:8163             [::1]:63650            TIME_WAIT       0
      TCP    [::1]:8163             [::1]:63651            TIME_WAIT       0
      TCP    [::1]:8163             [::1]:63652            TIME_WAIT       0

    查看控制台如下:连接到A服务器消费信息

     总结:上面的配置方案B与C是为了实现高可用,也就是一台宕机之后另一台马上强占排他锁提供服务(需要共享文件夹实现共用同一文件夹下的资源与锁),B与C可以生产消息,也可以提供消费消息,但是同一时刻只有一个提供服务。

         提供A是为了与B、C实现负载均衡,A不生产消息,但是可以消费消息,替B或者C分担压力。

  • 相关阅读:
    VirtualBox Network设置的NAT和Bridged Adapter模式区别
    Kubernetes里的ConfigMap的用途
    使用Kubernetes里的job计算圆周率后2000位
    给谷歌输入法增添自定义词组,提高输入效率
    推荐一个yaml文件转json文件的在线工具
    GCC同时使用静态库和动态库链接
    Linux后台开发常用工具
    gcc链接参数--whole-archive的作用
    rdynamic和-whole-archive
    gcc和ld 中的参数 --whole-archive 和 --no-whole-archive
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/9728425.html
Copyright © 2011-2022 走看看