zoukankan      html  css  js  c++  java
  • Kafka作为分布式消息系统的系统解析

    Kafka概述

    Apache Kafka由Scala和Java编写,基于生产者和消费者模型作为开源的分布式发布订阅消息系统。它提供了类似于JMS的特性,但设计上又有很大区别,它不是JMS规范的实现,如Kafka允许多个消费者主动拉取数据,而在JMS中只有点对点模式消费者才会主动拉取数据。

    Kafka对消息保存时根据topic进行归类,发送消息者称为producer,消息接收者称为consumer。Kafka集群由多个Kafka实例组成,每个实例称为broker。并且Kafka集群基于zookeeper保存一些meta信息,来保证系统的高可用性。

    生产者可以直接把数据传递给broker,broker通过zookeeper进行leader和follower的选举管理;消费者可以通过zookeeper保存读取的位置offset以及读取的topic的分区信息。这样做有以下几个好处:

    1. 生产者和消费者的负载解耦

    2. 消费者可以按照自己的“能力”拉取数据

    3. 消费者可以自定义消费数量

    Kafka与传统消息系统相比,有以下不同:

    1. Kafka是分布式的,易于水平扩展

    2. 同时为发布和订阅提供高吞吐量

    3. 支持多订阅者,当失败时能自动对消费者进行rebalance

    4. 将消息持久化到磁盘,因此可用于批量消费,例如ETL以及实时应用程序

    Kafka中的重要概念

    名称

    解释

    broker

    Kafka集群中的实例进程,负责数据存储。在Kafka集群中每个broker都有一个唯一的brokerId。通过broker来接受producer和consumer的请求,并把消息持久化到磁盘。每个Kafka集群中会选举出一个broker来担任Controller,负责处理分区的leader选举,协调分区迁移等工作

    topic

    Kafka根据topic对消息进行归类(逻辑划分),发布到Kafka集群的每条消息都需要指定一个topic。落到磁盘上对应的是partition目录,partition目录中有多个segement组合(后缀为index、log的文件)。一个topic对应一个或多个partition,一个partition对应多个segment组合

    producer

    向broker发送消息的生产者。负责数据生产和数据分发。生产者代码可以集成到任务系统中。

    数据分发策略默认为defaultPartition  Utils.abs(key.hashCode)%numPartitions

    consumer

    从broker读取消息的消费者【实际上consumer是通过与zookeeper通信获取broker地址进行消息消费】

    ConsumerGroup

    数据消费者组,ConsumerGroup(以下简称CG)可以有多个。可以把多个consumer线程划分为一个组,组里面所有成员共同消费一个topic的数据(一个topic的多个partition),组员之间不能重复消费

    partition

    分区,一个topic可以分为多个partition(分布在多个broker上,实现扩展性),每个partition可以设置多个副本,会从多个副本中选取出一个leader负责读写操作。每个partition是一个有序的队列(partition中的每条消息都会被分配一个有序的id(offset)),Kafka只保证按每个partition内部有序并且被顺序消费,不保证一个topic的整体(多个partition间)的顺序

    offset

    每条消息在文件中的偏移量。Kafka的存储文件都是按照offset.index来命名,方便查找

    zookeeper

    保存meta信息,管理集群配置,以及在CG发生变化时进行rebalance

    Replication

    Kafka支持以partition为单位对消息进行冗余备份,每个partition都可以配置至少1个Replication(副本数包括本身)

    leader partition

    每个Replication集合中的partition都会选出一个唯一的leader,所有的读写请求都由leader处理。其他Replicas从leader处把数据更新同步到本地,过程类似MySQL中的Binlog同步。

    ISR(In-Sync Replica)

    Replicas的一个子集,表示目前"活着的"且与leader能够保持"联系"的Replicas集合。由于读写都是首先落到leader上,所以一般来说通过同步机制从leader上拉取数据的Replica都会和leader有一些延迟(包括了延迟时间和延迟条数两个维度),任意一个超过阈值都会把该Replica踢出ISR。每个partition都有它自己独立的ISR。

    Kafka中的广播和单播

    每个consumer属于一个特定的CG,一条消息可以发送到多个不同的CG,但是一个CG中只能有一个consumer能够消费该消息。目的:实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)。一个topic可以有多个CG。topic的消息会复制(逻辑概念)到所有的CG,但每个partition只会把消息发给该CG中的一个consumer。

    【实现广播】每个consumer有一个独立的CG

    【实现单播】所有的consumer在同一个CG

    用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。

     

    Kafka消息的分发

    1. producer客户端负责消息的分发

    1)Kafka集群中的任何一个broker都可以向producer提供metadata信息,这些metadata中包含集群中存活的servers列表、partitions leader列表等信息

    2)当producer获取到metadata信息之后,producer将会和topic下所有partition leader保持socket连接

    3)消息由producer直接通过socket发送到broker,中间不会经过任何"路由层",消息被路由到哪个partition上由producer通过一些策略如随机、轮巡等决定

    如果一个topic中有多个partition,那么在producer端实现"消息均衡分发"是非常必要的。在producer端的配置文件中,开发者可以指定partition路由的方式,具体流程:

    1)连接broker-list中任意一台broker服务器

    2)发送数据时,需要知道topic对应的partition个数及leader partition所在节点。这些信息由broker提供,每一个broker都能提供一份元数据信息(如哪些broker是存活的,哪个topic有多少分区,哪个分区是leader)

    3)数据生产,数据发送到哪个partition的leader由producer代码决定

    4)数据通过socket连接,直接发送到partition所在的broker

    2. producer消息发送的应答机制

    设置发送数据是否需要服务端的反馈,由参数request.required.acks的值决定:

    0: producer不会等待broker发送ack。最低延迟,持久化保证弱,当server挂掉时会丢失数据

    1: 当leader接收到消息之后发送ack。当前leader接收到数据后,producer会得到一个ack,更好的持久性,因为在server确认请求成功后,client才会返回。如果数据刚写到leader还没来得及复制leader就挂了,消息可能会丢失

    -1: 当所有的follower都同步消息成功后发送ack。最好的持久性,只要有一个replica存活,数据就不会丢失。但相对延迟高

    3. 分发策略

    默认分发策略:def partition(key: T, numPartitions: Int): Int = {

        Utils.abs(key.hashCode) % numPartitions}。

    其他策略:轮询、随机等。

    consumer与topic关系

    通常情况下,一个消费者组有多个consumer,并且一个consumer只会属于一个消费者组。这样不仅可以提高topic中消息的并发消费能力,还能提高"故障容错"。如消费组中的某个consumer挂掉,那么它消费的partition将会由同组内其他的consumer自动接管。

    一个CG中所有的consumer将会交错的消费整个topic,每个消费组中consumer消息消费互相独立,可以认为一个消费者组就是一个"订阅"者。

    注意:对于topic中的一条特定的消息,只会被订阅此topic的每个消费者组中的其中一个consumer消费。同时,Kafka的设计原理决定,对于一个topic,同一个消费者组中如果有多于partition个数的consumer,则意味着某些consumer将无法消费消息。

    consumer负载均衡

    最好是一个partition对应一个consumer。如果consumer数量过多,必然有空闲的consumer。

    当一个消费者组中,有consumer加入或者离开时,会触发partition消费的rebalance。均衡的最终目的为了提升topic的并发消费能力,步骤如下:

    比如一个topic有4个分区:P0、P1、P2、P3,一个CG中有C1、C2两个consumer。

    首先根据partition索引号对partition排序:P0、P1、P2、P3,再根据consumer的id排序:C0、C1

    计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)

    然后依次分配partition: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]

    Kafka文件存储机制

    1. 文件存储基本结构

    在Kafka文件存储中,同一个topic下有一个或多个不同partition,每个partition为一个目录,partition命名规则为topic名称+有序序号,第一个partition序号从0开始,序号最大值为partition数量减1。

    每个partition相当于一个巨型文件被平均分配到多个大小相等segment段数据文件中。但每个段segment file消息数量不一定相等,这种特性方便老的segment file快速被删除即方便已被消费的消息的清理,提高磁盘利用率。

    segment文件生命周期由服务端配置参数(log.segment.bytes:当segment文件达到多大时滚动生成一个新的segment文件,log.roll.{ms,hours}:滚动生成新的segment的时间即使没有达到设置的segment文件最大值等若干参数)决定。 

    segment的意义:当Kafka producer不断发送消息,必然会引起partition文件的无限扩张,这样对于消息文件的维护以及已经被消费的消息的清理带来严重影响。通过参数设置segment可以指定保留多长时间的数据,及时清理已经被消费的消息,提高磁盘利用率,目前默认保存7天数据。

    2. partition segment

    1)segment file组成

    由2大部分组成,分别为index file和data file,两个文件一一对应,成对出现,后缀".index"和“.log”分别表示为segment索引文件、数据文件。数值大小为64位,20位数字字符长度,没有数字用0填充,如下:

    2)segment 文件命名规则

    partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充

    3)索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址

    4)segment data file由许多message组成,物理结构如下:

    关键字

    解释说明

    8 byte offset

    在partition内每条消息都有一个有序的id号:offset,它可以唯一确定每条消息在partition内的位置。即offset表示partition的第多少个message

    4 byte message size

    message大小

    4 byte CRC32

    用crc32校验message

    1 byte "magic"

    表示本次发布Kafka服务程序协议版本号

    1 byte "attributes"

    表示为独立版本、或标识压缩类型、或编码类型

    4 byte key length

    表示key的长度,当key为-1时,K byte key字段不填

    value bytes payload

    表示实际消息数据

       

    Kafka查找message

    已知offset查找相应的message,需要通过下面2个步骤查找:

    1. 查找segment file

    00000000000000000000.index表示最开始的文件,起始偏移量为0

    00000000000000000099.index的消息量起始偏移量为100=99+1

    00000000000000000999.index的起始偏移量为1000=999+1

    其他后续文件依次类推。以起始偏移量命名并排序这些文件,只要根据offset按照"二分查找"文件列表,就可以快速定位到具体文件。

    2. 通过segment file查找message

    根据offset,依次定位index元数据物理位置和log的物理偏移位置,然后再在log中顺序查找直至找到对应offset位置即可。


    关注微信公众号:大数据学习与分享,获取更对技术干货

  • 相关阅读:
    了解 NoSQL 的必读资料
    关于什么时候用assert(断言)的思考
    这次见到了一些大侠
    NetBeans 时事通讯(刊号 # 87 Jan 12, 2010)
    动态链接库dll,静态链接库lib, 导入库lib
    新女性十得 写得了代码,查得出异常
    记录系统乱谈
    新女性十得 写得了代码,查得出异常
    fullpage.js禁止滚动
    RunningMapReduceExampleTFIDF hadoopclusternet This document describes how to run the TFIDF MapReduce example against ascii books. This project is for those who wants to experiment hadoop as a skunkworks in a small cluster (110 nodes) Google Pro
  • 原文地址:https://www.cnblogs.com/bigdatalearnshare/p/14014976.html
Copyright © 2011-2022 走看看