引子
服务解耦通常采用消息系统。本文总结消息系统相关的思想、技术及知识。
需求与目标
需求是:将数据从某个生产源投递到多个消费源。
设计目标是实现一个消息系统,完成“生产者-消费者”模型中的消息投递能力。消息投递应当满足如下要求:
- 性能:低延迟、高吞吐量;
- 高可用:副本机制与一致性实现;
- 消息可靠性:保证消息不丢失;可用性与一致性的选项;
- 特性支持选项:多生产者/多消费者、分批处理(批量投递消息)、顺序保证(按消息生产顺序投递)、保留期限(保留天数或消息量大小)、实时或非实时读取;广播能力;事务能力。
实现思路
- 设计要点:RPC协议;存储;
- 优化措施:零拷贝机制;分区多副本;
- 设计考量:及时性(消费延迟、准实时或批量处理)、可靠性保证、高吞吐量和动态吞吐量、数据格式、数据转换(ETL,ELT)、安全性(认证机制和加密传输,审计和追踪)、故障处理、耦合性和灵活性(临时数据管道、缺失元数据、末端处理)、Collection API 的开箱即用。
可靠性保证
可靠性保证:“至多一次”,“至少一次”,“仅有一次”。
消息的可靠性保证,不仅仅取决于消息系统,还取决于与消息系统集成的上下游一整套的可靠性措施。
网络层面
可靠传输协议 TCP。
设计者层面
可靠生产,可靠存储,可靠消费。
- 可靠生产 -- 消息生产确认;
- 可靠存储 -- 复制、持久化,多副本,落盘后发送;
- 可靠消费 -- 最少一次,确认机制、重试、幂等处理。
使用者层面
- 生产者和消费者的消息连接配置,消息连接组件可靠性(适配业务端的生产和消费速率);
- 消息体尽可能精简,保持在小体积(避免大体积消息在高并发下的发送 RT 上涨);
- 避免消息里有大 JSON,防止 JSON 过长截断导致脏数据( 避免消息队列堵塞);
- 消息消费后要 ACK(避免消息堆积);
- 数据格式的一致性(避免消息生产者和消费者之间的升级耦合);
- 异常重试消息发送或消费(避免数据丢失);
- 幂等处理消息(避免资损问题);
- 生产者写入消息的速度控制;
- 消费者的消费速率保证。
消息实现
KafKa
- 基本概念:Topic, Partition ; Leader/Follower ;Broker ; Producer, Consumer ;
主题与分区
- 某一种类型的所有事件被归类在一个主题 Topic 下;在 Kafka 里,事件与消息或记录的含义类同;
- 一个主题 Topic 下的事件可以写入多个分区 Partition,每个分区有一个首领副本和多个跟随者副本;分区及副本机制实现 Kafka 的高可用、高可靠和高伸缩性;
- 生产者 Producer 可以为某种消息指定消息键,通过 Kafka 分区器为消息键生成散列值,从而写入指定分区,这样,同样键的消息可以写入同一个分区。
生产者
- 发送对象:ProducerRecord => [ Topic, Partition, Key, Value ] ;
- 发送过程:生产者先将 ProducerRecord 发给序列化器,序列化为字节数组,然后发给分区器。如果没有指定 Partition ,则分区器会根据键 key 计算得到一个分区值并返回;最后发给 Broker,如果成功则返回元数据 RecordMetaData,否则重试若干次(全部重试失败则报错);
- 发送方式: 发送后即可;同步发送,获取返回值; 异步发送,回调函数;
- 发送配置: [ bootstrap.servers, key.serializer, value.serializer ] ;bootstrap.servers 的配置为 broker1 host1 port, broker2 host2 port ,key.serializer, value.serializer 是 Key 与 Value 的序列化类,默认有 ByteArraySerializer, StringSerializer, IntegerSerializer ;
- acks: 消息丢失和吞吐量的权衡。0 -- 生产者写入消息后不等待服务端的响应,可达到高吞吐量,但是很可能会丢消息而无感知,适合能忍受少许消息丢失的场景; 1 -- 集群首领节点收到消息后,生产者会得到服务器响应,如果没有首领节点,会有消息丢失,吞吐量取决于同步还是异步,适中的策略; all -- 所有参与复制的节点都收到消息后,生产者才会得到服务器响应,消息几乎不会丢失,但是延迟可能比较大,吞吐量比较低;适合于对延迟和吞吐量不敏感但是无法忍受消息丢失的场景;
- buffer.memory:消息会先发送到缓冲区,然后用单独的线程发送给服务端。设置生产者消息缓冲区大小,若发送数据量超过此大小,则发送消息动作会被阻塞;
- compress.type: snappy -- 占用 CPU 较少,压缩比高; gzip -- 占用更多 CPU,更高压缩比。
消费者
-
消费者群组:一个含有多个消费者的消费者群组,订阅同一个主题的所有分区;该主题的所有分区的每一条消息仅能被该消费者群组中的某一个消费者所消费;
-
同一个主题下的所有分区的每一条消息会广播到每一个消费者群组;订阅同一个主题的不同消费群组的消费是互不影响的;
-
消费者群组代表着某个主题的消费应用场景,不同的应用场景建立不同的消费者群组即可;
-
群组保证每个分区只能被该群组里的一个消费者使用;当一个群组里的消费者数量超过该主题下的分区数时,将会有消费者闲置;
-
同一个消费者可以通过正则表达式订阅多个主题;
-
消费者的配置:[ bootstrap.servers, key.serializer, value.serializer, group.id ]
-
消息偏移量:写入分区时,消息会生成一个递增的偏移量。 消费者 Consumer 可以根据偏移量按消息生成的顺序进行消费;
-
消费者通过轮询的方式处理从分区获取的数据。
-
Kafka 不需要消费者进行确认。消费者可以跟踪消费的偏移量(通过往主题为 _consumer_offset 的特殊主题发送消息);
-
当消费者崩溃或有新消费者加入群组时(类似服务器扩容或缩容),会导致分区的再均衡;
-
消费者需要读取最后一次提交的偏移量并从这里开始重新工作。这样可能会有重复消息或丢失消息的概率。消费者需要自行处理偏移量。
Broker
一个独立的 Kafka 服务器称为 Broker 。Broker 处理生产者的消息,并提供给消费者服务。同时也承担集群控制器的角色。Kafka 通过 ZK 来管理 Broker 的关联。Broker 会注册到 ZK 的一个临时节点 /kafka/brokers/ids/ 上,当 Broker 崩溃、退出集群、长时间垃圾回收导致停顿时,Broker 会从 ZK 上断开连接,Kafka 就会得到通知,临时节点会被移除。控制器 Broker 会注册到临时节点 /controller 上,其他 Broker 会创建该临时节点的 Watch 监听对象。
可靠性保证
Kafka 提供一些基本的保证机制:
- 同一个生产者往同一个分区写消息,先写入的消息可以被先读取;
- 只有当消息被写入所有副本时,才会被认为是“已提交的”;消费者只能读取已提交的消息;
- 生产者可以决定写入多少个分区副本才确认;
- Kafka 可以通过参数配置来获得灵活的可靠性保证。
Kafka 的复制机制和分区多副本设计是其可靠性的核心保证。要保证 Kafka 的可靠性,需要从 Kafka 的配置、生产者的消息发送与确认、消费者的消息处理等多个层面多管齐下。
Kafka 的生产配置:
- Kafka 只允许首领副本读取生产者数据,跟随者副本只负责保持与首领副本一致;
- 复制系数:每个分区可以有 N 个副本,默认 N = 3。副本越多,可用性越强,需要的存储空间和 Broker 也越多;可用性与硬件存储的权衡;主题的配置参数是 replication.factor, Broker 的配置参数是 default.replication.factor ;
- 副本分布:Kafka 保证分区副本分布在不同的 Broker 上。进一步地,可以通过 broker.rack 参数,为每个 Broker 设置所在机架,保证 Broker 分布在不同机架上,避免 Broker 分布在同一机架上导致的不可用;
- 首领副本不可用: 当首领副本不可用时,某个跟随者副本会被提升为首领副本。若首领崩溃,而跟随者副本滞后,则会有两个选择 -- 若不允许不同步的跟随者副本提升为首领副本,会导致不可用;若允许不同步的跟随者副本提升为首领副本,则会导致数据不一致;具体要根据业务场景来选用;可用性与一致性的权衡;可以通过 unclean.leader.election.enable 来设置是否允许不同步的跟随者副本提升为首领副本;
- 最少同步副本:最少要写入 N 个同步副本后,才能向磁盘写入数据,消息变成“可提交”的;性能与可用性的权衡;配置参数为 min.insync.replicas 。
生产者发送确认的副本配置:
- acks=0:如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入Kafka ;有丢数据的风险,吞吐量最高;
- acks=1 首领在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应;丢数据和吞吐量的折中;
- acks = all 首领在返回确认或错误响应之前,会等待所有同步副本都收到消息;将数据丢失风险降低到最低,吞吐量最低。
生产者和消费者的其它注意事项:
- 生产者发送消息的可重试错误:重试机制;重试风险是消息重复,消费者需要做好幂等处理;
- 生产者发送消息的不可重试错误处理:Broker 错误、消息本身的错误、序列化错误等;
- 消费者的处理:消费者检测并读取消息偏移量;消费幂等处理。
示例
参考资料
- 《Kafka权威指南》 : 第 1-6, 11 章
- 订单同步的生产实践经验