zoukankan      html  css  js  c++  java
  • RocketMQ之双Master方式部署以及简单使用

    1.1、服务器环境

    192.168.100.24 root nameServer1,brokerServer1 Master1

    192.168.100.25 root nameServer2,brokerServer2 Master2

    1.2、Hosts添加信息

    192.168.100.24 rocketmq-nameserver1

    192.168.100.24 rocketmq-master1

    192.168.100.25 rocketmq-nameserver2

    192.168.100.25 rocketmq-master2

    1.3、上传解压【两台机器】

    # 上传alibaba-rocketmq-3.2.6.tar.gz文件至/usr/local

    # tar -zxvf alibaba-rocketmq-3.2.6.tar.gz -C /usr/local

    # mv alibaba-rocketmq alibaba-rocketmq-3.2.6

    # ln -s alibaba-rocketmq-3.2.6 rocketmq

    1.4、创建存储路径【两台机器】

    # mkdir /usr/local/rocketmq/store

    # mkdir /usr/local/rocketmq/store/commitlog

    # mkdir /usr/local/rocketmq/store/consumequeue

    # mkdir /usr/local/rocketmq/store/index

    1.5、RocketMQ配置文件【两台机器】

    # vim /usr/local/rocketmq/conf/2m-noslave/broker-a.properties 

    # vim /usr/local/rocketmq/conf/2m-noslave/broker-b.properties

    #所属集群名字  
    brokerClusterName=rocketmq-cluster  
    #broker名字,注意此处不同的配置文件填写的不一样  
    brokerName=broker-a|broker-b  
    #0 表示 Master,>0 表示 Slave  
    brokerId=0  
    #nameServer地址,分号分割  
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876  
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数  
    defaultTopicQueueNums=4  
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭  
    autoCreateTopicEnable=true  
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭  
    autoCreateSubscriptionGroup=true  
    #Broker 对外服务的监听端口  
    listenPort=10911  
    #删除文件时间点,默认凌晨 4点  
    deleteWhen=04  
    #文件保留时间,默认 48 小时  
    fileReservedTime=120  
    #commitLog每个文件的大小默认1G  
    mapedFileSizeCommitLog=1073741824  
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整  
    mapedFileSizeConsumeQueue=300000  
    #destroyMapedFileIntervalForcibly=120000  
    #redeleteHangedFileInterval=120000  
    #检测物理文件磁盘空间  
    diskMaxUsedSpaceRatio=88  
    #存储路径  
    storePathRootDir=/usr/local/rocketmq/store  
    #commitLog 存储路径  
    storePathCommitLog=/usr/local/rocketmq/store/commitlog  
    #消费队列存储路径存储路径  
    storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue  
    #消息索引存储路径  
    storePathIndex=/usr/local/rocketmq/store/index  
    #checkpoint 文件存储路径  
    storeCheckpoint=/usr/local/rocketmq/store/checkpoint  
    #abort 文件存储路径  
    abortFile=/usr/local/rocketmq/store/abort  
    #限制的消息大小  
    maxMessageSize=65536  
    #flushCommitLogLeastPages=4  
    #flushConsumeQueueLeastPages=2  
    #flushCommitLogThoroughInterval=10000  
    #flushConsumeQueueThoroughInterval=60000  
    #Broker 的角色  
    #- ASYNC_MASTER 异步复制Master  
    #- SYNC_MASTER 同步双写Master  
    #- SLAVE  
    brokerRole=ASYNC_MASTER  
    #刷盘方式  
    #- ASYNC_FLUSH 异步刷盘  
    #- SYNC_FLUSH 同步刷盘  
    flushDiskType=ASYNC_FLUSH  
    #checkTransactionMessageEnable=false  
    #发消息线程池数量  
    #sendMessageThreadPoolNums=128  
    #拉消息线程池数量  
    #pullMessageThreadPoolNums=128  

    1.6、修改日志配置文件【两台机器】

    # mkdir -p /usr/local/rocketmq/logs 

    # cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g'*.xml

    1.7、修改启动脚本参数【两台机器】

    # vim /usr/local/rocketmq/bin/runbroker.sh

    JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m -XX:MaxPermSize=320m"

    # vim /usr/local/rocketmq/bin/runserver.sh

    JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m -XX:MaxPermSize=320m"

    1.8、启动NameServer【两台机器】

    # cd /usr/local/rocketmq/bin

    # nohup sh mqnamesrv &

    对应的关闭命令是:sh mqshutdown namesrv

    1.9、启动BrokerServer A【192.168.100.24】

    # cd /usr/local/rocketmq/bin 

    # nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &# netstat -ntlp

    # jps

    # tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log

    # tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log

    1.10、启动BrokerServer B【192.168.100.25】

    # cd /usr/local/rocketmq/bin

    # nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-b.properties >/dev/null 2>&1 &

    # netstat -ntlp

    # jps

    # tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log

    # tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log

    对应的关闭Broker的命令是:sh mqshutdown broker

    1.11、部署RocketMQ Console(没有的可以联系我)

    1.12、数据清理

    # cd /usr/local/rocketmq/bin

    # sh mqshutdown broker

    # sh mqshutdown namesrv

    # --等待停止# rm -rf /usr/local/rocketmq/store

    # mkdir /usr/local/rocketmq/store

    # mkdir /usr/local/rocketmq/store/commitlog

    # mkdir /usr/local/rocketmq/store/consumequeue

    # mkdir /usr/local/rocketmq/store/index

    # --按照上面步骤重启NameServer与BrokerServer

    二、简单示例:

    所需jar包(Maven):

    <dependency>  
          <groupId>com.alibaba.rocketmq</groupId>  
          <artifactId>rocketmq-client</artifactId>  
          <version>3.2.6</version>  
        </dependency>  
        <dependency>  
          <groupId>com.alibaba.rocketmq</groupId>  
          <artifactId>rocketmq-common</artifactId>  
          <version>3.2.6</version>  
        </dependency>  
        <dependency>  
          <groupId>com.alibaba.rocketmq</groupId>  
          <artifactId>rocketmq-remoting</artifactId>  
          <version>3.2.6</version>  
        </dependency>  

    Producer类:

    public class producer {  
        public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {  
            DefaultMQProducer producer = new DefaultMQProducer("quickstart_producer");  
            producer.setNamesrvAddr("192.168.100.24:9876;192.168.100.25:9876");  
            producer.start();  
            for (int i = 0;i<100;i++){  
                Message msg = new Message("TopicQuickStart","TagA",  
                        ("Hello RocketMQ" + i).getBytes());  
                SendResult sendResult = producer.send(msg);  
                System.out.println(sendResult);  
      
            }  
        }  
    }  

    Consumer类:

    public class Consumer {  
        public static void main(String[] args) throws MQClientException {  
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");  
            consumer.setNamesrvAddr("192.168.100.24:9876;192.168.100.25:9876");  
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
      
            consumer.subscribe("TopicQuickStart","*");  
      
            consumer.registerMessageListener(new MessageListenerConcurrently() {  
                @Override  
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {  
                    System.out.println(Thread.currentThread().getName() + "Receive New Messages:" + list);  
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
                }  
            });  
      
            consumer.start();  
      
            System.out.println("Consumer Started");  
        }  
      
    }  

    三、查看RocketMQ控制台结果:

    四、对于2m-2s-async模式的broker启动命令:

    # nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &

    # nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-b.properties >/dev/null 2>&1 &

    # nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a-s.properties >/dev/null 2>&1 &

    # nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-b-s.properties >/dev/null 2>&1 &

  • 相关阅读:
    前端与后端的一些论述
    Pandas Cheat Sheet
    flask+socketio+echarts3 服务器监控程序(基于后端数据推送)
    【Java并发编程实战】-----“J.U.C”:ReentrantLock之一简介
    【Java并发编程实战】-----“J.U.C”:锁,lock
    【Java并发编程实战】-----synchronized
    【java并发编程实战】-----线程基本概念
    2015年读书系统
    阮一峰文集:《一个寻找作者的读者》
    进一步封装highchart,打造自己的图表插件:jHighChart.js
  • 原文地址:https://www.cnblogs.com/WangHaiMing/p/7565181.html
Copyright © 2011-2022 走看看