一、修改 hosts
我是准备了两台虚拟机,所以分别在两台修改了hosts,用如下命令修改hosts
vim /etc/hosts
分别在两台虚拟机上配置如下配置,ip是你自己虚拟机的ip
192.168.32.128 rocketmq-n1 192.168.32.129 rocketmq-n2
修改完后刷新配置
systemctl restart network
二、配置文件broker-a.properties(128)
之前在128上面配置单机版本了,进入他的conf文件夹,在conf文件夹中有,里面有主从配置,里面提供了三种主从的配置方式,分别是2主2从异步刷盘、2主双写、2主2从的同步刷盘;
因为我今天想布的是2主2从同步刷盘,所以先进入如下文件夹进行修改broker-a.properties文件
修改配置如下
brokerClusterName=rocketmq-cluster #broker名字,注意此处不同的配置文件填写的不一样 brokerName=broker-a #0 表示 Master,>0 表示 Slave brokerId=0 #nameServer地址,分号分割 namesrvAddr=rocketmq-n1:9876;rocketmq-n2:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/data/rocketmq/store #commitLog 存储路径 storePathCommitLog=/data/rocketmq/store/commitlog #消费队列存储路径存储路径
storePathConsumeQueue=/data/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex=/data/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint=/data/rocketmq/store/checkpoint #abort 文件存储路径 abortFile=/data/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE
brokerRole=SYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=SYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
三、配置文件broker-b-s.properties(128)
brokerClusterName=rocketmq-cluster brokerName=broker-b brokerId=1 namesrvAddr=rocketmq-n1:9876;rocketmq-n2:9876 defaultTopicQueueNums=4 autoCreateTopicEnable=true autoCreateSubscriptionGroup=true listenPort=11011 deleteWhen=04 fileReservedTime=120 mapedFileSizeCommitLog=1073741824 mapedFileSizeConsumeQueue=300000 diskMaxUsedSpaceRatio=88 storePathRootDir=/data/rocketmq/store-s
storePathCommitLog=/data/rocketmq/store-s/commitlog
storePathConsumeQueue=/data/rocketmq/store-s/consumequeue
storePathIndex=/data/rocketmq/store-s/index
storeCheckpoint=/data/rocketmq/store-s/checkpoint
abortFile=/data/rocketmq/store-s/abort
maxMessageSize=65536
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
四、配置文件broker-b.properties (129)
brokerClusterName=rocketmq-cluster #broker名字,注意此处不同的配置文件填写的不一样 brokerName=broker-b #0 表示 Master,>0 表示 Slave brokerId=0 #nameServer地址,分号分割 namesrvAddr=rocketmq-n1:9876;rocketmq-n2:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/data/rocketmq/store #commitLog 存储路径 storePathCommitLog=/data/rocketmq/store/commitlog #消费队列存储路径存储路径
storePathConsumeQueue=/data/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex=/data/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint=/data/rocketmq/store/checkpoint #abort 文件存储路径 abortFile=/data/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE
brokerRole=SYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=SYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
五、配置文件broker-a-s.properties(129)
brokerClusterName=rocketmq-cluster #broker名字,注意此处不同的配置文件填写的不一样 brokerName=broker-a #0 表示 Master,>0 表示 Slave brokerId=1 #nameServer地址,分号分割 namesrvAddr=rocketmq-n1:9876;rocketmq-n2:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=11011 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/data/rocketmq/store-s #commitLog 存储路径 storePathCommitLog=/data/rocketmq/store-s/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/data/rocketmq/store-s/consumequeue #消息索引存储路径 storePathIndex=/data/rocketmq/store-s/index #checkpoint 文件存储路径 storeCheckpoint=/data/rocketmq/store-s/checkpoint #abort 文件存储路径 abortFile=/data/rocketmq/store-s/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE
brokerRole=SLAVE #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
六、启动集群
创建目录
# 分别在两台虚拟机上执行 128服务器 mkdir /data/rocketmq/store -p mkdir /data/rocketmq/store/commitlog -p mkdir /data/rocketmq/store/consumequeue -p mkdir /data/rocketmq/store/index -p # 129服务器 mkdir /data/rocketmq/store-s -p mkdir /data/rocketmq/store-s/commitlog -p mkdir /data/rocketmq/store-s/consumequeue -p mkdir /data/rocketmq/store-s/index -p
停止broker namesrv(因为我之前开启过所以要停止)
cd /usr/local/rocketmq/conf/2m-2s-sync sh /usr/local/rocketmq/bin/mqshutdown broker
#用lsof -i:10911看是否停成功 sh /usr/local/rocketmq/bin/mqshutdown namesrv
#用lsof -i:9876验证是否成功
在两个服务器的 bin目录下启动mq
nohup /usr/local/rocketmq/bin/mqnamesrv &
用tail -f nohup.out看是否成功;
编写配置文件,并写好配置在2m-2s-sync文件夹下(128)
echo "brokerIP1=192.168.32.128" >> broker-a.properties
echo "brokerIP1=192.168.32.128" >> broker-b-s.properties
同理在129上也配置下
echo "brokerIP1=192.168.32.129" >> broker-b.properties
echo "brokerIP1=192.168.32.129" >> broker-a-s.properties
在 128上执行(在2m目录)
nohup /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a.properties &
nohup /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b-s.properties &
用命令看下是否成功
tail -f nohup.out
在 129 上执行
nohup /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b.properties &
nohup /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a-s.properties &
验证启动是否成功在单机中说明过,这里就不说明了,查看集群信息
/usr/local/rocketmq/bin/mqadmin clusterlist -n 192.168.32.128:9876;192.168.32.129:9876
关闭
sh /usr/local/rocketmq/bin/mqshutdown broker
sh /usr/local/rocketmq/bin/mqshutdown namesrv
创建集群topic(在bin目录下),这是按官网例子写的,结果报错不行
sh mqadmin updateTopic -t TopicTest -n 192.168.32.128:9876;192.168.32.129:9876 -b 192.168.32.128:10911
于是换了个写法
./mqadmin updateTopic -t TopicTest -n 192.168.32.128:9876 -c rocketmq-cluster
启动代码
public class ClusterConsumer { public static void main(String[] args) throws MQClientException { // 1. 创建消费者(Push)对象 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_TEST"); // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步 consumer.setNamesrvAddr("192.168.32.128:9876;92.168.32.129:9876"); // 3. 订阅对应的主题和Tag consumer.subscribe("TopicTest", "*"); // 4. 注册消息接收到Broker消息后的处理接口 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { MessageExt messageExt = list.get(0); System.out.printf("线程:%-25s 接收到新消息 %s --- %s %n", Thread.currentThread().getName(), messageExt.getTags(), new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 5. 启动消费者(必须在注册完消息监听器后启动,否则会报错) consumer.start(); System.out.println("已启动消费者"); } }
public class ClusterProducer { public static void main(String[] args) throws MQClientException, IOException, RemotingException, InterruptedException, MQBrokerException { // 1. 创建生产者对象 DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST"); // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步 producer.setNamesrvAddr("192.168.32.128:9876;92.168.32.129:9876"); // 3. 启动生产者 producer.start(); // 4. 生产者发送消息 for (int i = 0; i < 10; i++) { Message message = new Message("TopicTest", "TagA", ("Hello MQ:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult result = producer.send(message); System.out.printf("发送结果:%s%n", result); } System.in.read(); // 5. 停止生产者 producer.shutdown(); } }
七、rocketmq web管理界面
git clone源码
git clone https://github.com/apache/rocketmq-externals
进入到 rocketmq-console 目录,执行 maven 编译打包
cd rocketmq-externals/rocketmq-console
mvn clean package -Dmaven.test.skip=true
启动
# jar包在target目录下面,你可以copy到其他目录去运行
java -jar rocketmq-console-ng-1.0.0.jar --server.port=8081 -- rocketmq.config.namesrvAddr=192.168.32.128:9876 > /usr/local/rocketmq/logs/mq- console.log 2>&1 &
# --server.port springboot内置tomcat的端口号,默认8080;
# --rocketmq.config.namesrvAddr nameserver的地址