zoukankan      html  css  js  c++  java
  • RocketMQ集群搭建

    1. RocketMQ集群搭建

    1.1 各角色介绍

    • Producer:消息的发送者;举例:发信者
    • Consumer:消息接收者;举例:收信者
    • Broker:暂存和传输消息;举例:邮局
    • NameServer:管理Broker;举例:各个邮局的管理机构
    • Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息
    • Message Queue:相当于是Topic的分区;用于并行发送和接收消息

    1.2 集群搭建方式

    1.2.1 集群特点

    • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

    • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。

    • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

    • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

    1.2.2 集群模式

    1)单Master模式

    这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。

    2)多Master模式

    一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:

    • 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
    • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

    3)多Master多Slave模式(异步)

    每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:

    • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
    • 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。

    4)多Master多Slave模式(同步)

    每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

    • 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
    • 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

    1.3 双主双从集群搭建

    1.3.1 总体架构

    消息高可用采用2m-2s(同步双写)方式

    1.3.2 集群工作流程

    1. 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
    2. Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
    3. 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
    4. Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
    5. Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

    1.3.3 服务器环境

    序号 IP 角色 架构模式
    1 192.168.1.144 nameserver、brokerserver Master1、Slave2
    2 192.168.1.145 nameserver、brokerserver Master2、Slave1

    1.3.4 Host添加信息

    vim /etc/hosts
    

    配置如下:

    # nameserver
    192.168.1.144 rocketmq-nameserver1
    192.168.1.145 rocketmq-nameserver2
    # broker
    192.168.1.144 rocketmq-master1
    192.168.1.144 rocketmq-slave2
    192.168.1.145 rocketmq-master2
    192.168.1.145 rocketmq-slave1
    

    配置完成后, 重启网卡

    systemctl restart network
    

    1.3.5 防火墙配置

    宿主机需要远程访问虚拟机的rocketmq服务和web服务,需要开放相关的端口号,简单粗暴的方式是直接关闭防火墙

    # 关闭防火墙
    systemctl stop firewalld.service 
    # 查看防火墙的状态
    firewall-cmd --state 
    # 禁止firewall开机启动
    systemctl disable firewalld.service
    

    或者为了安全,只开放特定的端口号,RocketMQ默认使用3个端口:9876 、10911 、11011 。如果防火墙没有关闭的话,那么防火墙就必须开放这些端口:

    • nameserver 默认使用 9876 端口
    • master 默认使用 10911 端口
    • slave 默认使用11011 端口

    执行以下命令:

    # 开放name server默认端口
    firewall-cmd --remove-port=9876/tcp --permanent
    # 开放master默认端口
    firewall-cmd --remove-port=10911/tcp --permanent
    # 开放slave默认端口 (当前集群模式可不开启)
    firewall-cmd --remove-port=11011/tcp --permanent 
    # 重启防火墙
    firewall-cmd --reload
    

    1.3.6 环境变量配置

    vim /etc/profile
    

    在profile文件的末尾加入如下命令

    #set rocketmq
    ROCKETMQ_HOME=/root/rocketmq-all-4.4.0-bin-release
    PATH=$PATH:$ROCKETMQ_HOME/bin
    export ROCKETMQ_HOME PATH
    

    输入:wq! 保存并退出, 并使得配置立刻生效:

    source /etc/profile
    

    1.3.7 创建消息存储路径

    mkdir -p /usr/local/rocketmq/store
    mkdir -p /usr/local/rocketmq/store/commitlog
    mkdir -p /usr/local/rocketmq/store/consumequeue
    mkdir -p /usr/local/rocketmq/store/index
    mkdir -p /usr/local/rocketmq/store1
    mkdir -p /usr/local/rocketmq/store1/commitlog
    mkdir -p /usr/local/rocketmq/store1/consumequeue
    mkdir -p /usr/local/rocketmq/store1/index
    

    1.3.8 broker配置文件

    1)master1

    服务器:192.168.1.144

    vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-a.properties
    

    修改配置如下:

    #所属集群名字
    brokerClusterName=rocketmq-cluster
    #broker名字,注意此处不同的配置文件填写的不一样
    brokerName=broker-a
    #0 表示 Master,>0 表示 Slave
    brokerId=0
    #nameServer地址,分号分割
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=10911
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=120
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/usr/local/rocketmq/store
    #commitLog 存储路径
    storePathCommitLog=/usr/local/rocketmq/store/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
    #消息索引存储路径
    storePathIndex=/usr/local/rocketmq/store/index
    #checkpoint 文件存储路径
    storeCheckpoint=/usr/local/rocketmq/store/checkpoint
    #abort 文件存储路径
    abortFile=/usr/local/rocketmq/store/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    brokerRole=SYNC_MASTER
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=SYNC_FLUSH
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128
    

    2)slave2

    服务器:192.168.1.144

    vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-b-s.properties
    

    修改配置如下:

    #所属集群名字
    brokerClusterName=rocketmq-cluster
    #broker名字,注意此处不同的配置文件填写的不一样
    brokerName=broker-b
    #0 表示 Master,>0 表示 Slave
    brokerId=1
    #nameServer地址,分号分割
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=11011
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=120
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/usr/local/rocketmq/store1
    #commitLog 存储路径
    storePathCommitLog=/usr/local/rocketmq/store1/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/usr/local/rocketmq/store1/consumequeue
    #消息索引存储路径
    storePathIndex=/usr/local/rocketmq/store1/index
    #checkpoint 文件存储路径
    storeCheckpoint=/usr/local/rocketmq/store1/checkpoint
    #abort 文件存储路径
    abortFile=/usr/local/rocketmq/store1/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    brokerRole=SLAVE
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128
    

    3)master2

    服务器:192.168.1.145

    vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-b.properties
    

    修改配置如下:

    #所属集群名字
    brokerClusterName=rocketmq-cluster
    #broker名字,注意此处不同的配置文件填写的不一样
    brokerName=broker-b
    #0 表示 Master,>0 表示 Slave
    brokerId=0
    #nameServer地址,分号分割
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=10911
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=120
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/usr/local/rocketmq/store
    #commitLog 存储路径
    storePathCommitLog=/usr/local/rocketmq/store/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
    #消息索引存储路径
    storePathIndex=/usr/local/rocketmq/store/index
    #checkpoint 文件存储路径
    storeCheckpoint=/usr/local/rocketmq/store/checkpoint
    #abort 文件存储路径
    abortFile=/usr/local/rocketmq/store/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    brokerRole=SYNC_MASTER
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=SYNC_FLUSH
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128
    

    4)slave1

    服务器:192.168.1.145

    vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-a-s.properties
    

    修改配置如下:

    #所属集群名字
    brokerClusterName=rocketmq-cluster
    #broker名字,注意此处不同的配置文件填写的不一样
    brokerName=broker-a
    #0 表示 Master,>0 表示 Slave
    brokerId=1
    #nameServer地址,分号分割
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=11011
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=120
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/usr/local/rocketmq/store1
    #commitLog 存储路径
    storePathCommitLog=/usr/local/rocketmq/store1/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/usr/local/rocketmq/store1/consumequeue
    #消息索引存储路径
    storePathIndex=/usr/local/rocketmq/store1/index
    #checkpoint 文件存储路径
    storeCheckpoint=/usr/local/rocketmq/store1/checkpoint
    #abort 文件存储路径
    abortFile=/usr/local/rocketmq/store1/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    brokerRole=SLAVE
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128
    

    1.3.9 修改启动脚本文件

    1)runbroker.sh

    vi /usr/local/rocketmq/bin/runbroker.sh
    

    需要根据内存大小进行适当的对JVM参数进行调整:

    #===================================================
    # 开发环境配置 JVM Configuration
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
    

    2)runserver.sh

    vim /usr/local/rocketmq/bin/runserver.sh
    
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    

    1.3.10 服务启动

    1)启动NameServe集群

    分别在192.168.1.144和192.168.1.145启动NameServer

    cd /usr/local/rocketmq/bin
    nohup sh mqnamesrv &
    

    2)启动Broker集群

    • 在192.168.1.144上启动master1和slave2

    master1:

    cd /usr/local/rocketmq/bin
    nohup sh mqbroker -c /usr/soft/rocketmq/conf/2m-2s-sync/broker-a.properties &
    

    slave2:

    cd /usr/local/rocketmq/bin
    nohup sh mqbroker -c /usr/soft/rocketmq/conf/2m-2s-sync/broker-b-s.properties &
    
    • 在192.168.1.145上启动master2和slave2

    master2

    cd /usr/local/rocketmq/bin
    nohup sh mqbroker -c /usr/soft/rocketmq/conf/2m-2s-sync/broker-b.properties &
    

    slave1

    cd /usr/local/rocketmq/bin
    nohup sh mqbroker -c /usr/soft/rocketmq/conf/2m-2s-sync/broker-a-s.properties &
    

    1.3.11 查看进程状态

    启动后通过JPS查看启动进程

    image-20201130173135612

    1.3.12 查看日志

    # 查看nameServer日志
    tail -500f ~/logs/rocketmqlogs/namesrv.log
    # 查看broker日志
    tail -500f ~/logs/rocketmqlogs/broker.log
    

    1.4 mqadmin管理工具

    1.4.1 使用方式

    进入RocketMQ安装位置,在bin目录下执行./mqadmin {command} {args}

    1.4.2 命令介绍

    1)Topic相关

    名称 含义 命令选项 说明
    updateTopic 创建更新Topic配置 -b Broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port
    -c cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询)
    -h- 打印帮助
    -n NameServer服务地址,格式 ip:port
    -p 指定新topic的读写权限( W=2|R=4|WR=6 )
    -r 可读队列数(默认为 8)
    -w 可写队列数(默认为 8)
    -t topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ )
    deleteTopic 删除Topic -c cluster 名称,表示删除某集群下的某个 topic (集群 可通过 clusterList 查询)
    -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    -t topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ )
    topicList 查看 Topic 列表信息 -h 打印帮助
    -c 不配置-c只返回topic列表,增加-c返回clusterName, topic, consumerGroup信息,即topic的所属集群和订阅关系,没有参数
    -n NameServer 服务地址,格式 ip:port
    topicRoute 查看 Topic 路由信息 -t topic 名称
    -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    topicStatus 查看 Topic 消息队列offset -t topic 名称
    -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    topicClusterList 查看 Topic 所在集群列表 -t topic 名称
    -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    updateTopicPerm 更新 Topic 读写权限 -t topic 名称
    -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    -b Broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port
    -p 指定新 topic 的读写权限( W=2|R=4|WR=6 )
    -c cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询),-b优先,如果没有-b,则对集群中所有Broker执行命令
    updateOrderConf 从NameServer上创建、删除、获取特定命名空间的kv配置,目前还未启用 -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    -t topic,键
    -v orderConf,值
    -m method,可选get、put、delete
    allocateMQ 以平均负载算法计算消费者列表负载消息队列的负载结果 -t topic 名称
    -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    -i ipList,用逗号分隔,计算这些ip去负载Topic的消息队列
    statsAll 打印Topic订阅关系、TPS、积累量、24h读写总量等信息 -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    -a 是否只打印活跃topic
    -t 指定topic
    #### 2)集群相关
    名称 含义 命令选项 说明
    clusterList 查看集群信息,集群、BrokerName、BrokerId、TPS等信息 -m 打印更多信息 (增加打印出如下信息 #InTotalYest, #OutTotalYest, #InTotalToday ,#OutTotalToday)
    -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    -i 打印间隔,单位秒
    clusterRT 发送消息检测集群各Broker RT。消息发往${BrokerName} Topic。 -a amount,每次探测的总数,RT = 总时间 / amount
    -s 消息大小,单位B
    -c 探测哪个集群
    -p 是否打印格式化日志,以|分割,默认不打印
    -h 打印帮助
    -m 所属机房,打印使用
    -i 发送间隔,单位秒
    -n NameServer 服务地址,格式 ip:port

    3)Broker相关

    名称 含义 命令选项 说明
    updateBrokerConfig 更新 Broker 配置文件,会修改Broker.conf -b Broker 地址,格式为ip:port
    -c cluster 名称
    -k key 值
    -v value 值
    -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    brokerStatus 查看 Broker 统计信息、运行状态(你想要的信息几乎都在里面) -b Broker 地址,地址为ip:port
    -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    brokerConsumeStats Broker中各个消费者的消费情况,按Message Queue维度返回Consume Offset,Broker Offset,Diff,TImestamp等信息 -b Broker 地址,地址为ip:port
    -t 请求超时时间
    -l diff阈值,超过阈值才打印
    -o 是否为顺序topic,一般为false
    -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    getBrokerConfig 获取Broker配置 -b Broker 地址,地址为ip:port
    -n NameServer 服务地址,格式 ip:port
    wipeWritePerm 从NameServer上清除 Broker写权限 -b Broker 地址,地址为ip:port
    -n NameServer 服务地址,格式 ip:port
    -h 打印帮助
    cleanExpiredCQ 清理Broker上过期的Consume Queue,如果手动减少对列数可能产生过期队列 -n NameServer 服务地址,格式 ip:port
    -h 打印帮助
    -b Broker 地址,地址为ip:port
    -c 集群名称
    cleanUnusedTopic 清理Broker上不使用的Topic,从内存中释放Topic的Consume Queue,如果手动删除Topic会产生不使用的Topic -n NameServer 服务地址,格式 ip:port
    -h 打印帮助
    -b Broker 地址,地址为ip:port
    -c 集群名称
    sendMsgStatus 向Broker发消息,返回发送状态和RT -n NameServer 服务地址,格式 ip:port
    -h 打印帮助
    -b BrokerName,注意不同于Broker地址
    -s 消息大小,单位B
    -c 发送次数
    #### 4)消息相关
    名称 含义 命令选项 说明
    queryMsgById 根据offsetMsgId查询msg,如果使用开源控制台,应使用offsetMsgId,此命令还有其他参数,具体作用请阅读QueryMsgByIdSubCommand。 -i msgId
    -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    queryMsgByKey 根据消息 Key 查询消息 -k msgKey
    -t Topic 名称
    -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    queryMsgByOffset 根据 Offset 查询消息 -b Broker 名称,(这里需要注意 填写的是 Broker 的名称,不是 Broker 的地址,Broker 名称可以在 clusterList 查到)
    -i query 队列 id
    -o offset 值
    -t topic 名称
    -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    queryMsgByUniqueKey 根据msgId查询,msgId不同于offsetMsgId,区别详见常见运维问题。-g,-d配合使用,查到消息后尝试让特定的消费者消费消息并返回消费结果 -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    -i uniqe msg id
    -g consumerGroup
    -d clientId
    -t topic名称
    checkMsgSendRT 检测向topic发消息的RT,功能类似clusterRT -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    -t topic名称
    -a 探测次数
    -s 消息大小
    sendMessage 发送一条消息,可以根据配置发往特定Message Queue,或普通发送。 -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    -t topic名称
    -p body,消息体
    -k keys
    -c tags
    -b BrokerName
    -i queueId
    consumeMessage 消费消息。可以根据offset、开始&结束时间戳、消息队列消费消息,配置不同执行不同消费逻辑,详见ConsumeMessageCommand。 -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    -t topic名称
    -b BrokerName
    -o 从offset开始消费
    -i queueId
    -g 消费者分组
    -s 开始时间戳,格式详见-h
    -d 结束时间戳
    -c 消费多少条消息
    printMsg 从Broker消费消息并打印,可选时间段 -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    -t topic名称
    -c 字符集,例如UTF-8
    -s subExpress,过滤表达式
    -b 开始时间戳,格式参见-h
    -e 结束时间戳
    -d 是否打印消息体
    printMsgByQueue 类似printMsg,但指定Message Queue -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    -t topic名称
    -i queueId
    -a BrokerName
    -c 字符集,例如UTF-8
    -s subExpress,过滤表达式
    -b 开始时间戳,格式参见-h
    -e 结束时间戳
    -p 是否打印消息
    -d 是否打印消息体
    -f 是否统计tag数量并打印
    resetOffsetByTime 按时间戳重置offset,Broker和consumer都会重置 -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    -g 消费者分组
    -t topic名称
    -s 重置为此时间戳对应的offset
    -f 是否强制重置,如果false,只支持回溯offset,如果true,不管时间戳对应offset与consumeOffset关系
    -c 是否重置c++客户端offset

    5)消费者、消费组相关

    名称 含义 命令选项 说明
    consumerProgress 查看订阅组消费状态,可以查看具体的client IP的消息积累量 -g 消费者所属组名
    -s 是否打印client IP
    -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    consumerStatus 查看消费者状态,包括同一个分组中是否都是相同的订阅,分析Process Queue是否堆积,返回消费者jstack结果,内容较多,使用者参见ConsumerStatusSubCommand -h 打印帮助
    -n NameServer 服务地址,格式 ip:port
    -g consumer group
    -i clientId
    -s 是否执行jstack
    getConsumerStatus 获取 Consumer 消费进度 -g 消费者所属组名
    -t 查询主题
    -i Consumer 客户端 ip
    -n NameServer 服务地址,格式 ip:port
    -h 打印帮助
    updateSubGroup 更新或创建订阅关系 -n NameServer 服务地址,格式 ip:port
    -h 打印帮助
    -b Broker地址
    -c 集群名称
    -g 消费者分组名称
    -s 分组是否允许消费
    -m 是否从最小offset开始消费
    -d 是否是广播模式
    -q 重试队列数量
    -r 最大重试次数
    -i 当slaveReadEnable开启时有效,且还未达到从slave消费时建议从哪个BrokerId消费,可以配置备机id,主动从备机消费
    -w 如果Broker建议从slave消费,配置决定从哪个slave消费,配置BrokerId,例如1
    -a 当消费者数量变化时是否通知其他消费者负载均衡
    deleteSubGroup 从Broker删除订阅关系 -n NameServer 服务地址,格式 ip:port
    -h 打印帮助
    -b Broker地址
    -c 集群名称
    -g 消费者分组名称
    cloneGroupOffset 在目标群组中使用源群组的offset -n NameServer 服务地址,格式 ip:port
    -h 打印帮助
    -s 源消费者组
    -d 目标消费者组
    -t topic名称
    -o 暂未使用

    6)连接相关

    名称 含义 命令选项 说明
    consumerConnec tion 查询 Consumer 的网络连接 -g 消费者所属组名
    -n NameServer 服务地址,格式 ip:port
    -h 打印帮助
    producerConnec tion 查询 Producer 的网络连接 -g 生产者所属组名
    -t 主题名称
    -n NameServer 服务地址,格式 ip:port
    -h 打印帮助

    7)NameServer相关

    名称 含义 命令选项 说明
    updateKvConfig 更新NameServer的kv配置,目前还未使用 -s 命名空间
    -k key
    -v value
    -n NameServer 服务地址,格式 ip:port
    -h 打印帮助
    deleteKvConfig 删除NameServer的kv配置 -s 命名空间
    -k key
    -n NameServer 服务地址,格式 ip:port
    -h 打印帮助
    getNamesrvConfig 获取NameServer配置 -n NameServer 服务地址,格式 ip:port
    -h 打印帮助
    updateNamesrvConfig 修改NameServer配置 -n NameServer 服务地址,格式 ip:port
    -h 打印帮助
    -k key
    -v value

    8)其他

    名称 含义 命令选项 说明
    startMonitoring 开启监控进程,监控消息误删、重试队列消息数等 -n NameServer 服务地址,格式 ip:port
    -h 打印帮助
    ### 1.4.3 注意事项
    • 几乎所有命令都需要配置-n表示NameServer地址,格式为ip:port
    • 几乎所有命令都可以通过-h获取帮助
    • 如果既有Broker地址(-b)配置项又有clusterName(-c)配置项,则优先以Broker地址执行命令;如果不配置Broker地址,则对集群中所有主机执行命令

    1.5 集群监控平台搭建

    1.5.1 概述

    RocketMQ有一个对其扩展的开源项目incubator-rocketmq-externals,这个项目中有一个子模块叫rocketmq-console,这个便是管理控制台项目了,先将incubator-rocketmq-externals拉到本地,因为我们需要自己对rocketmq-console进行编译打包运行。

    image-20201130183452949

    1.5.2 下载并编译打包

    git clone https://github.com/apache/rocketmq-externals
    cd rocketmq-console
    mvn clean package -Dmaven.test.skip=true
    

    注意:打包前在rocketmq-console中配置namesrv集群地址:

    rocketmq.config.namesrvAddr=192.168.1.144:9876;192.168.1.145:9876
    

    启动rocketmq-console:

    java -jar rocketmq-console-ng-1.0.0.jar
    

    启动成功后,我们就可以通过浏览器访问http://192.168.1.144:8080进入控制台界面了,如下图:

    image-20201130183119374

    集群状态:

    image-20201130183144378

  • 相关阅读:
    Entity Framework Core 2.0 新特性
    asp.net core部署时自定义监听端口,提高部署的灵活性
    asp.net core使用jexus部署在linux无法正确 获取远程ip的解决办法
    使用xshell连接服务器,数字键盘无法使用解决办法
    使用Jexus 5.8.2在Centos下部署运行Asp.net core
    【DevOps】DevOps成功的八大炫酷工具
    【Network】Calico, Flannel, Weave and Docker Overlay Network 各种网络模型之间的区别
    【Network】UDP 大包怎么发? MTU怎么设置?
    【Network】高性能 UDP 应该怎么做?
    【Network】golang 容器项目 flannel/UDP相关资料
  • 原文地址:https://www.cnblogs.com/dalianpai/p/14062867.html
Copyright © 2011-2022 走看看