zoukankan      html  css  js  c++  java
  • RocketMQ集群搭建(双主双从)

    测试机器有限, 使用2台机器搭建集群; 192.168.8.113和192.168.8.114

    nameServ1 注册中心 192.168.8.113:9876
    nameServ1 注册中心 192.168.8.114:9876
    broker-a broker-a --master 192.168.8.113:10911     broker-a.properties
    broker-b-s broker-b --slave 192.168.8.113:10912     broker-b-s.properties
    broker-b broker-b --master 192.168.8.114:10911     broker-b.properties
    broker-a-s broker-a --slave 192.168.8.114:10912     broker-a-s.properties
     

    1、下载Rocketmq

    下载rocketmq-all-4.5.2-bin-release.zip,并解压到/usr/local  重命名为rocketmq

    https://rocketmq.apache.org/release_notes/release-notes-4.5.2/

    rocketmq是java编写,所以要配置jdk

    2、创建目录

    2台机器上执行以下命令创建目录

    mkdir /usr/local/rocketmq/store

    mkdir /usr/local/rocketmq/store/commitlog

    mkdir /usr/local/rocketmq/store/consumequeue

    mkdir /usr/local/rocketmq/store/index

    mkdir /usr/local/rocketmq/logs

    mkdir /usr/local/rocketmq/store2

    mkdir /usr/local/rocketmq/store2/commitlog

    mkdir /usr/local/rocketmq/store2/consumequeue

    mkdir /usr/local/rocketmq/store2/index

    3、修改配置文件

    cd  conf  

    cp  -r  2m-2s-async   myconf

    broker-a.properties

    brokerClusterName=DefaultCluster
    brokerName=broker-a
    #集群中 0 表示 Master,>0 表示 Slave
    brokerId=0
    brokerRole=ASYNC_MASTER
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH

    #指定broker的IP
    brokerIP1=192.168.8.113
    #nameServer地址,集群用分号分割
    namesrvAddr=192.168.8.113:9876;192.168.8.114:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=false
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=false
    #Broker 对外服务的监听端口
    listenPort=10911
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=48
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/usr/local/rocketmq/store

    #commitLog 存储路径
    storePathCommitLog=/usr/local/rocketmq/store/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
    #消息索引存储路径
    storePathIndex=/usr/local/rocketmq/store/index
    #checkpoint 文件存储路径
    storeCheckpoint=/usr/local/rocketmq/store/checkpoint
    #abort 文件存储路径
    abortFile=/usr/local/rocketmq/store/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000

    #checkTransactionMessageEnable=false
    #发消息线程池数量
    sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessaeThreadPoolNums=128

    #发送消息是否使用可重入锁
    useReentrantLockWhenPutMessage=true
    waitTimeMillsInSendQueue=300  #或者更大

    broker-b-s.properties

    brokerClusterName=DefaultCluster
    brokerName=broker-b
    #集群中 0 表示 Master,>0 表示 Slave
    brokerId=1
    brokerRole=SLAVE
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH

    #指定broker的IP
    brokerIP1=192.168.8.113
    #nameServer地址,集群用分号分割
    namesrvAddr=192.168.8.113:9876;192.168.8.114:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=false
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=false
    #Broker 对外服务的监听端口
    listenPort=10912
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=48
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/usr/local/rocketmq/store2
    #commitLog 存储路径

    storePathCommitLog=/usr/local/rocketmq/store2/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/usr/local/rocketmq/store2/consumequeue
    #消息索引存储路径
    storePathIndex=/usr/local/rocketmq/store2/index
    #checkpoint 文件存储路径
    storeCheckpoint=/usr/local/rocketmq/store2/checkpoint
    #abort 文件存储路径
    abortFile=/usr/local/rocketmq/store2/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000

    #checkTransactionMessageEnable=false
    #发消息线程池数量
    sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessaeThreadPoolNums=128

    #发送消息是否使用可重入锁
    useReentrantLockWhenPutMessage=true
    waitTimeMillsInSendQueue=300  #或者更大

    broker-b.properties

    brokerClusterName=DefaultCluster
    brokerName=broker-b
    #集群中 0 表示 Master,>0 表示 Slave
    brokerId=0
    brokerRole=ASYNC_MASTER
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH

    #指定broker的IP
    brokerIP1=192.168.8.114
    #nameServer地址,集群用分号分割
    namesrvAddr=192.168.8.113:9876;192.168.8.114:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=false
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=false
    #Broker 对外服务的监听端口
    listenPort=10911
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=48
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/usr/local/rocketmq/store
    #commitLog 存储路径

    storePathRootDir=/usr/local/rocketmq/store
    #commitLog 存储路径
    storePathCommitLog=/usr/local/rocketmq/store/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
    #消息索引存储路径
    storePathIndex=/usr/local/rocketmq/store/index
    #checkpoint 文件存储路径
    storeCheckpoint=/usr/local/rocketmq/store/checkpoint
    #abort 文件存储路径
    abortFile=/usr/local/rocketmq/store/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000

    #checkTransactionMessageEnable=false
    #发消息线程池数量
    sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessaeThreadPoolNums=128

    #发送消息是否使用可重入锁
    useReentrantLockWhenPutMessage=true
    waitTimeMillsInSendQueue=300  #或者更大

    broker-a-s.properties

    brokerClusterName=DefaultCluster
    brokerName=broker-a
    #集群中 0 表示 Master,>0 表示 Slave
    brokerId=1
    brokerRole=SLAVE
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH

    #指定broker的IP
    brokerIP1=192.168.8.114
    #nameServer地址,集群用分号分割
    namesrvAddr=192.168.8.113:9876;192.168.8.114:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=false
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=false
    #Broker 对外服务的监听端口
    listenPort=10912
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=48
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/usr/local/rocketmq/store2
    #commitLog 存储路径

    storePathCommitLog=/usr/local/rocketmq/store2/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/usr/local/rocketmq/store2/consumequeue
    #消息索引存储路径
    storePathIndex=/usr/local/rocketmq/store2/index
    #checkpoint 文件存储路径
    storeCheckpoint=/usr/local/rocketmq/store2/checkpoint
    #abort 文件存储路径
    abortFile=/usr/local/rocketmq/store2/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000

    #checkTransactionMessageEnable=false
    #发消息线程池数量
    sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessaeThreadPoolNums=128

    #发送消息是否使用可重入锁
    useReentrantLockWhenPutMessage=true
    waitTimeMillsInSendQueue=300  #或者更大

    注意: 上面配置文件中    【#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭autoCreateTopicEnable=false
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭autoCreateSubscriptionGroup=false】 将topic和订阅组全部关闭自动创建了。  所以在使用Java-API调用之前,一定要手动创建Topic和消费者订阅组,不然消费者端无法获取到消息。 

    sh ./bin/mqadmin updateTopic -t myTopic -c DefaultCluster -n "192.168.8.113:9876;192.168.8.114:9876"

    sh bin/mqadmin updateSubGroup -c DefaultCluster -g customerGroup -n "192.168.8.113:9876;192.168.8.114:9876"

    创建主题:myTopic    ,    订阅组: customerGroup

    4、修改rocketmq启动脚本

    适当修改jvm内存大小

    vim /usr/local/rocketmq/bin/runbroker.sh

    vim /usr/local/rocketmq/bin/runserver.sh

    vim /usr/local/rocketmq/bin/tools.sh

    .

    5、启动注册中心nameSrv

        2台机器都启动nameSrv

    #提前创建好目录

    nohup sh bin/mqnamesrv > ./logs/namesrv.log 2>&1 &

    6、启动broker

    192.168.8.113 执行: 

    nohup sh bin/mqbroker -c conf/myconf/broker-a.properties -n "192.168.8.113:9876;192.168.8.114:9876" > ./logs/broker-a.log 2>&1 &

    nohup sh bin/mqbroker -c conf/myconf/broker-b-s.properties -n "192.168.8.113:9876;192.168.8.114:9876" > ./logs/broker-b-s.log 2>&1 &

    192.168.8.114执行:

    nohup sh bin/mqbroker -c conf/myconf/broker-b.properties -n "192.168.8.113:9876;192.168.8.114:9876" > ./logs/broker-b.log 2>&1 &

    nohup sh bin/mqbroker -c conf/myconf/broker-a-s.properties -n "192.168.8.113:9876;192.168.8.114:9876" > ./logs/broker-a-s.log 2>&1 &

    7、查看是否启动

    可以查看启动日志
    使用jps  , 正常情况可以看到 2个broker和1个namesrv 
    常规查看进程方式:ps -ef|grep java   


     

    8、创建topic   和  订阅组

    updateTopic:该命令执行会在broker所在机器创建一个新的topic,若topic已存在,则会更新topic的属性

    sh ./bin/mqadmin updateTopic -t myTopic -c DefaultCluster -n "192.168.8.113:9876;192.168.8.114:9876"

    创建topic时使用-b参数指定broker的地址,可以指定在哪个broker上创建。如果使用-c参数指定集群名称,则可以为该集群上的每一个broker都创建一份topic信息。建议使用-c指定集群名称,减少分别在不同的broker上创建手误导致topic属性不同的概率,如果是对集群扩容,则可以通过指定新的broker地址在扩容的机器上创建一份新的topic信息

    使用该命令创建topic的读写队列数默认为8,可以通过-r -w指定topic的读写队列数。注意:该命令创建topic的默认队列数无法通过任何配置更改,除非修改源码。

    其它参数说明查看--help

    deleteTopic:从Broker和Name Server删除Topic

    sh ./bin/mqadmin   deleteTopic   -t   myTopic   -c   DefaultCluster   -n   "192.168.8.113:9876;192.168.8.114:9876"

    该命令执行完成后会将指定集群下的所有broker节点的topic信息删除,并清除指定地址的name server上该topic的路由信息。所以name server是一个集群的话,请指定集群地址,否则未指定的name server的topic路由信息可能经过broker一个心跳时间后清除。未被清除的那段时间内,生产者依然可以从name server上获取到topic路由信息,正常发送消息。但是发送过程中不会有异常,broker接收到消息处理的时候才会失败,并将结果响应给客户端。

    updateSubGroup:该命令执行会在broker所在机器创建一个新的订阅组,若订阅组已存在,则会更新订阅组的属性

    sh bin/mqadmin updateSubGroup -c DefaultCluster -g customerGroup -n "192.168.8.113:9876;192.168.8.114:9876"

    创建订阅组时使用-b参数指定broker的地址,可以指定在哪个broker上创建。如果使用-c参数指定集群名称,则可以为该集群上的每一个broker都创建一份新的订阅组信息。建议使用-c指定集群名称,减少分别在不同的broker上创建订阅组时手误导致每个机器上的订阅组属性不同的概率,如果是对集群扩容,则可以通过指定新的broker地址在扩容的机器上创建一份新的订阅组信息

    不指定消费模型时,默认为集群消费

    创建该订阅组时并不会同时创建重试topic,但是该订阅组第一次订阅topic成功时,会创建一个重试topic

    其它参数说明使用--help查看


     

    deleteSubGroup:从Broker删除订阅组

    sh bin/mqadmin  deleteSubGroup  -g  customerGroup  -c  DefaultCluster  -n  "192.168.8.113:9876;192.168.8.114:9876"

    将指定订阅组从broker删除,同时将使用该订阅组名称创建的重试topic及死信topic信息统统清除

    9、RocketMQ--JavaAPI

    使用前创建Topic和订阅组

    sh ./bin/mqadmin updateTopic -t myTopic -c DefaultCluster -n "192.168.8.113:9876;192.168.8.114:9876"

    sh bin/mqadmin updateSubGroup -c DefaultCluster -g customerGroup -n "192.168.8.113:9876;192.168.8.114:9876"

    sh bin/mqadmin updateSubGroup -c DefaultCluster -g producerGroup -n "192.168.8.113:9876;192.168.8.114:9876"

    <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.5.2</version>
            </dependency>

    生产者:

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class SyncProducer {
    public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
    producer.setNamesrvAddr("192.168.8.113:9876;192.168.8.114:9876");
    producer.start();
    for (int i = 0; i < 100; i++) {
    Message msg = new Message("myTopic" ,"TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    SendResult sendResult = producer.send(msg);
    System.out.printf("%s%n", sendResult);
    }
    producer.shutdown();
    }
    }
     

    消费者:


    import java.util.List;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;

    public class SyncConsumer {

    public static void main(String[] args) throws MQClientException {

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("customerGroup");
    consumer.setNamesrvAddr("192.168.8.113:9876;192.168.8.114:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("myTopic", "TagA");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    try {
    for (MessageExt ext : msgs) {
    String body = new String(ext.getBody());
    System.out.println(body);
    }
    } catch (Exception e) {
    e.printStackTrace();
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }

    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    consumer.start();
    System.out.println("Consumer Started.");
    }
    }
     

    10、RocketMQ--控制台

    docker run -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=192.168.204.62:9876;192.168.204.63:9876 -Drocketmq.config.isVIPChannel=false" -p 8294:8080 -t styletang/rocketmq-console-ng



     

  • 相关阅读:
    Mysql索引查询失效的情况
    常用的设计模式
    dubbo的实现原理
    HashMap和HashTable的区别
    SpringMVC工作原理的介绍
    SpringMVC 基础内容及使用步骤
    BeanFactory和ApplicationContext的区别+部分Spring的使用
    Spring常用的jar+普通构造注入
    如何在CentOS7上安装MySQL并实现远程访问
    如何搭建Spring MVC 框架---Hello World
  • 原文地址:https://www.cnblogs.com/360minitao/p/14842297.html
Copyright © 2011-2022 走看看