作者:黄理
黄理,10多年软件开发和架构经验,热衷于代码和性能优化,开发和参与过多个开源项目。曾在淘宝任业务架构师多年,当前在快手负责在线消息系统建设工作。
为什么建设在线消息系统
在引入RocketMQ之前,快手已经在大量的使用Kafka了,但并非所有情况下Kafka都是最合适的,比如以下场景:
- 业务希望个别消费失败以后可以重试,并且不堵塞后续其它消息的消费。
- 业务希望消息可以延迟一段时间再投递。
- 业务需要发送的时候保证数据库操作和消息发送是一致的(也就是事务发送)。
- 为了排查问题,有的时候业务需要一定的单个消息查询能力。
为了应对以上这类场景,我们需要建设一个主要面向在线业务的消息系统,作为Kafka的补充。在考察的一些消息中间件中,RocketMQ和业务需求匹配度比较高,同时部署结构简单,使用的公司也比较多,于是最后我们就采用了RocketMQ。
部署模式和落地策略
在一个已有的体系内落地一个开软软件,通常大概有两种方式:
方式一,在开源软件的基础上做深度修改,很容易实现公司内需要的定制功能。但和社区开源版本分道扬镳,以后如何升级?
方式二,尽量不修改社区版本(或减少不兼容的修改),而是在它的外围或者上层进一步包装来实现公司内部需要的定制功能。
注:上图方式一的图画的比较极端,实际上很多公司是方式一、方式二结合的。
我们选择了方式二。最早的时候,我们使用的是4.5.2版本,后来社区4.7版本大幅减小了同步复制的延迟,正好我们的部署模式就是同步复制,于是就很轻松的升级了4.7系列,享受了新版本的红利。
在部署集群的时候,还会面临很多部署策略的选择:
• 大集群 vs 小集群
• 选择副本数
• 同步刷盘 vs 异步刷盘
• 同步复制 vs 异步复制
• SSD vs 机械硬盘
大集群会有更好的性能弹性,而小集群具有更好的隔离型,此外小集群可以不需要跨可用区/IDC部署,所以会有更好的健壮性,我们非常看重稳定性,因此选择了小集群。集群同步复制异步刷盘,首选SSD。
客户端封装策略
如上所述,我们没有在Rocketmq里面做深度修改,所以需要提供一个SDK来提供公司内的需要的定制功能,这个SDK大概是这样的:
对外只提供最基本的API,所有访问必须经过我们提供的接口。简洁的API就像冰山的一个角,除了对外的简单接口,下面所有的东西都可以升级更换,而不会破坏兼容性。
业务开发起来也很简单,只要需要提供Topic(全局唯一)和Group就可以生产和消费,不用提供环境、Name Server地址等。SDK内部会根据Topic解析出集群Name Server的地址,然后连接相应的集群。生产环境和测试环境环境会解析出不同的地址,从而实现了隔离。
上图分为3层,第二层是通用的,第三层才对应具体的MQ实现,因此,理论上可以更换为其它消息中间件,而客户端程序不需要修改。
SDK内部集成了热变更机制,可以在不重启client的情况下做动态配置,比如下发路由策略(更换集群name server的地址,或者连接到别的集群去),Client的线程数、超时时间等。通过maven强制更新机制,可以保证业务使用的SDK基本上是最新的。
集群负载均衡 & 机房灾备
所有的Topic默认都分配到两个可用区,生产者和消费者会同时连接至少两个独立集群(分布在不同的可用区),如下图:
生产者同时连接两个集群,如果可用区A出现故障,流量就会自动切换到可用区B的集群2去。我们开发了一个小组件来实现自适应的集群负载均衡,它包含以下能力:
• 千万级OPS
• 灵活的权重调整策略
• 健康检查支持/事件通知
• 并发度控制(自动降低响应慢的服务器的请求数)
• 资源优先级(类似Envoy,实现本地机房优先,或是被调服务器很多的时候选取一个子集来调用)
• 自动优先级管理
• 增量热变更
实际上它并不仅仅用于消息生产者,而是一个通用的主调方负载均衡类库,可以在github上找到:
https://github.com/PhantomThief/simple-failover-java
核心的SimpleFailover接口和PriorityFailover类没有传递第三方依赖,非常容易整合。
多样的消息功能
延迟消息
延迟消息是非常重要的业务功能,不过RocketMQ内置的延迟消息只能支持几个固定的延迟级别,所以我们又开发了单独的Delay Server来调度延迟消息:
上图这个结构没有直接将延迟消息发到Delay Server,而是更换Topic以后存入RocketMQ。这样的好处是可以复用现有的消息发送接口(以及上面的所有扩展能力)。对业务来说,只需要在构造消息的时候额外指定一个延迟时间字段即可,其它用法都不变。
事务消息
RocketMQ 4.3版本以后支持了事务消息,可以保证本地事务和消费发送同时成功或者失败,对于一些业务场景很有帮助。事务消息的用法和原理有很多资料,这里就不细述了。但关于事务消息的实践网上资料较少,我们可以给出一些建议。
首先,事务消息功能一直在不断完善,应该使用最新的版本,至少是4.6.1以后的版本,可以避免很多问题。
其次,事务消息性能是不如普通消息的,它在内部实际上会生成3个消息(一阶段1个,二阶段2个),所以性能大约只有普通消息的1/3,如果事务消息量大的话,要做好容量规划。回查调度线程也只有1个,不要用极限压力去考验它。
最后有一些参数注意事项。在broker的配置中:
- transientStorePoolEnable这个参数必须保持默认值false,否则会有严重的问题。
- endTransactionThreadPoolNums是事务消息二阶段处理线程大小,sendMessageThreadPoolNums则指定一阶段处理线程池大小。如果二阶段的处理速度跟不上一阶段,就会造成二阶段消息丢失导致大量回查,所以建议endTransactionThreadPoolNums应该大于sendMessageThreadPoolNums,建议至少4倍。
- useReentrantLockWhenPutMessage设置为true(默认值是false),以免线程抢锁出现严重的不公平,导致二阶段处理线程长时间抢不到锁。
- transactionTimeOut默认值6秒太短了,如果事务执行时间超过6秒,就可能导致消息丢失。建议改到1分钟左右。
生产者client也有一个注意事项,如果有多组broker,并且是2副本(有1个Slave),应该打开retryAnotherBrokerWhenNotStoreOK,以免某个Slave出现故障以后,大量消息发送失败。
分布式对账监控
除了比较一些常规的监控手段以外,我们开发了一个监控程序做分布式对账。可以发现我们的集群以及我们提供的SDK是否有异常。
具体做法是在每个Broker上都建立一个监控专用的Topic,监控程序使用我们自己提供的SDK框架来连接集群(就像我们的业务用户那样),监控生产者会给每个集群发送少量消息。然后检查发送是否成功:
发送成功 |
成功 |
刷盘超时 |
|
Slave超时 |
|
Slave不可用 |
|
发送失败 |
具体错误码 |
生产者只对这些结果进行打点,不判断是否正常,具体到监控(或者演练)场景可以配置不同的报警规则。
消费者收到了消息会通过TCP旁路Ack生产者,生产者这边会做分布式对账,将对账结果打点:
- 收到消息
- 消息丢失(或超时未收到消息)
- 重复收到消息
- 消息生成到最终消费的时间差
- Ack生产者失败(由消费者打点)
同样监控程序只负责打点,报警规则可另外配置。
这套机制也可以用于分布式性能压测和故障演练。在做压测的时候,每个消息都Ack的话,对生产者的内存压力很大,因为它发出去的消息,需要在内存中保留一段时间(直到到达这个消息的对账时间),这段时间消费者Ack或者重复Ack都需要记录。所以我们实现了按比例抽样对账的功能,开启以后只有需要对账的消息才会在内存中保留一段时间。
顺便说一下,我们做压测时,合格的标准是异步生产不失败、消费不延迟、每一个消息都不丢失。这样做是为了保证压测时能给出更加准确的,可供线上系统参考的性能数字,而不是制造理想条件,追求一个大的数字。比如异步生产比同步生产更脆弱(压测client如果同步生产,broker抖动的时候,同步client会被堵塞导致发送速度降低,于是降低了broker压力,消息发送不容易失败,但是会看到发送速率在波动),更贴近生产环境的实际情况,我们就选择异步生产来评估。
性能优化
Broker默认的参数在我们的场景下(SSD、同步复制、异步刷盘)不是最优的,有的参数也许在大多数场景下都不是最优的。我们列出一些重要的参数,供大家参考:
参数 |
默认值 |
说明 |
flushCommitLogTimed |
False |
默认值不合理,异步刷盘这个参数应该设置成true,导致频繁刷盘,对性能影响极大 |
deleteWhen |
04 |
几点删除过期文件的时间,删除文件时有很多磁盘读,这个默认值是合理的,有条件的话还是建议低峰删除 |
sendMessageThreadPoolNums |
1 |
处理生产消息的线程数,这个线程干的事情很多,建议设置为2~4,但太多也没有什么用。因为最终写commit log的时候只有一个线程能拿到锁。 |
useReentrantLockWhenPutMessage |
False |
如果前一个参数设置比较大,这个最好设置为True,避免高负载下自旋锁空转消耗CPU。 |
sendThreadPoolQueueCapacity |
10000 |
处理生产消息的队列大小,默认值可能有点小,比如5万TPS(异步发送)的情况下,卡200ms就会爆。设置比较小的数字可能是担心有大量大消息撑爆内存(比如100K的话,1万个的消息大概占用1G内存,也还好),具体可以自己算,如果都是小消息,可以把这个数字改大。可以修改broker参数限制client发送大消息。 |
brokerFastFailureEnable |
True |
Broker端快速失败(限流),和下面两个参数配合。这个机制可能有争议,client设置了超时时间,如果client还愿意等,并且sendThreadPoolQueue还没有满,不应该失败,sendThreadPoolQueue满了自然会拒绝新的请求。但如果client设置的超时时间很短,没有这个机制可能导致消息重复。可以自行决定是否开启。理想情况下,能根据client设置的超时时间来清理队列是最好的。 |
waitTimeMillsInSendQueue |
200 |
200ms很容易导致发送失败,建议改大,比如1000 |
osPageCacheBusyTimeOutMills |
1000 |
Page cache超时时间,如果内存比较多,比如32G以上,建议改大点 |
得益于简单、几乎0依赖的部署模式,使得我们部署小集群的成本非常低;不对社区版本进行魔改,保证我们可以及时升级;统一SDK入口方便集群维护和功能升级;通过复合小集群+自动负载均衡实现多机房多活;充分利用RocketMQ的功能比如事务消息、延迟消息(增强)来满足业务的多样性需求;通过自动的分布式对账,对每一个Broker以及我们的SDK进行正确性监控;本问也进行了一些性能参数的分享,但写的比较简单,基本只说了怎么调,但没能细说为什么,以后我们会另写文章详述。目前RocketMQ已经应用在公司在大多数业务线,期待将来会有更好的发展!
本文为阿里云原创内容,未经允许不得转载。