参考书籍: RocketMQ实战与原理解析: 作者杨开元
一 单机部署
亲测jdk11有问题,无法启动,最好用jdk8
修改shell脚本:
修改合适的内存:
先启动NameServer再启动一个Broker:
日志默认位置: ~/logs/rocketmqlogs
测试:
发送demo: sh tools.sh org.apache.rocketmq.example.quickstart.Producer
接受demo: sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
注意:每个窗口都必须先设置NAMESRV_ADDR,否则无法连接: export NAMESRV_ADDR=localhost:9876
关闭
sh mqshutdown namesrv
sh mqshutdown broker
二 集群部署
先部署两台服务器到两个服务器上
结构:
服务器a,服务器b, 两个NameServer, 两个brokera,b,两台服务器互为主从,即服务器a上面有brokera和brokerb的salve
修改配置文件
位置: rocketmq-all-4.7.0/conf/2m-2s-async
一台修改broker-a.properties broker-b-s.properties,即brokera的master和brokerb的salve,另一台反之:
# NemeServer地址,可以有多个
namesrvAddr=192.168.159.128:9876;192.168.159.129:9876
# 集群名称
brokerClusterName=DefaultCluster
# broker名称,主从的名称相同表明相互关联,为同一个broker
brokerName=broker-b
# broker的id,一个master可以有多个slave,id为0表示主,大于0表示不同的从
brokerId=0
# 在几点做删除消息动作,比如04: 凌晨四点
deleteWhen=04
# 在磁盘保存消息的时长,超过会自动删除,在上面配置删除的时间点删除
fileReservedTime=48
# 角色有三种:
#ASYNC_MASTER, 消息同步开始就返回成功状态
#SYNC_MASTER 表示消息同步完成后再发送完成状态
#SLAVE 从
brokerRole=ASYNC_MASTER
# 刷盘策略,同步和异步
flushDiskType=ASYNC_FLUSH
# 监听端口,一台机器上多个Broker要确保端口不冲突
listenPort=10921
# 存储消息以及一些配置信息的根目录
storePathRootDir=/home/rocketmq/store-b
启动
先启动两个NameServer
再启动四个Broker,后面用 -c 跟上对应的配置文件:
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties &
注意: 端口一定要提前放开
三 控制台搭建
下载解压
wget https://github.com/apache/rocketmq-externals/archive/master.zip
解压 unzip
编译打包
之后进入目录: cd rocketmq-externals-master/rocketmq-console
使用maven编译打包:
mvn clean package
注意: 如果报错就删除测试代码:
rm src/test -rf #删除src 目录下的 test
如果慢就编辑maven的setting,使用阿里云的镜像地址
运行
编译后找到target下面的jar包,其实是一个springboot项目,启动
nohup java -jar rocketmq-console-ng-1.0.0.jar --server.port=8561 --rocketmq.config.namesrvAddr=a:9876;b:9876 &
nohup 命令 & :后台启动,防止session或者退出,打断程序运行
成功运行:
四 springboot整合
添加start依赖
=implementation group: 'org.apache.rocketmq', name: 'rocketmq-spring-boot-starter', version: '2.0.4'
yml配置
rocketmq:
name-server: 192.168.159.128:9876;192.168.159.129:9876
producer:
group: houzheng
send-message-timeout: 30000
producer
// springboot会自动注入,可直接使用
@Autowired
RocketMQTemplate rocketMQTemplate;
@GetMapping("send/{id}")
public String send(@PathVariable("id") String id){
rocketMQTemplate.send("test-topic-01",MessageBuilder.withPayload(new User(id,"侯征")).build());
return "SUCESS";
}
consumer
@Component
@RocketMQMessageListener(topic = "test-topic-01", consumerGroup = "my-consumer_test-topic-01")
public class UserConsumer implements RocketMQListener<User> {
@Override
public void onMessage(User message) {
System.out.println("接受到消息:"+message.toString());
}
}
测试