zoukankan      html  css  js  c++  java
  • rocketmq集群(三)

    一、修改 hosts

    我是准备了两台虚拟机,所以分别在两台修改了hosts,用如下命令修改hosts

    vim /etc/hosts

    分别在两台虚拟机上配置如下配置,ip是你自己虚拟机的ip

    192.168.32.128 rocketmq-n1 
    192.168.32.129 rocketmq-n2

    修改完后刷新配置

    systemctl restart network

    二、配置文件broker-a.properties(128)

    之前在128上面配置单机版本了,进入他的conf文件夹,在conf文件夹中有,里面有主从配置,里面提供了三种主从的配置方式,分别是2主2从异步刷盘、2主双写、2主2从的同步刷盘;

     因为我今天想布的是2主2从同步刷盘,所以先进入如下文件夹进行修改broker-a.properties文件

     修改配置如下

    brokerClusterName=rocketmq-cluster 
    #broker名字,注意此处不同的配置文件填写的不一样 
    brokerName=broker-a 
    #0 表示 Master,>0 表示 Slave
    brokerId=0 
    #nameServer地址,分号分割 
    namesrvAddr=rocketmq-n1:9876;rocketmq-n2:9876 
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 
    defaultTopicQueueNums
    =4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable
    =true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup
    =true #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue
    =300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/data/rocketmq/store #commitLog 存储路径 storePathCommitLog=/data/rocketmq/store/commitlog #消费队列存储路径存储路径
    storePathConsumeQueue
    =/data/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex=/data/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint=/data/rocketmq/store/checkpoint #abort 文件存储路径 abortFile=/data/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE
    brokerRole=SYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=SYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128

    三、配置文件broker-b-s.properties(128)

    brokerClusterName=rocketmq-cluster 
    brokerName=broker-b
    brokerId=1 
    namesrvAddr=rocketmq-n1:9876;rocketmq-n2:9876 
    defaultTopicQueueNums=4 
    autoCreateTopicEnable=true 
    autoCreateSubscriptionGroup=true 
    listenPort=11011 
    deleteWhen=04 
    fileReservedTime=120 
    mapedFileSizeCommitLog=1073741824 
    mapedFileSizeConsumeQueue=300000 
    diskMaxUsedSpaceRatio=88 
    storePathRootDir=/data/rocketmq/store-s 
    storePathCommitLog=/data/rocketmq/store-s/commitlog
    storePathConsumeQueue=/data/rocketmq/store-s/consumequeue
    storePathIndex=/data/rocketmq/store-s/index
    storeCheckpoint=/data/rocketmq/store-s/checkpoint
    abortFile=/data/rocketmq/store-s/abort
    maxMessageSize=65536
    brokerRole=SLAVE
    flushDiskType=ASYNC_FLUSH

    四、配置文件broker-b.properties (129)

    brokerClusterName=rocketmq-cluster 
    #broker名字,注意此处不同的配置文件填写的不一样 
    brokerName=broker-b 
    #0 表示 Master,>0 表示 Slave 
    brokerId=0 
    #nameServer地址,分号分割 
    namesrvAddr=rocketmq-n1:9876;rocketmq-n2:9876 
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 
    defaultTopicQueueNums
    =4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup
    =true #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue
    =300000 #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/data/rocketmq/store #commitLog 存储路径 storePathCommitLog=/data/rocketmq/store/commitlog #消费队列存储路径存储路径
    storePathConsumeQueue
    =/data/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex=/data/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint=/data/rocketmq/store/checkpoint #abort 文件存储路径 abortFile=/data/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE
    brokerRole=SYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=SYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128

    五、配置文件broker-a-s.properties(129)

    brokerClusterName=rocketmq-cluster 
    #broker名字,注意此处不同的配置文件填写的不一样 
    brokerName=broker-a 
    #0 表示 Master,>0 表示 Slave 
    brokerId=1 
    #nameServer地址,分号分割 
    namesrvAddr=rocketmq-n1:9876;rocketmq-n2:9876 
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 
    defaultTopicQueueNums
    =4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup
    =true #Broker 对外服务的监听端口 listenPort=11011 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue
    =300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/data/rocketmq/store-s #commitLog 存储路径 storePathCommitLog=/data/rocketmq/store-s/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/data/rocketmq/store-s/consumequeue #消息索引存储路径 storePathIndex=/data/rocketmq/store-s/index #checkpoint 文件存储路径 storeCheckpoint=/data/rocketmq/store-s/checkpoint #abort 文件存储路径 abortFile=/data/rocketmq/store-s/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE
    brokerRole=SLAVE #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128

    六、启动集群 

    创建目录

    # 分别在两台虚拟机上执行 128服务器
    mkdir /data/rocketmq/store -p 
    mkdir /data/rocketmq/store/commitlog -p 
    mkdir /data/rocketmq/store/consumequeue -p 
    mkdir /data/rocketmq/store/index -p 
    # 129服务器
    mkdir /data/rocketmq/store-s -p 
    mkdir /data/rocketmq/store-s/commitlog -p 
    mkdir /data/rocketmq/store-s/consumequeue -p 
    mkdir /data/rocketmq/store-s/index -p
    停止broker namesrv(因为我之前开启过所以要停止)
    cd /usr/local/rocketmq/conf/2m-2s-sync 
    sh /usr/local/rocketmq/bin/mqshutdown broker 
    #用lsof -i:10911看是否停成功 sh
    /usr/local/rocketmq/bin/mqshutdown namesrv
    #用lsof -i:9876验证是否成功
     在两个服务器的 bin目录下启动mq
    nohup /usr/local/rocketmq/bin/mqnamesrv & 

    用tail -f nohup.out看是否成功;

    编写配置文件,并写好配置在2m-2s-sync文件夹下(128)
    echo "brokerIP1=192.168.32.128" >> broker-a.properties
    echo "brokerIP1=192.168.32.128" >> broker-b-s.properties

    同理在129上也配置下

    echo "brokerIP1=192.168.32.129" >> broker-b.properties
    echo "brokerIP1=192.168.32.129" >> broker-a-s.properties
    在 128上执行(在2m目录)
    nohup /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a.properties & 
    nohup /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b-s.properties &
    用命令看下是否成功
    tail -f nohup.out
    在 129 上执行
    nohup /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b.properties & 
    nohup /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a-s.properties &
    验证启动是否成功在单机中说明过,这里就不说明了,查看集群信息
    /usr/local/rocketmq/bin/mqadmin clusterlist -n 192.168.32.128:9876;192.168.32.129:9876
    关闭
    sh /usr/local/rocketmq/bin/mqshutdown broker 
    sh /usr/local/rocketmq/bin/mqshutdown namesrv
    创建集群topic(在bin目录下),这是按官网例子写的,结果报错不行
    sh mqadmin updateTopic -t TopicTest -n 192.168.32.128:9876;192.168.32.129:9876 -b 192.168.32.128:10911

    于是换了个写法

    ./mqadmin updateTopic -t TopicTest -n 192.168.32.128:9876 -c rocketmq-cluster

    启动代码

    public class ClusterConsumer {
        public static void main(String[] args) throws MQClientException {
            // 1. 创建消费者(Push)对象
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_TEST");
    
            // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步
            consumer.setNamesrvAddr("192.168.32.128:9876;92.168.32.129:9876");
    
            // 3. 订阅对应的主题和Tag
            consumer.subscribe("TopicTest", "*");
    
            // 4. 注册消息接收到Broker消息后的处理接口
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    try {
                        MessageExt messageExt = list.get(0);
                        System.out.printf("线程:%-25s 接收到新消息 %s --- %s %n", Thread.currentThread().getName(), messageExt.getTags(), new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            // 5. 启动消费者(必须在注册完消息监听器后启动,否则会报错)
            consumer.start();
    
            System.out.println("已启动消费者");
        }
    }
    public class ClusterProducer {
        public static void main(String[] args) throws MQClientException, IOException, RemotingException, InterruptedException, MQBrokerException {
            // 1. 创建生产者对象
            DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST");
    
            // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步
            producer.setNamesrvAddr("192.168.32.128:9876;92.168.32.129:9876");
    
            // 3. 启动生产者
            producer.start();
    
            // 4. 生产者发送消息
            for (int i = 0; i < 10; i++) {
                Message message = new Message("TopicTest", "TagA", ("Hello MQ:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    
                SendResult result = producer.send(message);
    
                System.out.printf("发送结果:%s%n", result);
            }
    
            System.in.read();
            // 5. 停止生产者
            producer.shutdown();
        }
    }

     

    七、rocketmq web管理界面

    git clone源码
     
    git clone https://github.com/apache/rocketmq-externals
    进入到 rocketmq-console 目录,执行 maven 编译打包
    cd rocketmq-externals/rocketmq-console 
    mvn clean package -Dmaven.test.skip=true
    启动
    # jar包在target目录下面,你可以copy到其他目录去运行 
    java -jar rocketmq-console-ng-1.0.0.jar --server.port=8081 -- rocketmq.config.namesrvAddr=192.168.32.128:9876 > /usr/local/rocketmq/logs/mq- console.log 2>&1 &
    # --server.port springboot内置tomcat的端口号,默认8080;
    # --rocketmq.config.namesrvAddr nameserver的地址
    这短短的一生我们最终都会失去,不妨大胆一点,爱一个人,攀一座山,追一个梦
  • 相关阅读:
    四十四 常用内建模块 struct
    四十三 常用内建模块 base64
    Django Haystack 全文检索与关键词高亮
    python实现简单tftp(基于udp)
    多线程socket UDP收发数据
    Python 线程复习
    python 进程复习
    python pdb 调试
    Linux 复习
    Django 博客
  • 原文地址:https://www.cnblogs.com/xing1/p/15490223.html
Copyright © 2011-2022 走看看