zoukankan      html  css  js  c++  java
  • rocketmq 集群搭建

    1.在本地虚拟机搭建双主双从集群需要准备两台虚拟机,每台虚拟机需要安装JDK,比较快捷的方式是先在一条机器上安装基本环境,然后克隆出另外一台。

    2.机器a部署broker-a 的主节点 broker-b的从节点,机器b部署 broker-b 的主节点 broker-a的从节点

    2.配置本地域名解析,方便后面的rocketmq配置文件中使用

    vim /etc/hosts

    3.重启网络服务,让域名解析生效

    systemctl restart network
    

    4.关闭防火墙或者开放端口。如果在虚拟机自己玩为了方便可以关闭防火墙,在正式环境需要开放响应的端口

    # 关闭防火墙
    systemctl stop firewalld.service 
    # 查看防火墙的状态
    firewall-cmd --state 
    # 禁止firewall开机启动
    systemctl disable firewalld.service
    
    # 开放name server默认端口
    firewall-cmd --remove-port=9876/tcp --permanent
    # 开放master默认端口
    firewall-cmd --remove-port=10911/tcp --permanent
    # 开放slave默认端口 (当前集群模式可不开启)
    firewall-cmd --remove-port=11011/tcp --permanent 
    # 重启防火墙
    firewall-cmd --reload
    

    5.在官网下载rocketmq 的二进制包,并上传至虚拟机

    https://rocketmq.apache.org/release_notes/release-notes-4.7.1/

    6.配置rocketmq环境变量

    vim /etc/profile

    添加以下配置,ROCKETMQ_HOME 为rocketmq安装包的位置

    ROCKETMQ_HOME=/usr/local/bin/rocketmq-all-4.5.0-bin-release
    PATH=$PATH:$ROCKETMQ_HOME/bin
    export ROCKETMQ_HOME PATH
    

    7.创建消息和消息索引的存储路径

    mkdir /usr/local/bin/rocketmq
    mkdir /usr/local/bin/rocketmq/store
    mkdir /usr/local/bin/rocketmq/store/broker-a
    mkdir /usr/local/bin/rocketmq/store/broker-b
    mkdir /usr/local/bin/rocketmq/store/broker-a/commitlog
    mkdir /usr/local/bin/rocketmq/store/broker-a/consumequeue
    mkdir /usr/local/bin/rocketmq/store/broker-a/index
    mkdir /usr/local/bin/rocketmq/store/broker-b/commitlog
    mkdir /usr/local/bin/rocketmq/store/broker-b/consumequeue
    mkdir /usr/local/bin/rocketmq/store/broker-b/index
    
    broker-a 和 broker-b 分别为两个broker的存储目录,commitlog 为消息存储目录,consumerqueue 为消息队列,index 是消息索引目录

    8.以上步骤均需要在两台机器上全部执行

    9.接下分别配置两个broker

    进入机器a的目录

    cd /usr/local/bin/rocketmq-all-4.5.0-bin-release/conf/2m-2s-sync
    

    在机器a上配置broker-a的master节点

    vim broker-a.properties 
    #所属集群名字
    brokerClusterName=rocketmq-cluster
    #broker名字,注意此处不同的配置文件填写的不一样
    brokerName=broker-a
    #0 表示 Master,>0 表示 Slave
    brokerId=0
    #nameServer地址,分号分割
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=10911
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=120
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/usr/local/bin/rocketmq/store/broker-a
    #commitLog 存储路径
    storePathCommitLog=/usr/local/bin/rocketmq/store/broker-a/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/usr/local/bin/rocketmq/store/broker-a/consumerqueue
    #消息索引存储路径
    storePathIndex=/usr/local/bin/rocketmq/store/broker-a/index
    #checkpoint 文件存储路径
    storeCheckpoint=/usr/local/bin/rocketmq/store/broker-a/checkpoint
    #abort 文件存储路径
    abortFile=/usr/local/bin/rocketmq/store/broker-a/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    brokerRole=SYNC_MASTER
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=SYNC_FLUSH
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128
    brokerIP1=192.168.178.128
    

      

    在机器a上配置broker-b的slave节点

    vim broker-b-s.properties
    

    #所属集群名字
    brokerClusterName=rocketmq-cluster
    #broker名字,注意此处不同的配置文件填写的不一样
    brokerName=broker-b
    #0 表示 Master,>0 表示 Slave
    brokerId=1
    #nameServer地址,分号分割
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=11011
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=120
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/usr/local/bin/rocketmq/store/broker-b
    #commitLog 存储路径
    storePathCommitLog=/usr/local/bin/rocketmq/store/broker-b/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/usr/local/bin/rocketmq/store/broker-b/consumerqueue
    #消息索引存储路径
    storePathIndex=/usr/local/bin/rocketmq/store/broker-b/index
    #checkpoint 文件存储路径
    storeCheckpoint=/usr/local/bin/rocketmq/store/broker-b/checkpoint
    #abort 文件存储路径
    abortFile=/usr/local/bin/rocketmq/store/broker-b/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    brokerRole=SLAVE
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128
    brokerIP1=192.168.178.128

    在机器b上配置broker-b 的master 节点

    #所属集群名字
    brokerClusterName=rocketmq-cluster
    #broker名字,注意此处不同的配置文件填写的不一样
    brokerName=broker-b
    #0 表示 Master,>0 表示 Slave
    brokerId=0
    #nameServer地址,分号分割
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=10911
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=120
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/usr/local/bin/rocketmq/store/broker-b
    #commitLog 存储路径
    storePathCommitLog=/usr/local/bin/rocketmq/store/broker-b/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/usr/local/bin/rocketmq/store/broker-b/consumerqueue
    #消息索引存储路径
    storePathIndex=/usr/local/bin/rocketmq/store/broker-b/index
    #checkpoint 文件存储路径
    storeCheckpoint=/usr/local/bin/rocketmq/store/broker-b/checkpoint
    #abort 文件存储路径
    abortFile=/usr/local/bin/rocketmq/store/broker-b/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    brokerRole=SYNC_MASTER
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=SYNC_FLUSH
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128
    brokerIP1=192.168.178.129
    

    在机器b上配置broker-a 的slave节点  

    #所属集群名字
    brokerClusterName=rocketmq-cluster
    #broker名字,注意此处不同的配置文件填写的不一样
    brokerName=broker-a
    #0 表示 Master,>0 表示 Slave
    brokerId=1
    #nameServer地址,分号分割
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=11011
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=120
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/usr/local/bin/rocketmq/store/broker-a
    #commitLog 存储路径
    storePathCommitLog=/usr/local/bin/rocketmq/store/broker-a/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/usr/local/bin/rocketmq/store/broker-a/consumerqueue
    #消息索引存储路径
    storePathIndex=/usr/local/bin/rocketmq/store/broker-a/index
    #checkpoint 文件存储路径
    storeCheckpoint=/usr/local/bin/rocketmq/store/broker-a/checkpoint
    #abort 文件存储路径
    abortFile=/usr/local/bin/rocketmq/store/broker-a/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    brokerRole=SLAVE
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128
    brokerIP1=192.168.178.129
    

    10.分别在a和b两台机器上修改jvm启动参数

    修改 runbroker.sh 中的jvm初始堆内存,最大堆内存,年轻代的大小

    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"  

    修改 runserver.sh

    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    

    11.开始启动nameServer

    分别在a ,b两台机器运行

    cd /usr/local/bin/rocketmq-all-4.5.0-bin-release/bin
    
    nohup sh mqnamesrv &
    

    12.在机器a上启动broker-a的master节点

    cd /usr/local/bin/rocketmq-all-4.5.0-bin-release/bin
    
    nohup sh mqbroker -c /usr/local/bin/rocketmq-all-4.5.0-bin-release/conf/2m-2s-sync/broker-a.properties &
    

    在机器a上启动broker-b的slave节点 

    nohup sh mqbroker -c /usr/local/bin/rocketmq-all-4.5.0-bin-release/conf/2m-2s-sync/broker-b-s.properties &
    

    13.在机器b上启动broker-b的master节点

    nohup sh mqbroker -c /usr/local/bin/rocketmq-all-4.5.0-bin-release/conf/2m-2s-sync/broker-b.properties &
    

    在机器b上启动broker-a的slave节点

    nohup sh mqbroker -c /usr/local/bin/rocketmq-all-4.5.0-bin-release/conf/2m-2s-sync/broker-a-s.properties &
    

    14.查看启动的是否成功

      

    15.搭建集群监控管理平台

    (1)将 https://github.com/apache/rocketmq-externals 项目拉倒本地,这个项目的子项目 rocketmq-console 就是集群监控管理平台

    (2)修改项目的application.properties 文件中的nameserver地址

    rocketmq.config.namesrvAddr=192.168.178.128:9876;192.168.178.129:9876  

    (3)通过maven打成jar包

    (4)将jar包上传至任意一台机器

    (5)使用 java -jar rocketmq-console-ng-1.0.0.jar 命令启动

    (6)在浏览器访问控制台

    16 .写测试demo发送消息

    public interface Constants {
    
        String NAMESERVER_ADDR = "192.168.178.128:9876;192.168.178.129:9876";
    }
    
    public class SyncProducer {
    
        public static void main(String[] args) throws Exception {
            //1.创建生产者,指定生产者组名
            DefaultMQProducer producer = new DefaultMQProducer("group1");
            // 2.链接nameserver
            producer.setNamesrvAddr(Constants.NAMESERVER_ADDR);
            //3.启动producer
            producer.start();
            //4.指定消息topic tag
            for (int i = 0; i < 10; i++) {
                Message message = new Message("base", "Tag1", ("Hello World" + i).getBytes());
                SendResult send = producer.send(message);
                System.out.println(send.getMsgId() + ">>>" + send.getMessageQueue() + ">>>" + send.getSendStatus());
                TimeUnit.SECONDS.sleep(1);
            }
            producer.shutdown();
        }
    }
    

    17.测试消费者

    public class Consumer {
        public static void main(String[] args) throws Exception {
            // 1.创建消息的消费者,指定消费者组名
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group2");
            //2.指定nameserver地址
            consumer.setNamesrvAddr(Constants.NAMESERVER_ADDR);
            //3.指定topic 和 tag
            consumer.subscribe("base",  "*");
            //4.设置回调函数,消费消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    list.forEach(e -> {
                        System.out.println("收到消息:" + new String(e.getBody()));
                    });
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        }
    }
    
  • 相关阅读:
    大数据集群实验环境搭建
    ORACLE 自治事物
    UNDO内存结构剖析
    事物深度解析
    UNDO
    SCN
    检查点队列
    WPS Excel启用正则表达式
    Python遍历目录下xlsx文件
    Python 字符串指定位置替换字符
  • 原文地址:https://www.cnblogs.com/li-zhi-long/p/13615478.html
Copyright © 2011-2022 走看看