在 0.10 版本之前, Kafka仅仅作为一个消息系统,主要用来解决应用解耦、 异步消息 、 流量削峰等问题。 在0.10版本之后, Kafka提供了连接器与流处理的能力,
它也从分布式的消息系统逐渐成为一个流式的数据平台 。
Kafka 流式数据平台
作为一个流式数据平台,最重要的是要具备下面3个特点 。
- 类似消息系统,提供事件流的发布和订阅,即具备数据注入功能 ;
- 存储事件流数据的节点具有故障容错的特点,即具备数据存储功能 ;
- 能够对实时的事件流进行流式地处理和分析,即具备流处理功能 。
Kafka是如何实现并组合上面的3个功能特点的 :
消息系统:
消息系统 ( 也叫作消息队列)主要有两种消息模型:队列和发布订阅 。 Kafka使用消费组( consumer group )统一了上面两种消息模型 。 Kafka使用队列模型时,
它可以将处理工作平均分配给消费组中的消费者成员;使用发布订阅模式时,它可以将消息广播给多个消费组 。 采用多个消费组结合多个消费者,既可以线性扩展消息的处理能力,也
允许消息被多个消费组订阅 。
存储系统:
任何消息队列要做到“发布消息”和“消费消息”的解耦合, 实际上都要扮演一个存储系统的角色,负责保存还没有被消费的消息 。 否则,如果消息只是在内存中, 一旦机
器宕机或进程重启,内存中的消息就会全部丢失 。 Kafka也不例外,数据写入Kafka集群的服务器节点时,还会复制多份来保证出现故障时仍能可用 。 为了保证消息的可靠存储, Kafka
还允许生产者的生产请求在收到应答结果之前,阻塞式地等待一条消息,直到它完全地复制到多个节点上,才认为这条消息写入成功 。
流处理系统:
流式数据平台仅仅有消息的读取和写入、存储消息流是不够的,还需要有实时的流式数据处理能力 。 对于简单的处理,可以直接使用Kafka的生产者和消费者API来完成;
但对于复杂的业务逻辑处理,直接操作原始的API需要做的工作非常多。 Kafka流处理 ( KafkaStreams )为开发者提供了完整的流处理API ,比如流的聚合、连接 、 各种转换操作 。
Kafka流处理框架内部解决很多流处理应用程序都会面临的问题 :处理乱序或迟来的数据、重新处理输入数据 、 窗口和状态操作等 。
将消息系统、存储存储、流处理系统组合在一起:
传统消息系统的流处理通常只会处理订阅动作发生之后才到达的新消息,无法处理订阅之前的历史数据 。 分布式文件存储系统一般存储静态的历史数据,
对历史数据的处理一般采用批处理的方式 。 现有的开源系统很难将这些系统无缝地整合起来, Kafka则将消息系统、存储系统 、 流处理系统都组合在一起,
构成了以Kafka为中心的流式数据-处理平台 。 它既能处理最新的实时数据,也能处理过去的历史数据 。
Kafka作为流式数据平台的核心组件,主要包括下面4种核心的 API ,如图所示 。
- 生产者 ( producer )应用程序发布事件流到Kafka的一个或多个主题。
- 消费者 ( consumer )应用程序订阅Kafka的一个或多个主题,并处理事件流。
- 连接器 ( connector )将Kafka主题和已有数据源进行连接,数据可以互相导入和导出 。
- 流处理 ( processor ) Kafka主题消费输入流,经过处理后,产生输出流到输出主题。
Kafka 的基本概念
分区模型
Kafka集群由多个消息代理服务器( broker server )组成,发布到Kafka集群的每条消息都有一个类别,用主题( topic )来表示 。 不同应用产生不同类型的数据,
可以设置不同的主题。 一个主题会有多个消息的订阅者,当生产者发布消息到某个主题时,订阅了这个主题的消费者都可以接收到生产者写入的新消息 。
Kafka集群为每个主题维护了分布式的分区( partition )日志文件,物理意义上可以把主题看作分区的日志文件( partitioned Jog )。
每个分区都是一个有序的、不可变的记录序列,新的消息会不断追加到提交日志( commit log )。 分区中的每条消息都会按照时间顺序分配到一个单调递增的顺序编号,
叫作偏移盘( offset ),这个偏移量能够唯一地定位当前分区中的每一条消息 。
如图 (左)所示,主题有3个分区,每个分区的偏移量都从0开始,不同分区之间的偏移量都是独立的,不会互相影响 。 右图中,
发布到 Kafka主题的每条消息包括键值和时间戳。 消息到达服务端的指定分区后,都会分配到一个自增的偏移量。 原始的消息内容和分配的偏移量以及其他一些元数据
信息最后都会存储到分区日志文件中 。 消息的键也可以不用设置,这种情况下消息会均衡地分布到不同的分区 。
传统消息系统在服务端保持消息的顺序 , 如果有多个消费者消费同一个消息队列,服务端会以消息存储的顺序依次发送给消费者 。 但由于消息是异步发送给消费者的,
消息到达消费者的顺序可能是无序的,这就意味着在并行消费时,传统消息系统无法很好地保证消息被顺序处理。 虽然我们可以设置一个专用的消费者只消费一个队列,
使得消费处理无法真正以此来解决消息顺序的问题,但是这就执行。
Kafka比传统消息系统有更强的顺序性保证 , 它使用主题分区作为消息处理的并行单元 。 Kafka以分区作为最小的粒度,将每个分区分配给消费组中不同的而且是
唯一的消费者,并确保一个分区只属于一个消费者, 即这个消费者就是这个分区的唯一读取线程。 那么,只要分区的消息是有序的,消费者处理的消息顺序就有保证。
每个主题有多个分区,不同的消费者处理不同的分区,所以Kafka不仅保证了消息的有序性,也做到了消费者的负载均衡。
消费模型
消息由生产者发布到Kafka集群后,会被消费者消费 。 消息的消费模型有两种:推送模型( push )和拉取模型( pull )。
推送模型,由消息代理记录消费者的消费状态 。 消息代理在将消息推送到消费者后 , 标记这条消息为已消费,但这种方式无法很好地保证消息的处理语义 。
比如,消息代理把消息发送出去后,当消费进程挂掉或者由于网络原因没有收到这条消息时,就有可能造成消息丢失(因为消息代理已经把这条消息标记为已消费了,
但实际上这条消息并没有被实际处理)。 如果要保证消息的处理语义,消息代理发送完消息后,要设置状态为“已发送”,只有收到消费者的确认请求后才更新为“已消费”,
这就需要在消息代理中记录所有消息的消费状态,这种做法也是不可取的 。
Kafka采用拉取模型,由消费者自己记录消费状态,每个消费者互相独立地顺序读取每个分区的消息 。 如图所示,有两个消费者(不同消费组)拉取同一个主题的消息,
消费者A的消费进度是3,消费者B的消费者进度是6。 消费者拉取的最大上限通过最高水位( watermark )控制,生产者最新写入的消息如果还没有达到备份数量,
对消费者是不可见的 。 这种由消费者控制偏移量的优点是 : 消费者可以按照任意的顺序消费消息 。 比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息 ;
或者直接跳到最近的位置,从当前时刻开始消费 。
在一些消息系统中,消息代理会在消息被消费之后立即删除消息 。 Kafka的做法是生产者发布的所有消息会一直保存在Kafka集群中,不管消息有没有被消费 。 用户
可以通过设置保留时间来清理过期的数据,比如 , 设置保留策略为两天 。 在消息发布之后,它可以被不同的消费者消费,在两天之后,过期的消息就会向动清理掉。
分布式模型
Kafka每个主题的多个分区日志分布式地存储在Kafka集群上,同时为了故障容错,每个分区都会以副本的方式复制到多个消息代理节点上 。 其中一个节点会作为主副本( Leader ),
其他节点作为备份副本( Follower ,也叫作从副本)。 主副本会负责所有的客户端读写操作,备份副本仅从主副本同步数据 。 当主副本出现故障时,
备份副本中的一个副本会被选择为新的主副本 。 因为每个分区的副本中只有主副本接受读写,所以每个服务端都会作为某些分区的主副本,以及另外一些分区的备份副本 ,
这样Kafka集群的所有服务端整体上对客户端是负载均衡的 。
Kafka的生产者和消费者相对于服务端而言都是客户端,生产者客户端发布消息到服务端的指定主题 , 会指定消息所属的分区 。 生产者发布消息时根据消息是否有键 ,
采用不同的分区策略。 消息没有键时,通过轮询方式进行客户端负载均衡。消息有键时,根据分区语义确保相同键的消息总是发送到同一个分区 。
Kafka的消费者通过订阅主题来消费消息 并且每个消费者都会设置一个消费组名称 。 因为生产者发布到主题的每一条消息都只会发送给消费组的一个消费者。
如果要实现传统消息系统的“队列”模型,可以让每个消费者都拥有相同的消费组名称,这样消息就会负载均衡到所有的消费者;如果要实现“发布-订阅”模型,
则每个消费者的消费组名称都不相同,这样每条消息就会广播给所有的消费者。
同一个消费组下多个消费者互相协调消费工作, Kafka会将所有的分区平均地分配给所有的消费者实例,这样每个消费者都可以分配到数量均等的分区 。
Kafka的消费组管理协议会动态地维护消费组的成员列表,当一个新消费者加入消费组,或者有消费者离开消费组,都会触发再平衡操作。
Kafka的消费者消费消息时,只保证在一个分区内消息的完全有序性,并不保证同一个主题中 多个分区的消息顺序 。 而且,消费者读取一个分区消息的
顺序和生产者写入到这个分区的顺序是一致的 。比如,生产者写入“hello”和“kafka”两条消息到分区Pl ,则消费者读取到的顺序也一定是 “he lo”和“kafka” 。
如果业务上需要保证所有消息完全一致,只能通过设置一个分区完成,但这种做法的缺点是最多只能有一个消费者进行消费。 一般来说,只需要保证每个分区的有序性,
再对消息加上键来保证相同键的所有消息落入同一个分区,就可以满足绝大多数的应用 。
Kafka 的设计与实现
文件系统的持久化与数据传输效率
如果是针对磁盘的顺序访问,某些情况下它可能比随机的内存访问都要快,甚至可以和网络的速度相差无几。
如图 (左)所示,应用程序写入数据到文件系统的一般做法是:在内存中保存尽可能多的数据,并在需要时将这些数据刷新到文件系统。 但这里我们要做完全相反的事情,
如图中所有的数据都立即写入文件系统的持久化日志文件,但不进行刷新数据的任何调用 。 数据会首先被传输到磁盘缓存,操作系统随后会将这些数据定期自动刷新到物理磁盘。
消息系统内的消息从生产者保存到服务端,消费者再从服务端读取出来,数据的传输效率决定了生产者和消费者的性能。 生产者如果每发送一条消息都直接通过
网络发送到服务端,势必会造成过多的网络请求 。 如果能够将多条消息按照分区进行分组,并采用批量的方式一次发送一个消息集,并且对消息集进行压缩,
就可以减少网络传输的带宽,进一步提高数据的传输效率。
消费者要读取服务端的数据,需要将服务端的磁盘文件通过网络发送到消费者进程,而网络发送通常涉及不同的网络节点 。 如图 (左)所示,传统读取磁盘
文件的数据在每次发送到网络时,都需要将页面缓存先保存到用户缓存,然后在读取消息时再将其复制到内核空间,具体步骤如下 :
(1) 操作系统将数据从磁盘中读取文件到内核空间里的页面缓存 。
(2)应用程序将数据从内核空间读入用户空间的缓冲区 。
(3)应用程序将读到的数据写回内核空间并放入 socket缓冲区 。
(4)操作系统将数据从 socket缓冲区复制到网卡接口,此时数据才能通过网络发送归去 。
结合Kafka的消息有多个订阅者的使用场景,生产者发布的消息一般会被不同的消费者消费多次。如图 (右)所示,使用“零拷贝技术”( zero-copy )
只需将磁盘文件的数据复制到页面缓存中一次,然后将数据从页面缓存直接发送到网络中(发送给不同的使用者时,都可以重复使用同一个页面缓存),
避免了重复的复制操作 。 这样,消息使用的速度基本上等同于网络连接的速度了 。
生产者与消费者
Kafka的生产者将消息直接发送给分区主副本所在的消息代理节点,并不需要经过任何的中间路由层 。 为了做到这一点,所有消息代理节点都会保存一份相同的元数据,
这份元数据记录了每个主题分区对应的主副本节点 。 生产者客户端在发送消息之前,会向任意一个代理节点请求元数据,并确定每条消息对应的目标节点 然后把消息直接发
送给对应的目标节点 。
如图 所示,生产者客户端有两种方式决定发布的消息归属于哪个分区:通过随机方式将请求负载到不同的消息、代理节点(左图),或者使用“分区语义函数”将相同键
的所有消息发布到同一个分区(如图)。 对于分区语义, Kafka暴露了一个接口,允许用户指定消息的键如何参与分区 。比如,我们可以将用户编号作为消息的键,
因为对相同用户编号散列后的值是固定的,所以对应的分区也是固定的 。
生产者采用批量发送消息集的方式解决了网络请求过多的问题。 生产者会尝试在内存中收集足够数据,并在一个请求中一次性发送一批数据。
另外,我们还可以为生产者客户端设置“在指定的时间内收集不超过指定数量的消息” 。
如图 所示,消费者读取消息、有两种方式。 第一种是消息代理主动地“推送”消息给下游的消费者(左图),由消息代理控制数据传输的速率,
但是消息代理对下游消费者是否能及时处理不得而知 。
第二种读取方式是消费者从消息代理主动地“拉取”数据(右图),消息代理是无状态的,它不需要标记哪些消息被消费者处理过,
也不需要保证一条消息只会被一个消费者处理。 而且,不同的消费者可以按照自己最大的处理能力来拉取数据,即使有时候某个消费者的处理速度稍微落后,
它也不会影响其他的消费者,并且在这个消费者恢复处理速度后,仍然可以追赶之前落后的数据 。
因为消息系统不能作为严格意义上的数据库,所以保存在消息系统中的数据,在不用之后应该及时地删除并释放磁盘空间 。 消息需要删除,
其原因一般是消息被消费之后不会再使用了,大多数消息系统会在消息代理记录关于消息是否已经被消费过的状态 :当消息从消息代理发送给消费者时(基于推送模型),
消息代理会在本地记录这条消息“已经被消费过了” 。 但如果消费者没能处理这条消息(比如由于网络原因、请求超时或消费者挂掉),就会导致“消息丢失” 。
解决消息丢失的一种办法是添加应答机制,消息代理在发送完消息后只把消息标记为“已发送”,只有收到消费者返回的应答信息才表示“己消费” 。
但还是会存在一个问题:消费者处理完消息就失败了,导致应答没有返回给消息代理,这样消息代理又会重新发送消息,导致消息被重复处理。 这种方案还有一个缺点:消息代理
需要保存每条消息的多种状态(比如,消息状态为“已发送”时,消息代理需要锁住这条消息,保证- 消息不会发送两次) , 这种方式需要在客户端和服务端做一些复杂的状态一致性保证 。
Kafka采用了 基于拉取模型 的消费状态处理,它将主题分成多个有序的分区,任何时刻每个分区都只被一个消费者使用 。 并且,消费者会记录每个分区的消费进度( 即偏移量)。
每个消费者只需要为每个分区记录一个整数值 ,而不需要像其他消息系统那样记录每条消息的状态 。 假设有 10000条消息,传统方式需要记录 10000条消息的状态 ;
如果用Kafka的分区机制,假设有 10个分区,每个分区 1000条消息,总共只需要记录 10个分区的消费状态(需要保存的状态数据少了很多,而且也没有了锁)。
和传统方式需要跟踪每条消息的应答不同, Kafka的消费者会定时地将分区的消费进度保存成检查点文件,表示“这个位置之前的消息都已经被消费过了”。
传统方式需要消费者发送每条消息的应答,服务端再对应答做出不同的处理 ;而Kafka只需要让消费者记录消费进度,服务端不需要记录消息的任何状态 。
除此之外,让消费者记录分区的消费进度还有一个好处:消费者可以“故意”回退到某个旧的偏移量位置,然后重新处理数据 。
虽然这种处理方式看起来违反了队列模型的规定( 一条消息发送给队列的一个消费者之后,就不会被其他消费者再次处理),但在实际运用中,
很多消费者都 需要这种功能 。 比如,消费者的处理逻辑代码出现了问题,在部署并启动消费者后,需要处理之前的消息并重新计算 。
和生产者采用批量发送消息类似,消费者拉取消息也可以一次拉取一批消息 。 消费者客户端拉取消息,然后处理这一批消息,这个过程一般套在一个死循环里,
表示消费者永远处于消费消息的状态(因为消息系统的消息总是一直产生数据,所以消费者也要一直消费消息)。 消费者采用拉取方式消费消息有一个缺点 :如果消息代理没有数据
或者数据量很少,消费者可能需要不断地轮询,并等待新数据的到来(拉取模式主动权在消费者手里,但是消费者并不知道消息代理有没有新的数据;如果是推送模式,只有新数据产生时,
消息代理才会发送数据给消费者,就不存在这种问题)。 解决这个问题的方案是:允许消费者的拉取请求以阻塞式、长轮询的方式等待,直到有新的数据到来 。
我们可以为消费者客户端设置“指定的字节数量”,表示消息代理在还没有收集足够的数据时,客户端的拉取请求就不会立即返回 。
副本机制和容错处理
Kafka的副本机制会在多个服务端节点(简称节点 即消息代理节点 )上对每个主题分区的日志进行复制 。 当集群中的某个节点出现故障时,
访问故障节点的请求会被转移到其他正常节点的副本上。副本的单位是主题的分区, Kafka每个主题的每个分区都有一个主副本以及0个或多个备份副本。
备份副本会保持和主副本的数据同步,用来在主副本失效时替换为主副本。
如图 所示,所有的读写请求总是路由到分区的主副本 。 虽然生产者可以通过负载均衡策略将消息分配到不同的分区,但如果这些分区的主副本
都在同一个服务器上(左图),就会存在数据热点问题。 因此,分区的主副本应该均匀地分配到各个服务器上(右图)。 通常,分区的数量要比服务器多很多,
所以每个服务器都可以成为一些分区的主副本,也能同时成为一些分区的备份副本 。
备份副本始终尽量保持与主副本的数据同步。 备份副本的日志文件和主副本的日志总是相同的,它们都有相同的偏移量和相同顺序的消息 。
备份副本从主副本消费消息的方式和普通的消费者一样,只不过备份副本会将消息运用到自己的本地日志文件(备份副本和主副本都在服务端,
它们都会将收到的分区数据持久化成日志文件)。 普通的消费者客户端拉取到消息后并不会持久化,而是直接处理。
分布式系统处理故障容错时,需要明确地定义节点是否处于存活状态 。 Kafka对节点的存活定义有两个条件:
节点必须和ZK保持会话;
如果这个节点是某个分区的备份副本,它必须对分区主副本的写操作进行复制,并且复制的进度不能落后太多 。
满足这两个条件,叫作“正在同步中”( in-sync )。 每个分区的主副本会跟踪正在同步中的备份副本节点( In Sync Replicas ,即ISR )。 如果一个备份副本挂掉、没有响应或者落后太多,
主副本就会将其从同步副本集合中移除。 反之,如果备份副本重新赶上主副本,它就会加入到主副本的同步集合中 。
在Kafka中, 一条消息只有被ISR集合的所有副本都运用到本地的日志文件,才会认为消息被成功提交了 。 任何时刻,只要ISR至少有一个副本是存活的,
Kafka就可以保证“一条消息一旦被提交,就不会丢失” 。 只有已经提交的消息才能被消费者消费,因此消费者不用担心会看到因为主副本失败而丢失的消息 。