zoukankan      html  css  js  c++  java
  • 【原创】《从0开始学RocketMQ》—集群搭建

    用两台服务器,搭建出一个双master双slave、无单点故障的高可用 RocketMQ 集群。此处假设两台服务器的物理 IP 分别为:192.168.50.1、192.168.50.2。

    内容目录

    1. 启动 NameServer 集群

    2. 启动 Broker 集群

    3. RocketMQ 可视化管理控制台:rocketmq-console

    4. 集群测试

     

    1. 启动 NameServer 集群

    在两台服务器上分别启动 NameServer,可以得到一个无单点故障的 NameServer 服务,服务地址分别为:192.168.50.1:9876、192.168.50.2:9876。

     

    2. 启动 Broker 集群

    修改 Broker 配置文件,以使每台服务器上都可以启动一个 Master 角色 的 Broker 和 一个Slave 角色的 Broker。
    首先找到 Broker 配置文件,此处我们搭建一个同步双写模式的集群,所以需要修改 2m-2s-sync 目录下的 broker 配置文件:

    [root@157-89 ~]# cd /usr/local/rocketmq-all-4.3.2-bin-release/conf/
    [root@157-89 conf]# ls
    2m-2s-async  2m-2s-sync  2m-noslave  broker.conf  logback_broker.xml  logback_namesrv.xml  logback_tools.xml
    [root@157-89 conf]# cd 2m-2s-sync/
    [root@157-89 2m-2s-sync]# ls
    broker-a.properties  broker-a-s.properties  broker-b.properties  broker-b-s.properties

    1) 修改 192.168.50.1 服务器上的 broker-a.properties 为 Master 角色的 Broker:

    namesrvAddr=192.168.50.1:9876;192.168.50.2:9876
    brokerClusterName=rocketMqCluster
    brokerIP1=192.168.50.1
    brokerName=broker-a
    brokerId=0
    deleteWhen=04
    fileReservedTime=120
    mapedFileSizeConsumeQueue=500000
    brokerRole=SYNC_MASTER
    flushDiskType=ASYNC_FLUSH
    listenPort=10911
    autoCreateTopicEnable=true
    autoCreateSubscriptionGroup=true
    storePathRootDir=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a
    storePathCommitLog=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a/commitlog
    storePathConsumeQueue=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a/consumequeue
    storePathIndex=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a/index
    storeCheckpoint=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a/checkpoint
    abortFile=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a/abort

    2) 修改 192.168.50.2 服务器上的 broker-b.properties 为 Master 角色的 Broker:

    namesrvAddr=192.168.50.1:9876;192.168.50.2:9876
    brokerClusterName=rocketMqCluster
    brokerIP1=192.168.50.2
    brokerName=broker-b
    brokerId=0
    deleteWhen=04
    fileReservedTime=120
    mapedFileSizeConsumeQueue=500000
    brokerRole=SYNC_MASTER
    flushDiskType=ASYNC_FLUSH
    listenPort=10911
    autoCreateTopicEnable=true
    autoCreateSubscriptionGroup=true
    storePathRootDir=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b
    storePathCommitLog=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b/commitlog
    storePathConsumeQueue=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b/consumequeue
    storePathIndex=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b/index
    storeCheckpoint=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b/checkpoint
    abortFile=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b/abort

    3) 修改 192.168.50.1 服务器上的 broker-b-s.properties 为 Slave 角色的 Broker:

    namesrvAddr=192.168.50.1:9876;192.168.50.2:9876
    brokerClusterName=rocketMqCluster
    brokerIP1=192.168.50.1
    brokerName=broker-b
    brokerId=1
    deleteWhen=04
    fileReservedTime=120
    mapedFileSizeConsumeQueue=500000
    brokerRole=SLAVE
    flushDiskType=ASYNC_FLUSH
    listenPort=10921
    autoCreateTopicEnable=true
    autoCreateSubscriptionGroup=true
    storePathRootDir=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s
    storePathCommitLog=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s/commitlog
    storePathConsumeQueue=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s/consumequeue
    storePathIndex=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s/index
    storeCheckpoint=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s/checkpoint
    abortFile=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s/abort

    4) 修改 192.168.50.2 服务器上的 broker-a-s.properties 为 Slave 角色的 Broker:

    namesrvAddr=192.168.50.1:9876;192.168.50.2:9876
    brokerClusterName=rocketMqCluster
    brokerIP1=192.168.50.2 
    brokerName=broker-a
    brokerId=1
    deleteWhen=04
    fileReservedTime=48
    brokerRole=SLAVE
    flushDiskType=ASYNC_FLUSH
    listenPort=10921
    autoCreateTopicEnable=true
    autoCreateSubscriptionGroup=true
    storePathRootDir=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s
    storePathCommitLog=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s/commitlog
    storePathConsumeQueue=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s/consumequeue
    storePathIndex=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s/index
    storeCheckpoint=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s/checkpoint
    abortFile=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s/abort

    一台服务器上启动多个Broker 时,需指定不同的端口号,记得防火墙放开 NameServer 和 Broker 中用到的端口号哦~

    分别启动四个 Broker:

    nohup sh bin/mqbroker -c broker_config_file &

     

    3. RocketMQ 可视化管理控制台:rocketmq-console

    在服务器 192.168.50.1 上安装即可,无需集群

    [root@153-215 local]# git clone https://github.com/apache/rocketmq-externals.git
    Cloning into 'rocketmq-externals'...
    remote: Enumerating objects: 10, done.
    remote: Counting objects: 100% (10/10), done.
    remote: Compressing objects: 100% (10/10), done.
    remote: Total 9425 (delta 2), reused 1 (delta 0), pack-reused 9415
    Receiving objects: 100% (9425/9425), 11.86 MiB | 232.00 KiB/s, done.
    Resolving deltas: 100% (4235/4235), done.
    [root@153-215 local]# cd rocketmq-externals/
    [root@153-215 rocketmq-externals]# ls
    dev  README.md  rocketmq-console  rocketmq-docker  rocketmq-flink  rocketmq-flume  rocketmq-hbase  rocketmq-iot-bridge  rocketmq-jms  rocketmq-mysql  rocketmq-php  rocketmq-redis  rocketmq-sentinel  rocketmq-serializer  rocketmq-spark
    [root@153-215 rocketmq-externals]# git branch
    * master
    [root@153-215 rocketmq-externals]# git fetch origin release-rocketmq-console-1.0.0
    From https://github.com/apache/rocketmq-externals
     * branch            release-rocketmq-console-1.0.0 -> FETCH_HEAD
    [root@153-215 rocketmq-externals]# git checkout -b release-1.0.0 origin/release-rocketmq-console-1.0.0
    Branch 'release-1.0.0' set up to track remote branch 'release-rocketmq-console-1.0.0' from 'origin'.
    Switched to a new branch 'release-1.0.0'
    [root@153-215 rocketmq-externals]# ls
    README.md  rocketmq-console
    [root@153-215 rocketmq-externals]# ls rocketmq-console/
    doc  LICENSE  NOTICE  pom.xml  README.md  src  style
    [root@153-215 rocketmq-externals]# vim rocketmq-console/src/main/resources/application.properties 

    编辑 application.properties:

    server.contextPath=/rocketmq
    server.port=8080
    #spring.application.index=true
    spring.application.name=rocketmq-console
    spring.http.encoding.charset=UTF-8
    spring.http.encoding.enabled=true
    spring.http.encoding.force=true
    logging.config=classpath:logback.xml
    #if this value is empty,use env value rocketmq.config.namesrvAddr  NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
    rocketmq.config.namesrvAddr=192.168.50.1:9876;192.168.50.2:9876
    #if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
    rocketmq.config.isVIPChannel=
    #rocketmq-console's data path:dashboard/monitor
    rocketmq.config.dataPath=/tmp/rocketmq-console/data
    #set it false if you don't want use dashboard.default true
    rocketmq.config.enableDashBoardCollect=true

    移动 rocketmq-console 所在目录,编译并启动 rocketmq-console:

    [root@153-215 rocketmq-console]# mv /usr/local/rocketmq-externals/rocketmq-console /usr/local/rocketmq-console
    [root@153-215 rocketmq-console]# cd /usr/local/rocketmq-console/
    [root@153-215 rocketmq-console]# ls
    doc  LICENSE  NOTICE  pom.xml  README.md  src  style
    [root@153-215 rocketmq-console]# mvn clean package -Dmaven.test.skip=true
    ........
    [INFO] Building jar: /usr/local/rocketmq-console/target/rocketmq-console-ng-1.0.0-sources.jar
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 02:54 min
    [INFO] Finished at: 2019-01-11T17:02:34+08:00
    [INFO] ------------------------------------------------------------------------
    [root@153-215 rocketmq-console]# ls
    doc  LICENSE  NOTICE  pom.xml  README.md  src  style  target
    [root@153-215 rocketmq-console]# ls target/
    checkstyle-cachefile  checkstyle-checker.xml  checkstyle-result.xml  classes  generated-sources  maven-archiver  maven-status  rocketmq-console-ng-1.0.0.jar  rocketmq-console-ng-1.0.0.jar.original  rocketmq-console-ng-1.0.0-sources.jar
    [root@153-215 rocketmq-console]# java -jar target/rocketmq-console-ng-1.0.0.jar
    .......
    [2019-01-11 17:04:15.980]  INFO Initializing ProtocolHandler ["http-nio-8080"]
    [2019-01-11 17:04:15.991]  INFO Starting ProtocolHandler [http-nio-8080]
    [2019-01-11 17:04:16.232]  INFO Using a shared selector for servlet write/read
    [2019-01-11 17:04:16.251]  INFO Tomcat started on port(s): 8080 (http)
    [2019-01-11 17:04:16.257]  INFO Started App in 6.594 seconds (JVM running for 7.239)

     

    4. 集群测试

    Producer 测试代码:

    public class SyncProducerTest {
        public static void main(String[] args) {
            DefaultMQProducer producer = new DefaultMQProducer("producer_test_group");
            producer.setNamesrvAddr("39.107.153.215:9876;39.107.157.89:9876");
            try{
                producer.start();
                for(int i=0;i<100;i++){
                    Message message = new Message("topic_test", "tag_test", ("Hello World" + 1).getBytes("UTF-8"));
                    SendResult sendResult = producer.send(message);
                    System.out.println(JSON.toJSON(sendResult));
                }
                producer.shutdown();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    Consumer 测试代码:

    public class SyncConsumerTest {
        public static void main(String[] args) {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_test_group");
            consumer.setNamesrvAddr("39.107.153.215:9876;39.107.157.89:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            try {
                consumer.subscribe("topic_test", "*");
                consumer.registerMessageListener((MessageListenerConcurrently) (messageList, context) -> {
                    System.out.println(Thread.currentThread().getName() + " Receive New Message:" + messageList);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                });
                consumer.start();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    SyncProducerTest 运行日志:

    SyncConsumerTest 运行日志:

    通过日志可以看到,消费者、生产者收发消息都是正常的,我们去可视化管理控制台查看下 http://192.168.50.1:8080/rocketmq:

    通过管控台可以看到,双 master 双 slave 的 broker 集群一切正常,并可进一步看到每个 broker 处理消息的情况。

  • 相关阅读:
    浅谈流形学习
    流形(Manifold)初步【转】
    MATLAB中的函数的归总
    LBP特征提取实现
    Nginx 安装
    Linux执行.sh文件,提示No such file or directory的问题
    Ubuntu 安装后的配置及美化(二)
    Ubuntu 安装后的配置及美化(一)
    关于windows上 web 和 ftp 站点的创建及使用
    win10 + Lubuntu 双系统安装
  • 原文地址:https://www.cnblogs.com/mengyi/p/10296599.html
Copyright © 2011-2022 走看看