zoukankan      html  css  js  c++  java
  • Kafka——相关概念及核心配置说明

    1. Kafka简介

    Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、

    storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。

    Kafka适合离线和在线消息消费。 Kafka消息保留在磁盘上,并在群集内复制以防止数据丢失。

    Kafka构建在ZooKeeper同步服务之上。 它与Apache Storm和Spark非常好地集成,用于实时流式数据分析。

    Kafka支持低延迟消息传递,并在出现机器故障时提供对容错的保证。 它具有处理大量不同消费者的能力。Kafka非常快,处理速率可达2百万写/秒。

    Kafka将所有数据保存到磁盘,这实质上意味着所有写入都会进入操作系统(RAM)的页面缓存;这使得将数据从页面缓存传输到网络套接字非常有效。

    2. Kafka特性
     
    - 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
    - 可扩展性:kafka集群支持热扩展
    - 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
    - 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
    - 高并发:支持数千个客户端同时读写
     
    3. Kafka使用场景
     
    - 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
    - 消息系统:解耦和生产者和消费者、缓存消息等。
    - 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
    - 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
    - 流式处理:比如spark streaming和storm
    - 事件源

    4. 概念

    (1)Topics(主题)

    属于特定类别的消息流称为主题。数据存储在主题中。一般一个Topic会被多个Consumer订阅。

    (2)Partition(分区)

    每个Topic可能有许多分区,每个分区是一个有序的、记录,不断追加一个结构化的提交日志不变的序列。每个分区中的记录都分配了一个连续的ID号,称为唯一地标识分区中每个记录的偏移量。可以这样认为,Partition是物理的概念,每个Partition相当于一个文件夹;而Topic是逻辑的概念,Producer和Consumer只要关心各自推送和订阅的Topic,无需关心整条小心存于集群的那个Broker。

    (3)Partition offset(分区偏移)

    每个分区消息具有称为 offset 的唯一序列标识。

    (4)Replicas of Partition(分区备份)

    副本只是一个分区的备份。 副本从不读取或写入数据。 它们用于防止数据丢失。

    (5)Segment(分区)

    Segment对应于2个文件(1个索引文件,1个数据文件)。一个Partition对应于一个文件夹。逻辑上一个Partition可以包含无穷多个Segment。数据清理时,旧的Segment将直接被删除。

    (6)Brokers(缓存代理)

    缓存代理是负责维护发布数据的简单系统。实际使用中,每个Kafka的服务实例就是一个Broker。

    a) 假设在一个主题和N个代理中有N个分区,每个代理将有一个分区。

    b) 假设在一个主题中有N个分区并且有N+ M个代理(M,N>0),则第一个N代理将具有一个分区,并且下一个M代理将不具有用于该特定主题的任何分区。

    c) 假设在一个主题中有N个分区并且有N-M个代理(M,N>0),每个代理将在它们之间具有一个或多个分区共享。 由于代理之间的负载分布不相等,不推荐使用此方案。

    (7)Kafka Cluster(Kafka集群)

    Kafka网络如果存在多个代理,则该Kafka网络被称为Kafka集群。 可以扩展Kafka集群,无需停机。 这些集群用于管理消息数据的持久性和复制。

    (8)Producers(生产者)

    生产者是发送给一个或多个Kafka主题的消息的发布者。 生产者向Kafka经纪人发送数据。 每当生产者将消息发布给代理时,代理只需将消息附加到最后一个段文件。实际上,该消息将被附加到分区。 生产者还可以向他们选择的分区发送消息。

    (9)Consumers(消费者)

    Consumers从经纪人处读取数据。 消费者订阅一个或多个主题,并通过从代理中提取数据来使用已发布的消息。

    (10)Leader(领导者)

    Leader 是负责给定分区的所有读取和写入的节点。 每个分区都有一个服务器充当Leader。

    (11)Follower(追随者)

    跟随领导者指令的节点被称为Follower。 如果领导失败,一个追随者将自动成为新的领导者。 跟随者作为正常消费者,拉取消息并更新其自己的数据存储。

    3. ZooKeeper简介

    Apache Kafka的一个关键依赖是Apache Zookeeper。 Zookeeper是一个分布式配置和同步服务。

    在Kafka网络中,是Kafka代理和消费者之间的协调接口。 Kafka服务器通过Zookeeper集群共享信息。 Kafka在Zookeeper中存储基本元数据,例如关于主题,代理,消费者偏移(队列读取器)等的信息。

    由于所有关键信息存储在Zookeeper中,并且它通常在其整体上复制此数据,因此Kafka代理/ Zookeeper的故障不会影响Kafka集群的状态。 Kafka将恢复状态,一旦Zookeeper重新启动。 这为Kafka带来了零停机时间。 Kafka代理之间的领导者选举也通过使用Zookeeper在领导者失败的情况下完成。

    5.重要配置文件相关说明

    5.1. Zookeeper配置(zoo.cfg)

    Zookeeper是一个集群服务,集群的每个节点都需要zoo.cfg。为了避免出差错,zoo.cfg里没有跟特定节点相关的配置,所以每个节点上的这个zoo.cfg都是一模一样的。

    配置要素:

    (1)clientPort: 客户端连接zookeeper服务的TCP监听端口,默认2181。

    (2)dataDir:存放内存数据结构的snapshot,为节点快速恢复留存。

    (3)dataLogDir:存放顺序日志(WAL)。dataLogDir如果没提供的话使用的则是dataDir。

    一般建议把dataDir和dataLogDir分到不同的磁盘上,这样就可以充分利用磁盘顺序写的特性。

    (4)server.myid: 配置IP,Leader和Follower/Observer交换数据用端口,Zookeeper选举用端口。

    典型示例:

    server.1=127.0.0.1:20881:30881

    server.2=127.0.0.1:20882:30882

    server.3=127.0.0.1:20883:30883

    在上面的例子中,我把三个zookeeper服务放到同一台机器上。server.后面的数字。这个就是myid。

    myid是集群服务的唯一标示,一定要保证myid在整个集群中唯一。设定后ZooKeeper会在dataDir里会放置一个myid文件,用来唯一标识这个服务(里面就一个数字)。zookeeper会根据这个id来取出server.x上的配置。比如当前id为1,则对应着zoo.cfg里的server.1的配置。

    (5)tickTime:时间单位定量。

    例如,tickTime=1000表示在zookeeper里1 tick等于1000 ms,所有其他用到时间的地方都会用多少tick来表示。

    (6)syncLimit:就表示follower与leader的心跳时间。

    例如 syncLimit=2表示心跳时间=2 tick。

    (7)maxClientCnxns:对于一个客户端的连接数限制,默认是60。实际使用中可能需要调整。

    (8)minSessionTimeout:最小Session 超时时间。

    (9)maxSessionTimeout:最大Session 超时时间。

    一般,客户端连接zookeeper时,会设置一个session timeout,如果超过这个时间Client没有与ZooKeeper Server有联系,则这个session会被设置为过期(如果这个session上有临时节点,则会被全部删除,这就是实现集群感知的基础)。但是这个时间不是客户端可以无限制设置的,服务器可以设置这两个参数来限制客户端设置的范围。

    (10)autopurge.purgeInterval:内存数据清理间隔(小时)。

    (11)autopurge.snapRetainCount:清理snapshot时最大文件保留数量。

    由于Client在与ZooKeeper Server交互过程中会产生非常多的日志,而且ZooKeeper Server也会将内存中的数据作为snapshot保存下来,这些数据是不会被自动删除的,这样磁盘中这样的数据就会越来越多。为保证磁盘空间不无限增长,可以通过以上两个参数来设置,让zookeeper自动删除数据。

    注意:删除操作可能会影响zookeeper集群的性能,所以一般会让这个过程在访问低谷的时候进行,但是遗憾的是zookeeper并没有设置在哪个时间点运行的设置,所以有的时候我们会禁用这个自动删除的功能。

    5.2. Kafka配置(XXX.properties)

    (1)broker.id:

    每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响Consumers的消息情况

    (2)log.dirs:

    kafka数据的存放地址,多个地址的话用逗号分割 /data/kafka-logs-1,/data/kafka-logs-2

    (3)port:

    broker server服务端口(默认9092)

    (4)message.max.bytes:

    表示消息体的最大大小,单位是字节(默认6525000)

    (5)num.network.threads :

    broker处理消息的最大线程数,一般情况下不需要去修改(默认4)

    (6)num.io.threads:

    broker处理磁盘IO的线程数,数值应该大于你的硬盘数(默认8)

    (7)background.threads:

    一后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改(默认4)

    (8)queued.max.requests:

    等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,应该是一种自我保护机制。(默认500)

    (9)host.name:

    broker的主机地址,若设置则绑定到该地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置

    (10)socket.send.buffer.bytes:

    socket的发送缓冲区,socket的调优参数SO_SNDBUFF(默认100*1024)

    (11)socket.receive.buffer.bytes :

    Socket的接受缓冲区,socket的调优参数SO_RCVBUFF(默认100*1024)

    (12)socket.request.max.bytes :

    Socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖(默认100*1024*1024)

    (13)log.segment.bytes:

    Topic的分区是以一堆Segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖(默认100*1024*1024)

    (14)log.roll.hours :

    这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment会被 topic创建时的指定参数覆盖(默认24*7)

    (15)log.cleanup.policy:

    日志清理策略选择,选项为delete/compact。主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖(默认delete)

    (16)log.retention.minutes:

    数据存储的最大时间超过这个时间会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据(默认3days)

    (17)log.retention.bytes:

    topic每个分区的最大文件大小,一个topic的大小限制: 分区数*log.retention.bytes。

    log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖(默认-1没有大小限制)

    (18)log.retention.check.interval.ms:

    文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略(默认5minutes)

    (19)log.cleaner.enable:是否开启日志压缩(默认false)

    (20)log.cleaner.threads:日志压缩运行的线程数(默认2)

    (21)log.cleaner.io.max.bytes.per.second:日志压缩时候处理的最大大小(默认None)

    (22)log.cleaner.dedupe.buffer.size:日志压缩去重时候的缓存空间,在空间允许的情况下,越大越好。(默认500*1024*1024)

    (23)log.cleaner.io.buffer.size:日志清理时候用到的IO块大小一般不需要修改(默认512*1024)

    (24)log.cleaner.io.buffer.load.factor:日志清理中hash表的扩大因子,一般不需要修改。(默认0.9)

    (25)log.cleaner.backoff.ms:检查是否处罚日志清理的间隔(默认15000)

    (26)log.cleaner.min.cleanable.ratio:

    对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖(默认1day)

    (28)log.index.size.max.bytes:

    对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖(默认10*1024*1024)

    (29)log.index.interval.bytes:

    当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数(默认4096)

    (30)log.flush.interval.messages:

    log文件”sync”到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个”数据可靠性"的必要手段,所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞),如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.物理server故障,将会导致没有fsync的消息丢失.(默认None)

    (31)log.flush.scheduler.interval.ms:

    检查是否需要固化到硬盘的时间间隔(默认3000)

    (32)log.flush.interval.ms:

    仅仅通过interval来控制消息的磁盘写入时机,是不足的.此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔达到阀值,也将触发.(默认None)

    (33)log.delete.delay.ms:

    文件在索引中清除后保留的时间一般不需要去修改(默认60000)

    (34)log.flush.offset.checkpoint.interval.ms:

    控制上次固化硬盘的时间点,以便于数据恢复一般不需要去修改(默认60000)

    (35)auto.create.topics.enable:

    是否允许自动创建topic,若是false,就需要通过命令创建topic(默认true)

    (36)default.replication.factor:

    是否允许自动创建topic,若是false,就需要通过命令创建topic(默认1)

    (37)num.partitions:

    每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖(默认1)

    (38)controller.socket.timeout.ms:

    partition leader与replicas之间通讯时,socket的超时时间(默认30000)

    (39)controller.message.queue.size:

    partition leader与replicas数据同步时,消息的队列尺寸(默认10)

    (40)replica.lag.time.max.ms:

    replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中(默认10000)

    (41)replica.lag.max.messages:

    如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效.

    通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后;如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移到其他follower中。在broker数量较少,或者网络不足的环境中,建议提高此值。(默认4000)

    (42)replica.socket.timeout.ms:

    follower与leader之间的socket超时时间(默认30*1000)

    (43)replica.socket.receive.buffer.bytes:

    leader复制时候的socket缓存大小(默认64*1024)

    (44)replica.fetch.max.bytes:

    replicas每次获取数据的最大大小(默认1024*1024)

    (45)replica.fetch.wait.max.ms:

    replicas同leader之间通信的最大等待时间,失败了会重试(默认500)

    (46)replica.fetch.min.bytes:

    fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件(默认1)

    (47)num.replica.fetchers:

    leader进行复制的线程数,增大这个数值会增加follower的IO(默认1)

    (48)replica.high.watermark.checkpoint.interval.ms:

    每个replica检查是否将最高水位进行固化的频率(默认5000)

    (49)controlled.shutdown.enable:

    是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker(默认false)

    (50)controlled.shutdown.max.retries:

    控制器关闭的尝试次数(默认3)

    (51)controlled.shutdown.retry.backoff.ms:

    每次关闭尝试的时间间隔(默认5000)

    (52)leader.imbalance.per.broker.percentage:

    leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡(默认10)

    (53)leader.imbalance.check.interval.seconds:

    检查leader是否不平衡的时间间隔(默认300)

    (54)offset.metadata.max.bytes:

    客户端保留offset信息的最大空间大小

    (55)zookeeper.connect:

    zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3(默认localhost:2181)

    (56)zookeeper.session.timeout.ms=6000

    ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大(默认)

    (57)zookeeper.connection.timeout.ms:

    ZooKeeper的连接超时时间(默认6000)

    (58)zookeeper.sync.time.ms:

    ZooKeeper集群中leader和follower之间的同步时间(默认2000)

  • 相关阅读:
    Python之利用 gensim的word2vec进行酒店评论+wiki百科语料联合词向量训练
    Python之酒店评论主题提取LDA主题模型
    Python之酒店评论分词、词性标注、TF-IDF、词频统计、词云
    Pycharm使用技巧----Pycharm工程使用anaconda环境
    Python之Pandas 简介与Pandas 读取csv文件及相关操作01
    csv文件用excel打开乱码的解决方案
    apply()和call()的方法
    如何将webstrom本地的代码上传到github上
    通过Ajax方式上传文件,使用FormData进行Ajax请求
    Express bodyParser中间件使用方式
  • 原文地址:https://www.cnblogs.com/caoweixiong/p/11075621.html
Copyright © 2011-2022 走看看