一、架构简述
RocketMQ阿里开源的一个分布式消息传递和流媒体平台,具有低延迟,高性能和可靠性, 万亿级容量和灵活的可伸缩性。跟其它中间件相比,RocketMQ的特点是纯JAVA实现,在发生宕机和其它故障时消息丢失率更低。
它由四个部分组成:nameserver,broker,生产者和消费者。它们中的每一个都可以水平扩展,而没有单个故障点。
Nameserver:提供轻量级的服务发现和路由。生产者和消费者通过nameserver获取broker信息。它几乎是无状态的,nameserver结点之间没有任何的数据同步,broker注册信息时会注册到每一个nameserver结点上面,所以每个nameserver节点都记录了完整broker信息,提供相应的读写服务,并支持快速的存储扩展。
Broker:通过提供轻量级的topic和queue机制来存储消息。与nameserver中的每个节点建立长连接,定时注册topic等信息到nameserver上面。broker一般都是主从模式,因为消息是真实存储消息的地方,避免一个结点挂了,导致这个节点数据全部丢失。
Producer:与nameserver集群中的一个结点建立长连接,定期的拉取broker 的topic路由信息,再将消息发送到对应broker的topic上面
Consumer:与nameserver集群中的一个结点建立长连接,定期的拉取broker 的topic路由信息,再去消费对应broker的topic信息
二、环境搭建
1.官网下载:http://rocketmq.apache.org/release_notes/release-notes-4.7.0/
2.解压 unzip rocketmq-all-4.7.0-bin-release.zip
3.修改启动参数配置。默认的jvm参数内存设置特别大,如果自己机器不行的话需要手动改下bin目录下的启动参数文件:runbroker.sh 和runserver.sh文件 我的虚拟机内存分配不大,改成256m 256m 128m
这是默认的
4.启动nameserver: nohup sh mqnamesrv ‐n 192.168.0.67:9876 & (将日志输出当前目录的nohub.out文件,方便查看启动日志,ip是当前机器的ip)
5.启动broker:nohup sh mqbroker ‐n 192.168.0.67:9876 autoCreateTopicEnable=true & (autoCreateTopicEnable=true 自动创建topic,如果不设置true的话,生产者发送消息的时候如果没有topic就会发送失败,需要提前把topic创建好,设置true会在发送时自动创建topic,192.168.0.67:9876 是name server)
也可以使用配置文件启动broker:nohup sh mqbroker ‐n 192.168.0.67:9876 ‐c conf/broker.conf &
简单看下默认的配置文件中的一些参数:
#集群名字
brokerClusterName = DefaultCluster
#broker名字,集群中主从都要用这个名字,才会组成一个集群
brokerName = broker-a
#id为0的是master 非0的slava
brokerId = 0
#消息处理时间,凌晨4点
deleteWhen = 04
#消息保存时间默认48小时,48小时之后的凌晨4点就会清理
fileReservedTime = 48
#集群主从之间数据同步方式
#异步只需要发到master成功就返回客户端段成功,性能高,但是如果master挂了 slave还未同步就会丢失消息。根据自身业务场景选择合适方式
brokerRole = ASYNC_MASTER
#消息刷盘机制,和主从数据同步类似,同步就是说需要写进磁盘了才返回成功。异步就是写进内存了就返回成功,后面再去落盘。
flushDiskType = ASYNC_FLUSH
#自动创建topic
autoCreateTopicEnable=true
使用配置文件启动:nohup sh bin/mqbroker ‐n 192.168.0.67:9876 -c conf/broker.conf &
broker 192.168.0.67:10911关联的nameserver是192.168.0.67:9876
至此一个单机的rocketMQ的环境就搭建好了 正常退出: sh mqshutdown broker 和 sh mqshutdown namesrv
测试下消息发送,使用rocketMQ提供的测试脚本:
export NAMESRV_ADDR=192.168.0.67:9876
生产者脚本
消费消息:
三、控制台搭建
1.下载rocketMQ的扩展包,master分支:https://github.com/apache/rocketmq-external
2.启动rocketmq-console-ng模块
3.修改此模块配置:
3.1 maven依赖rocketMQ版本改成自己部署版本对应的,我部署的MQ是最新的4.7.0版本
3.2 配置文件中配置namserver地址和控制台数据存放地址
正常来说改完这两个地方就可以直接启动控制台的这个springboot程序了。
但是因为我用的MQ是最新的4.7版本,控制台对应的还没有更新到最新的。编译都有会报错的地方
1.DefaultMQPullConsumer这个类已经不推荐使用了,并且4.7.0中有两个类似的构造器,原来代码直接传了一个null,第二个参数无法识别是哪个构造器的。修改下把第二参数强转String或者RPCHook
2.MQAdminExt这个接口中加了新方法,但是控制台中MQAdminExtImpl还没有实现对应的方法。这个问题在github上几天前已经有人提了Issues了,我这里是自己添加一下默认实现然后服务就可以正常启动了,还不确定后续后面有什么影响没有,至少可以启动了
之前使用测试脚本发送的消息 以及topic都可以在控制台看到了