zoukankan      html  css  js  c++  java
  • kafka概念

    来源:《Apache Kafka 实战》胡夕

    kafka第三方库下载地址

    https://cwiki.apache.org/confluence/display/KAFKA/Clients

    kafka: 消息引擎 + 流式处理平台kafka streams = 流式处理框架
    kafka核心架构
    -生产者发送消息给kafka服务器。
    -消费者从kafka服务器读取消息。
    -kafka服务器依托ZooKeeper集群进行服务的协调管理

    最常见的两种消息引擎范型:消息队列模型和发布/订阅模型,kafka同时支持这两种消息引擎模型。

    borker-kafka服务器
    producer-生产者
    consumer-消费者
    partition-分区
    replica-副本
    topic-主题
    publisher-发布者
    subscriber-订阅者

    ISR-同步副本集合in-sync replicas

    配置参数-基于0.11.0.0版本

    broker端

    参数 含义
    unclean.leader.election.enable=false  不允许非ISR中的副本被选举为leader
    replication.factor=3 强调一定要使用多个副本来保存分区的消息
    min.insync.replicas=2

     控制某条消息至少被写入到ISR中的多少个副本才算成功,

    只有在producer端的acks设置成all或-1时,这个参数才有意义

    确保replication.factor>min.insync.replicas   

    replica.lag.time.max.ms

    默认是10秒。若follower副本无法在10s内向leader请求数据,

    那么该follower就会被视为“不同步”,即会被踢出ISR。

    log.segment.bytes

    默认大小是1GB。控制日志段文件的大小。

    log.index.interval.bytes

    默认大小是4KB。即kafka分区至少写入了4KB数据后才会在

    索引文件中增加一个索引项。

    log.retention.{hours|minutes|ms}

    这是三个参数,表示清除日志的时间间隔,其中ms优先级最高,minutes次之,hours优先级最低。

    log.retention.bytes 默认值-1,表示kafka不会对log进行大小方面的限制。这个参数表示kafka只会为每个log保存的字节数。

     producer端

    参数  含义
    max.block.ms=3000 控制block的时长,当buffer空间不够或者metadata丢失时产生block
    acks=all or -1 所有follower都响应了发送消息才能认为提交成功
    retries=Integer.MAX_VALUE producer开启无限重试,只会重试那些可恢复的异常情况
    max.in.flight.requests.per.connection=1 限制了producer在单个broker连接上能够发送的未响应请求的数量,为了防止topic同分区下的消息乱序问题

    consumer端

    参数 含义
    enable.auto.commit=false 

    设置不能自动提交位移,需要用户手动提交位移

    消息
    消息:由消息头部、key和value组成。
    消息头部:包括CRC32校验码、版本号、属性(一位-压缩类型)、时间戳、键长度和消息体长度等信息。
    -key:消息键,对消息做partition时使用,即决定消息被保存在某topic下的哪个partition。
    -value:消息体,保存实际的消息数据。
    -timestamp:消息发送时间戳,用于流式处理及其他依赖时间的处理语义。如果不指定则取当前时间。

    kafka的消息是用二进制方式-字节数组ByteBuffer保存,且是结构化的消息。
    kafka自己设计了一套二进制的消息传输协议。

    topic主题、partition分区
    topic只是一个逻辑概念,代表了一类消息,也可以认为是消息发送到的地方。
    kafka的topic通常被多个消费者订阅。
    kafka采用topic-partition-message三级结构来分散负载。
    kafka的topic有若干个partition组成。
    kafka的partition是不可修改的有序消息序列或者说是有序的消息日志,每个partition有自己的partition号,从0开始。
    用户对partition唯一能做的就是在消息序列的尾部追加写入消息。
    partition上的每条消息都会被分配一个唯一的序列号-称为位移offset,从0开始顺序递增的整数。
    kafka的一条消息其实就是一个<topic,partition,offset>三元组,通过该三元组可以在kafka集群中找到唯一对应的那条消息。

    副本replica 

    副本分为两类:领导者副本leader replica和追随者副本follower replica。

    follow replica是不能提供服务非客户端的,也就是说不负责响应客户端发来的消息写入和消息消费请求。只是被动地向领导者副本获取数据,而一旦leader replica所在的broker宕机了,kafka会从剩余的replica中选举出新的leader replica继续提供服务。

    kafka保证同一个partition的多个replica一定不会分配在同一台broker上。

    ISR

    全称in-sync replica,意思是与leader replica保持同步的replica集合。

    kafka为partition动态维护一个replica集合。该集合中的所有replica保持的消息日志都与leader replica保持同步状态。只有这个集合中的replica才能被选举为leader replica,也只有该集合中所有replica都接收到了同一条消息,kafka才会将该消息置于“已提交”状态,即认为这条消息发送成功。

    kafka的设计初衷就是为了解决互联网公司超大量级数据的实时传输。
    设计之初就需要考虑以下4个问题:
    吞吐量/延时、消息持久化、负载均衡和故障转移、伸缩性。

    吞吐量/延时
    吞吐量:某种处理能力的最大值。
    kafka的吞吐量,就是每秒能够处理的消息数或者每秒能够处理的字节数。吞吐量越大越好。

    延时:指一段时间间隔。
    kafka的延时,表示客户端发起请求与服务器处理请求并发送响应给客户端之间的这一段时间。延时越短越好。

    实现高吞吐量、低延时方法
    -大量使用操作系统页缓存,内存操作速度快且命中率高。
    -kafka不直接参与物理I/O操作,而是交由最擅长此事的操作系统来完成。
    -采用追加写入方式,摒弃了缓慢的磁盘随机读/写操作。
    -使用以sendfile为代表的零拷贝技术加强网络间的数据传输效率。

    每次写入操作都只是把数据写入到操作系统的页缓存中,然后由操作系统自行决定什么时候把页缓存中的数据写到磁盘上。kafka会持久化所有数据到磁盘上。
    kafka在设计时采用了追加写入消息的方式,即只能在日志文件末尾追加写入新的消息,且不允许修改已写入的消息,属于典型的磁盘顺序访问性操作,速度相当于内存随机访问。
    kafka在读取消息时会首先尝试从操作系统的页缓存中读取,如果命中便把消息经页缓存直接发送到网络的socket上。这个过程就是利用Linux平台的sendfile系统调用实现的,实现技术就是零拷贝。
    kafka大量使用页缓存,故读取消息时大部分消息很有可能依然保存在页缓存中,因此可以直接命中缓存。


    消息持久化
    kafka持久化消息,就是把消息持久化到磁盘上。
    kafka是把所有消息立即写入文件系统的持久化日志中,之后kafka服务器才会返回结果给客户端通知它们消息已被成功写入。这样做既实时保存了数据,又减少了kafka程序对内存的消耗,从而将节省出的内存留给页缓存使用,进一步提升了整体性能。
    好处:解耦消息发送与消息消费;实现灵活的消息处理,对于已经处理过的消息可以在未来的某个时间点被重新处理一次。


    负载均衡和故障转移
    负载均衡:让系统的负载根据一定的规则均衡地分配在所有参与工作的服务器上,从而最大限度地提升系统整体的运行效率。
    kafka通过智能化的分区领导者选举来实现负载均衡,可以在集群的所有机器上以均等机会分散各个partition的leader。

    故障转移:当服务器意外中止时,整个集群可以快速地检测到该失效failure,并立即将该服务器上的应用或服务自动转移到其他服务器上。
    故障转移通常是以“心跳”或“会话”的机制来实现的,即主服务器与备份服务器之间的心跳无法维持或主服务器注册到服务中心的会话超时过期,那么就认为主服务器已无法正常运行,集群会自动启动某个备份服务器来替代主服务器的工作。
    kafka服务器支持故障转移的方式就是使用会话机制。
    每台kafka服务器启动后会以会话的形式把自己注册到Zookeeper服务器上。一但该服务器运转出现问题,与ZooKeeper的会话便不能维持从而超时失效,此时kafka集群会选举出另一台服务器来完全替代这台服务器继续提供服务。

    伸缩性scalability
    伸缩性:表示向分布式系统中增加额外的计算资源(比如CPU、内存、存储或带宽)时吞吐量提升的能力。
    kafka实现线性扩容的方法:每台kafka服务器上的状态统一交由zookeeper保管,解决了阻碍线性扩容的因素-状态的保存。

    kafka使用场景

    消息传输、网址行为日志追踪、审计数据收集、日志收集、流式处理-kafka streams...

    clients

    kafka通常把生产者producer和消费者consumer统称为客户端clients,这是与服务端broker相对应的。

    producer生产者

    API包名:org.apache.kafka.clients.producer.*

    producer主要入口:org.apache.kafka.clients.producer.KafkaProducer。

    默认是异步发送消息,需要引入kafka-clients依赖。

    首要功能就是向某个topic的某个分区发送一条消息。

    1.确认目标分区

    分区器partitioner:确认要向topic的哪个分区写入消息。

    对于每条待发送的消息,如果该消息指定了key,那么该partitioner会根据key的哈希值来选择目标分区;

    若消息没有指定key,则partitioner使用轮询的方式确认目标分区。

    producer的API赋予了用户自行指定目标分区的权利,即用户可以在消息发送时跳过partitioner直接指定要发送到的分区。

    2.寻找分区对应的leader

    就是寻找该分区leader副本所在的kafka broker。

    producer工作原理

    producer首先使用一个线程(用户主线程,也就是用户启动producer的线程)将待发送的消息封装进一个ProduceRecord类实例,然后将其序列化之后发送给partitioner,再由后者确定了目标分区后一同发送到位于producer程序中的一块内存缓冲区中。而producer的另一个专属工作进程(I/O发送进程,也称sender进程)则负责实时地从该缓冲区中提取出准备就绪的消息封装进一个批次batch,统一发送给对应的broker。

    发送消息

    异步发送:实际上所有的写入操作默认都是异步的。

    同步发送:Java版本producer的send方法会返回一个Java Future对象供用户稍后获取发送结果,这就是所谓的回调机制。通过调用Future.get()无限等待结果返回,即实现同步发送的效果。

    消息序列化

    序列化器serializer:将消息转换成字节数组ByteArray。

    可自定义序列化:1.定义数据对象格式;2.创建自定义序列化类,实现org.apache.kafka.common.serialization.Serializer接口,在serializer方法中实现序列化逻辑;3.在用于构建KafkaProducer的Properties对象中设置key.serializer或value.serializer,取决于是为消息key还是value做自定义序列化。

    producer拦截器

    主要用于实现clients端的定制化控制逻辑。interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链interceptor chain。interceptor的实现接口是org.apahce.kafka.clients.producer.ProducerInterceptor,方法是onSend(ProducerRecord),onAcknowledgement(RecordMetadata,Exception),close。

    若指定多个interceptor,则producer将按照指定顺序调用它们,同时把每个interceptor中捕获的异常记录到错误日志中而不是向上传递。

    解决消息丢失和消息乱序

    1.采用同步发送,但是性能会很差,并不推荐在实际场景中使用;

    2.做producer端的无消息丢失配置

    producer端配置

    max.block.ms=3000     控制block的时长,当buffer空间不够或者metadata丢失时产生block

    acks=all or -1   所有follower都响应了发送消息才能认为提交成功

    retries=Integer.MAX_VALUE  producer开启无限重试,只会重试那些可恢复的异常情况

    max.in.flight.requests.per.connection=1  限制了producer在单个broker连接上能够发送的未响应请求的数量,为了防止topic同分区下的消息乱序问题

    使用带回调机制的send发送消息,即KafkaProducer.send(record,callback)   会返回消息发送的结果信息

    Callback的失败处理逻辑中显式地立即关闭producer,使用close(0)  目的是为了处理消息的乱序问题,将不允许将未完成的消息发送出去

    broker端配置

    unclean.leader.election.enable=false    不允许非ISR中的副本被选举为leader

    replication.factor=3       强调一定要使用多个副本来保存分区的消息

    min.insync.replicas=2   控制某条消息至少被写入到ISR中的多少个副本才算成功,只有在producer端的acks设置成all或-1时,这个参数才有意义

    确保replication.factor>min.insync.replicas   

    consumer端配置

    enable.auto.commit=false   设置不能自动提交位移,需要用户手动提交位移

    消息压缩

    数据压缩显著地降低了磁盘占用或带宽占用,从而有效地提升了I/O密集型应用的性能。不过引用压缩同时会消耗额外的CPU时钟周期,因此压缩是I/O性能和CPU资源的平衡。

    kafka自0.7.x版本便开始支持压缩特性-producer端能够将一批消息压缩成一条消息发送,而broker端将这条压缩消息写入本地日志文件,consumer端获取到这条压缩消息时会自动对消息进行解压缩。即producer端压缩,broker保持,consumer解压缩。

    如果有些前置条件不满足,比如需要进行消息格式的转换等,那么broker端就需要对消息进行解压缩然后再重新压缩。

    kafka支持三种压缩算法:GZIP、Snappy和LZ4,性能LZ4>> Snappy>>GZIP,batch越大,压缩时间越长。

    如果发现压缩很慢,说明系统的瓶颈在用户主线程而不是I/O发送线程,因此可以考虑增加多个用户线程同时发送消息,这样通常能显著地提升producer吞吐量。

    多线程处理

    实际环境中只使用一个用户主线程通常无法满足所需的吞吐量目标,因此需要构造多个线程或多个进程来同时给kafka集群发送消息。

    -多线程单KafkaProducer实例:在全局构建一个KafkaProducer实例,然后在多个线程中共享使用。

    优点:实现简单,性能好

    缺点:1.所有线程共享一个内存缓冲区,可能需要较多内存;2.一旦producer某个线程崩溃导致KafkaProducer实例被破坏,则所有用户线程都无法工作

    -多线程多KafkaProducer实例:在每个producer主线程中都构造一个KafkaProducer实例,并且保证此实例在该线程中封闭(thread confinement,线程封闭是实现线程安全的重要手段之一)。

    优点:1.每个用户线程拥有专属的KafkaProducer实例、缓冲区空间及一组对应的配置参数,可以进行细粒度的调优;2.单个KafkaProducer崩溃不会影响其他producer线程工作

    缺点:需要较大的内存分配开销

    consumer消费者

    kafka消费者是从kafka读取数据的应用。若干个consumer订阅kafka集群中的若干个topic并从kafka接收属于这些topic的消息。

    API包名:org.apache.kafka.clients.consumer.*

    producer主要入口:org.apache.kafka.clients.consumer.KafkaConsumer。

    consumer分为两类:

    -独立消费者standalone consumer:单独执行消费操作

    -消费者组consumer group:由多个消费者实例构成一个整体进行消费

    消费者使用一个消费者组名group.id来标记自己,topic的每条消息都只会被发送到每个订阅它的消费者组的一个消费者实例上。

    kafka同时支持基于队列和基于发布/订阅的两种消息引擎模型:

    -实现基于队列的模型:所有consumer实例都属于相同的group,每条消息只会被一个consumer实例处理。

    -实现基于发布/订阅模型:consumer实例都属于不同group,这样kafka消息会被广播到所有consumer实例上。

    消费者组好处:consumer group用于实现高伸缩性、高容错性的consumer机制。组内多个consumer实例可以同时读取kafka消息,而且一旦有某个consumer挂了,consumer group会立即将已崩溃consumer负责的分区转交给其他consumer来负责,从而保证整个group可以继续操作,不会丢失数据,这个过程被称为重平衡rebalance。

    kafka在为consumer group成员分配分区时可以做到公平的分配。

    由于目前kafka只提供单个分区内的消息顺序,而不会维护全局的消息顺序,因此如果用户要实现topic全局的消息读取顺序,就只能通过让每个consumer group下只包含一个consumer实例的方式来间接实现。

    位移offset

    kafka让consumer group保存offset,保存成了一个长整型数据就行,同时kafka consumer还引入了检查点机制checkpointing定期对offset进行持久化,从而简化了应答机制的实现。

    kafka consumer在内部使用一个map来保存其订阅topic所属分区的offset,key是group.id、topic和分区的元组,value就是位移值。

    consumer把位移提交到kafka的一个内部topic(__consumer_offsets)上,通常不能直接操作该topic,特别注意不要擅自删除或搬移该topic的日志文件。

    订阅topic

    1.消费者组订阅topic列表

    consumer.subscribe(Arrays.asList("topic1","topic2","topic3"))

    2.单独消费者订阅topic列表

    val tp1:TopicPartition = new TopicPartition("topic1",0)

    val tp2:TopicPartition = new TopicPartition("topic2",1)

    consumer.assign(Arrays.asList(tp1,tp2))

    -如果发生多次assign调用,最后一次assign调用的分配生效,之前的都会被覆盖掉。

    -assign和subscribe一定不要混用,即不能在一个consumer应用中同时使用consumer group和独立consumer。

    -独立consumer使用情况:

    (1)进程自己维护分区的状态,它就可以固定消费某些分区而不用担心消费状态丢失的问题。

    (2)进程本身就是高可用且能够自动重启恢复错误(比如使用YARN和Mesos等容器调度框架),就不需要让kafka来帮它完成错误检测和状态恢复。

    3.基于正则表达式订阅topic

    -enable.auto.commit=true

    consumer.subscribe(Pattern.compile("kafka-.*"),new NoOpConsumerRebalanceListener())

    -enable.auto.commit=false

    consumer.subscribe(Pattern.compile("kafka-.*"),new ConsumerRebalanceListener()..)

    ConsumerRebalanceListener是一个回调接口,用户需要通过实现这个接口来实现consumer分区分配方案发生变更时的逻辑。

    消息轮询

    -kafka采用类似于Linux I/O模型的poll或select等,使用一个线程来同时管理多个socket连接,即同时与多个broker通信实现消息的并行读取。

    -一旦consumer订阅了topic,所有的消费逻辑包括coordinator的协调、消费者组的rebalance以及数据的获取都会在主逻辑poll方法的一次调用中被执行。这样用户可以很容易使用一个线程来管理所有的consumer I/O操作。

    -新版本Java consumer是一个双线程的Java进程:创建KafkaConsumer的线程被称为用户主线程,同时consumer在后台会创建一个心跳线程,该线程被称为后台心跳线程。KafkaConsumer的poll方法在用户主线程中运行。

    poll方法根据当前consumer的消费位移返回消息集合。poll方法中有个超时的参数,为了让consumer有机会在等待kafka消息的同时还能够定期执行其他任务。

    poll方法返回满足以下任意一个条件即可返回:

    -要么获取了足够多的可用数据

    -要么等待时间超过了指定的超时设置

    poll的使用方法:

    -consumer需要定期执行其他子任务:推荐poll(较小超时时间) + 运行标识布尔变量while(true)的方式

    -consumer不需要定期执行子任务:推荐poll(MAX_VALUE) + 捕获WakeupException的方式

    poll(MAX_VALUE)让consumer程序在未获取到足够多数据时无限等待,然后通过捕获WakeupException异常来判断consumer是否结束。

    使用这种方式调用poll,那么需要在另一个线程中调用consumer.wakeup()方法来触发consumer的关闭。

    KafkaConsumer不是线程安全的,但是有一个例外:用户可以安全地在另一个线程中调用consumer.wakeup()。注意,只有wakeup方法是特例,其他KafkaConsumer方法都不能同时在多线程中使用。

    位移管理

    consumer会在kafka集群的所有broker中选择一个broker作为consumer group的coordinator,用于实现组成员管理、消费分配方案制定以及提交位移等。

    当consumer运行了一段时间之后,它必须要提交自己的位移值。consumer提交位移的主要机制是通过向所属的coordinator发送位移提交请求来实现的。每个位移提交请求都会往内部topic(__consumer_offsets)对应分区上追加写入一条消息。

    消息的key是group.id、topic和分区的元组,value就是位移值。

    -自动提交

    使用方法:默认不用配置,或者显示配置enable.auto.commit=true,用auto.commit.interval.ms参数控制自动提交的间隔。

    使用场景:对消息交付语义无需求,容忍一定的消息丢失。

    -手动提交

    使用方法:设置enable.auto.commmit=false;手动调用KafkaConsumer.commitSync或KafkaConsumer.commitAsync提交位移

    使用场景:消息处理逻辑重,不允许消息丢失,至少要求“最少一次”处理语义

    commitSync:同步手动提交,用户程序会等待位移提交结束才执行下一条语句命令。

    commitAsync:异步手动提交,就是异步非阻塞,consumer在后续poll调用时轮询该位移提交的结果。

    commitSync和commitAsync都有带参数的重载方法,目的是实现更加细粒度化的位移提交策略,指定一个Map显示地告诉kafka为哪些分区提交位移,consumer.commitSync(Collections.singletonMap(partition,new OffsetAndMetadata(lastOffset + 1)))。

    提交的位移一定是consumer下一条待读消息的位移。

    重平衡rebalance

    rebalance本质上是一组协议,它规定了一个consumer group是如何达成一致来分配订阅topic的所有分区的。coordinator负责对组执行rebalance操作。

    组rebalance触发的条件,满足其一即可:

    1.组成员发生变更,比如新consumer加入组,或已有consumer主动离开组,再或是已有consumer崩溃时则触rebalance。

    2.组订阅topic数发生变更,比如使用基于正则表达式的订阅,当匹配正则表达式的新topic被创建时则会触发rebalance。

    3.组订阅topic的分区数发生变更,比如使用命令行脚本增加了订阅topic的分区数。

    consumer崩溃的情况,有可能是consumer进程“挂掉”或consumer进程所在的机器宕机,也有可能是consumer无法在指定的时间内完成消息的处理。

    由于目前一次rebalance操作的开销很大,生产环境中用户一定要结合自身业务特点仔细调优consumer参数:request.timeout.ms、max.poll.records和max.poll.interval.ms,以避免不必要的rebalance出现。

    kafka新版本consumer默认提供了3种分配策略,分别是range策略、round-robin策略和sticky策略。

    -range策略:基于范围的思想,将单个topic的所有分区按照顺序排列,然后把这些分区划分成固定大小的分区段并依次分配给每个consumer。

    -round-robin策略:把所有topic的所有分区顺序排开,然后轮询式地分配给各个consumer。

    -sticky策略:有效地避免了上述两种策略完全无视历史分配方案的缺陷,采用了“有黏性”的策略对所有consumer实例进行分配,可以规避极端情况下的数据倾斜而且在两次rebalance间最大限度地维持了之前的分配方案。

    如果group下所有consumer实例的订阅是相同的,那么使用round-robin会带来更公平的分配方案,否则使用range策略的效果更好。

    新版本consumer默认的分配策略是range,用户根据consumer参数partition.assingment.strategy来进行配置。另外kafka支持自定义的分配策略,用户可以创建自己的consumer分配器assignor。

    rebalance generation:用于标识某次rebalance,每个consumer group进行rebalance后,generation就会加1,表示group进入一个新的版本,generation从0开始。

    consumer group可以执行任意次rebalance,generation是为了防止无效offset提交,延迟的offset提交携带的是旧的generation信息,这次提交就会被consumer group拒绝。

    rebalance监听器:最常见的用法是手动提交位移到第三方存储(比如数据库中)以及在rebalance前后执行一些必要的审计操作。有一个主要的接口回调类ConsumerRebalanceListener,里面就两个方法onParitionsRevoked和onPartitionAssigned。在coordinator开启新一轮rebalance前onParitionsRevoked方法会被调用,而rebalance完成后会调用onPartitionAssigned方法。

    使用rebalance监听器的前提是用户使用consumer group。如果使用的是独立consumer或是直接手动分配分区,那么rebalance监听器是无效的。

    多线程消费---两种实现方式

      方法1.每个线程维护专属KafkaConsumer 方法2.全局consumer+多worker线程
    offset提交方式 自动提交 手动提交
    优点

    实现简单,速度快,因为没有线程间交互开销,

    方便位移管理,易于维护

    分区间的消息消费顺序

    消息获取与处理解耦,

    可独立扩展consumer数和worker数,

    伸缩性好

    缺点

    socket连接开销大;

    consumer数受限于topic分区数,扩展性差;

    broker端处理负载高,因为发往broker的请求数多;

    rebalance可能性增大

    实现负载;

    难于维护分区内的消息顺序;

    处理链路变长,导致位移管理困难;

    worker线程异常可能导致消费数据丢失

    broker端设计架构

    kafka的broker负责持久化producer端发送的消息,同时还为consumer端提供消息,就是一个服务器。

    1.消息设计

     -消息批次:RecordBatch

    -消息

    2.集群管理

    kafka是通过依赖Apache ZooKeeper实现自动化的服务发现与成员管理。

    每当一个broker启动时,它会将自己注册到ZooKeeper下的一个节点。

    kafka利用ZooKeeper临时节点来管理broker生命周期的。

    kafka管理集群及其成员的主要流程:

    broker启动时在ZooKeeper中创建对应的临时节点,同时还会创建一个监听器listener监听该临时节点的状态;

    一旦broker启动后,监听器会自动同步整个集群信息到该broker上;

    而一旦broker崩溃后,它与ZooKeeper的会话就会失效,导致临时节点被删除,监听器被触发,然后处理broker崩溃的后续事宜。

    3.副本与ISR

    一个kafka分区本质上就是一个备份日志,即利用多份相同的备份共同提供冗余机制来保持系统高可用性。这些备份在kafka中被称为副本replica。

    kafka把分区的所有副本均匀地分配到所有broker上,并从这些副本中挑选一个作为leader副本对外提供服务,而其他副本被称为follower副本,只能被动地向leader副本请求数据,从而保持与leader副本的同步。

    leader副本:响应clients端读写请求的副本。

    follower副本:被动地备份leader副本上的数据,不能响应clients端的读写请求。

    ISR副本集合:包含leader副本和所有与leader副本保持同步的follower副本。

    ISR:kafka集群动态维护的一组同步副本集合in-sync replicas。ISR包括目前的leader。每个topic分区都有自己的ISR列表,只有ISR中的副本才有竞选leader的资格。producer写入的一条kafka消息只有被ISR中的所有副本都接收到,才被视为“已提交”状态。

    副本中起始位移:表示副本当前所含第一条消息的offset。

    副本中高水印值HW:表示副本最新一条已提交消息的位移。

    副本中日志末端位移LEO:副本日志中下一条待写入消息的offset。     

    4.水印和leader epoch

    水印:也称为高水印或高水位,通常被用在流式处理领域(Apache Storm,Apache Flink,Apache Spark等),以表征元素或事件在基于时间层面上的进度。流式系统保证在水印t时刻,创建时间t1<=t的所有事件都已经达到或被观测到。

    kafka中水印的概念与时间无关,与位置信息有关,表示的就是位置信息,即位移offset。

     kafka 0.11.0.0引入了leader epoch来取代了HW值。每个leader端多开辟一段内存区域专门保存leader的epoch信息。

    leader epoch实际上是一对值(epoch,offset)。

    epoch表示leader的版本号,从0开始,当leader变更过一次时,epoch就会加1,而offset对应于该epoch版本的leader写入第一条消息的位移。

    每个leader broker中会保存这样一个缓存,并定期写入一个检查点文件中。

    而每次副本重新成为leader时会查询这部分缓存,获取对应leader版本的位移,这样就不会发生数据不一致和丢失的情况。

     5.kafka日志

    kafka会将消息和一些必要的元数据信息打包在一起封装成一个record写入日志,record就是消息集合或消息batch。

    只能按照时间顺序在日志尾部追加写入记录record。

    每条记录都会被分配一个唯一的且顺序增加的记录号作为定位该消息的唯一标识-位移信息。

    kafka的日志设计都是以分区为单位的,即每个分区都有它自己的日志,该日志被称为分区日志partition log。

    分区日志下有3组日志段:.log文件就是日志段文件,.index和.timeindex文件都是与日志段对应的索引文件。

    kafka使用该文件第一条记录对应的offset来命名此.log文件。

    日志切分log rolling:kafka每个日志段文件是有上限大小的,由broker端参数log.segment.bytes控制,默认就是1GB大小。

    因此,当日志段文件填满记录后,kafka会自动创建一组新的日志段文件和索引文件。

    .index文件:称为位移索引文件,帮助broker更快地定位记录所在的物理文件位置。

    .timeindex文件:称为时间戳索引文件,根据给定的时间戳查找对应的位移信息。

     日志留存:kafka会定期清除日志,删除符合清除策略的日志段文件和对应的两个索引文件。

    -基于时间的留存策略:默认会清除7天前的日志段数据(包括索引文件)。

    -基于大小的留存策略。

    日志清除是一个异步过程,kafka broker启动后会创建单独的线程处理日志清除事宜。

    基于时间的清除策略中,会计算当前时间戳与日志段首条消息的时间戳之差作为衡量日志段是否留存的依据。

    如果第一条消息没有时间戳信息,kafka才会使用最近修改时间的属性。

    log compaction:日志压实,确保kafka topic每个分区下的每条具有相同key的消息都至少保存最新value的消息。

    经典的使用场景:consumer使用__consumer_offsets内部topic来保存位移信息。

  • 相关阅读:
    让tomcat启动时,自动加载你的项目
    ssh整合 小例子
    hibernate入门(二)
    java引用问题(—)
    hibernate入门(-)
    IOC入门1
    百度知道回答的依赖注入
    spring
    ibatis 优点,未完版
    Data Structure Array: Sort elements by frequency
  • 原文地址:https://www.cnblogs.com/xl717/p/11684500.html
Copyright © 2011-2022 走看看