zoukankan      html  css  js  c++  java
  • Kafka 基本知识分享

    目录

    前记

    前一段时间,在项目中用到消息队列,我们选择了 Kafka 作为中间件。

    我负责研究学习,并集成到项目中去,下面就是我在学习中记录的一些知识点,分享共同学习。

    正文

    一、基本术语

    Kafka 

    Broker  节点,每一个安装Kafka的服务器

    Producer 生产者

    Consumer 消费者在 消费者组

    Topic    话题,相当于传统消息系统的queue(队列),可包含多个Partition

    Partition  分区,分为(partition leader,partition follover)

    Replice   副本,partition 副本,这个副本数不能超过Broker的数量

    Leader 

    Zookeeper broker、topic、partition 的守护程序

    Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

    消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务技术水平和最关键指标之一。下面将从Kafka文件存储机制和物理结构角度,分析Kafka是如何实现高效文件存储,及实际应用效果。

    二、Kafka 基本命令(windows 下)

    1、开启zookeeper:.inwindowszookeeper-server-start.bat .configzookeeper.properties

    2、开启kafka服务:.inwindowskafka-server-start.bat .configserver.properties

    3、新建Topic:.inwindowskafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topicTest

    4、查看Topic:.inwindowskafka-topics.bat --list --zookeeper localhost:2181

    5、查看某一个Topic详情:.inwindowskafka-topics.bat --zookeeper localhost:2181 --describe --topic topicTest

    6、给某一个Topic添加Partition:.inwindowskafka-topics.bat --zookeeper localhost:2181 --alter --partitions 2 --topic topicTest

    6、启动客户端:(已淘汰).inwindowskafka-console-consumer.bat --zookeeper localhost:2181 -topic tpoicTest -group groupName (接听的Topic,用户组名)(这两种建的消费者,即使指定相同的组名,实际上面也是两个独立的)

    (新消费者).inwindowskafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topicTest --group testGroup

    7、启动生产者:.inwindowskafka-console-producer.bat --broker-list localhost:9092 --topic topicTest

    8、查看消费者列表:.inwindowskafka-consumer-groups.bat --bootstrap-server localhost:9092 --list (把--bootstrap-server 替换为—zookeeper 查看不同消费者)

    9、某一个消费者组详情:.inwindowskafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group test-consumer-group

    10、删除消费者组:.inwindowskafka-consumer-groups.bat --bootstrap-server localhost:9092 --delete --group testGroup2 --group testGroup4 –topic topicName

    11、添加分区副本:.inwindowskafka-reassign-partitions.bat –zookeeper localhost:2181 --reassignment-json-file replication.json --execute

    三、易混淆概念

    1、(生产者)在发送消息不需要指定Partition,指定Topic就可以。会平均分配到Topic下的Partition;

    2、(消费者)在消费消息时,要指定Topic,对于接收到的信息,如果没有指定消费者组名,就会接收到这个Topic下的所有Partition消息,如果指定了消费者组名,每个Partition的消息只能有组中的一个消费者消费;

    3、对于消费者组,这个组已经消费了对应的Topic中的Partition ,如果再次启动消费者并指定组名是这个消费者组,那即使设置是从头开始读取也没有用(具体看下面的组详情);

    4、对已有的Topic进行添加副本的时候,要注意几点:(1)在有数据删除的时候,添加副本,由于原数据要删除进行操作,但是又有从不同的borker间进行复制数据,这样就会有问题,我遇到的是报日志写不了,服务开启后自动挂掉;(2)在服务开启状态下进行添加副本,但是在broker间复制时,会有broker间连接问题,重启服务就好了;

    5、在开启消费者时,建议同时写上组名,这样便于管理消费者和组(限命令行,API开发这个配置项是必选);

    6、同一个消费者实例是不能混合自(不指定 partition,消费者组协调分配)动和手动的partition分配(手动制定取的 partition 消息).

    四、Kafka的特性

    - 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。

    - 可扩展性:kafka集群支持热扩展(横向)

    - 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

    - 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)

    - 高并发:支持数千个客户端同时读写

    五、Kafka的使用场景

    - 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。

    - 消息系统:解耦和生产者和消费者、缓存消息等。

    - 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。

    - 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

    - 流式处理:比如spark streaming和storm

    - 事件源

    六、Kakfa的设计思想

    Kakfa Broker Leader的选举:Kakfa Broker集群受Zookeeper管理。所有的Kafka Broker节点一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。

     Consumergroup

    - 各个consumer(consumer 线程)可以组成一个组(Consumer group ),partition中的每个message只能被组(Consumer group )中的一个consumer(consumer 线程)消费,如果一个message可以被多个consumer(consumer 线程)消费的话,那么这些consumer必须在不同的组。

    - 新启动的consumer默认从partition队列最头端最新的地方开始阻塞的读message。

    - 而kafka为了保证吞吐量,只允许同一个consumer group下的一个consumer线程去访问一个partition。如果觉得效率不高的时候,可以加partition的数量来横向扩展,那么再加新的consumer thread去消费。如果想多个不同的业务都需要这个topic的数据,起多个consumer group就好了,大家都是顺序的读取message,offsite的值互不影响。这样没有锁竞争,充分发挥了横向的扩展性,吞吐量极高。这也就形成了分布式消费的概念。

    - 一个consumer group下,无论有多少个consumer,这个consumer group一定会把这个topic下所有的partition都消费了。

    - 消费者要从头开始消费某个topic的全量数据,需要满足2个条件(spring-kafka):(1)使用一个全新的"group.id"(就是之前没有被任何消费者使用过);

    (2)指定"auto.offset.reset"参数的值为earliest;

    auto.offset.reset 值解释:

    https://blog.csdn.net/lishuangzhe7047/article/details/74530417

    earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

    latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

    none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

    (这里的 offset 是指,当前组下消费对应Topic的值,查看组详细信息就可以看到)

    Consumer Consumer处理partition里面的message的时候是o(1)顺序读取的。所以必须维护着上一次读到哪里的offset信息。

     Topic & PartitionTopic相当于传统消息系统MQ中的一个队列queue,producer端发送的message必须指定是发送到哪个topic,但是不需要指定topic下的哪个partition,因为kafka会把收到的message进行load balance,均匀的分布在这个topic下的不同的partition上( hash(message) % [broker数量]  )。

    一般来说,(1)一个Topic的Partition数量大于等于Broker的数量,可以提高吞吐率。(2)同一个Partition的Replica尽量分散到不同的机器,高可用。

    当add a new partition的时候,partition里面的message不会重新进行分配,原来的partition里面的message数据不会变,新加的这个partition刚开始是空的,随后进入这个topic的message就会重新参与所有partition的load balance。

    Partition Replica每个partition可以在其他的kafka broker节点上存副本,以便某个kafka broker节点宕机不会影响这个kafka集群。存replica副本的方式是按照kafka broker的顺序存。例如有5个kafka broker节点,某个topic有3个partition,每个partition存2个副本,那么partition1存broker1,broker2,partition2存broker2,broker3。。。以此类推(replica副本数目不能大于kafka broker节点的数目,否则报错。这里的replica数其实就是partition的副本总数,其中包括一个leader,其他的就是copy副本)。

    Topic分配partitionpartition replica的算法:(1将Broker(size=n)和待分配的Partition排序。(2)将第i个Partition分配到第(i%n)个Broker上。(3)将第i个Partition的第j个Replica分配到第((i + j) % n)个Broker上

    replica副本数目不能大于kafka broker节点的数目,否则报错。这里的replica数其实就是partition的副本总数,其中包括一个leader,其他的就是copy副本)

    - 关于新建生产者、消费者 key、value 问题:是为了选择分区,如下所示:

    1、在初始化的时候,是为了配置key的序列化编码格式,会和config里面的对比,这个是空的就会从config里面取,如果不是空的从config里面去掉这个配置;

    2、在生产或者消费的时候,这个key是如下所示:

    1. 指定了 partition,则直接使用;

    2. 未指定 partition 但指定 key,通过对 key 的 value 进行hash 选出一个 partition

    3. partition 和 key 都未指定,使用轮询选出一个 partition。

    如果key == null,则从sendPartitionPerTopicCache(sendPartitionPerTopicCache的类型是HashMap.empty[String, Int])中获取分区ID,如果找到了就直接用这个分区ID;否则随机去选择一个partitionId,并将partitionId存放到sendPartitionPerTopicCache中去。而且sendPartitionPerTopicCache是每隔topic.metadata.refresh.interval.ms时间才会清空的. 也就是说在key为null的情况下,Kafka并不是每条消息都随机选择一个Partition;而是每隔topic.metadata.refresh.interval.ms才会随机选择一次!不过LinkedIn工程师Guozhang Wang解释到:本来producer在key为null的情况下每条消息都随机选择一个Partition,但后面改成这种伪随机的以此来减少服务器端的sockets数。

    七、Kafka 配置文件设置

    数据删除设置

    log.cleanup.policy=delete启用删除策略
    直接删除,删除后的消息不可恢复。可配置以下两个策略:
    清理超过指定时间清理:

    log.retention.hours=16

    超过指定大小后,删除旧的消息:
    log.retention.bytes=1073741824
    为了避免在删除时阻塞读操作,采用了copy-on-write形式的实现,删除操作进行时,读取操作的二分查找功能实际是在一个静态的快照副本上进行的,这类似于Java的CopyOnWriteArrayList。

    删除超时数据步骤:

    • 该函数首先获取 topic,根据 topic 获取日志超时时间,该超时时间可以使用配置 log.retention.hours.per.topic 单独指定,如果没有单独指定,则使用统一的 log.retention.hours 配置。
    • 然后扫描所有该日志对应的 Segment 文件,对所有最近修改时间与当前时间差距大于超时时间的日志的 Segment 文件,标记为删除
    • 最后删除标记为删除的 Segment 文件

    八、新消费者

    (来源)https://blog.csdn.net/zoubf/article/details/51701217?locationNum=2

    当Kafka最初创建的时候,它内置了scala版本的producer和consumer客户端.在使用的过程中我们渐渐发现了这些APIs的限制.比如,我们有”high-level”的消费者API,可以支持消费组和故障处理,但是不支持更多更复杂的场景需求. 我们也有一个简单的消费者客户端(SimpleConsumer,即low-level),可以支持自定义的控制,但是需要应用程序自己管理故障和错误处理.所以我们决定重新设计这些客户端,它的目标是要能实现之前使用旧的客户端不容易实现甚至无法实现的场景,还要建立一些API的集合,来支持长时间的拉取消息(译注: 即消费者通过poll方式保持长时间的消息拉取).

    第一阶段是发布在0.8.1中生产者API(Kafka Producer)的重写,最近发布的0.9完成第二阶段新的消费者API(Kafka Consumer).基于新的消费组协调协议(group coordination protocol),新的消费者API带来了以下的优势:

    1、简洁的统一API: 新的消费者结合了旧的API中”simple”和”high-level”消费者客户端两种功能,能够同时提供消费者协调(高级API)和lower-level的访问,来构建自定义的消费策略.

    2、更少的依赖: 新的消费者完全使用java编写,它不再依赖scala运行时环境和zookeeper.在你的项目中可以作为一个轻量级的库

    3、更好的安全性: 0.9版本实现了安全性扩展,目前只支持新的消费者API

    4、新的消费者还添加了一些协议: 管理一组消费者处理进程的故障容忍.之前这部分功能通过java客户端频繁地和zookeeper进行交互.部分复杂的逻辑导致很难使用其他语言构建出完整的客户端.现在新的协议的出现使得这部分非常容易实现,现在已经实现了C的客户端.

    新的消费者使用了消费组协调协议. 对于每个消费组,会选择一个brokers作为消费组的协调者(group coordinator).协调者负责管理消费者组的状态. 它的主要工作是负责协调partition的分配(assignment): 当有新成员加入,旧成员退出,或者topic的metadata发生变化(topic的partitions改变).重新分配partition叫做消费组的平衡(group rebalance)

    每个consumer group保存自己的位移信息,那么只需要简单的一个整数表示位置就够了;同时可以引入checkpoint机制定期持久化,简化了应答机制的实现。

    九、Kafka该怎么用

    1、Kafka对消息的重复、丢失、错误以及顺序没有严格的要求。但是part只会被consumer group内的一个consumer消费,故kafka保证每个partition内的消息会被顺序的消费。

    2、broker没有副本机制,一旦broker宕机,该broker的消息将都不可用。同时broker是无状态的,broker不保存消费者的状态,由消费者自己保存。无状态也导致消息的删除成为难题,所以Kafka选择消息保存一定时间后会被删除。

    3、大量的依赖Zookeeper,需要Zookeeper来管理broker与consumer的动态加入与离开。以及消费关系及每个partition的消费信息。(新消费者已经改变这个)

    使用:

    1、应当有一个非常好的运维监控系统,不单单要监控Kafka本身,还要监控Zookeeper。

    2、对消息顺序不依赖,且不是那么实时的系统。

    3、对消息丢失并不那么敏感的系统。

    a)如果你有一堆事件的Firehose (2万+/秒   每个生产者产生  )你需要在“至少一次”的顺序中,通过在线和批量的消费者来交付,但是最重要的是你可以让你的消费者管理你的“光标”在kafka主题上的状态。
    kafka的主要超级力量是,它不像一个队列系统,更像一个循环缓冲区,它可以像你的磁盘上的磁盘那样缩放,这样你就可以重新读取消息。
    b)使用RabbitMQ如果你有消息(20k+/秒),需要以复杂的方式对消费者进行路由

    十、Kafka和RabbitMQ对比

    RabbitMQ:

    1、多一个Exchange(交换器),这个是配置好的路由,用于把消息分发到指定的队列中,这个对应关系就是Binding

    2、Exchange 分四个分发模式:direct、fanout、topic、headers ,Kafka直接指定Topic名

    多个消费者可以订阅同一个队列,这时队列里的数据就会平均分配给各个消费者;Kafka的做法是对Topic进行分区,分区的Partition只能有一个消费者消费,这样就出现了消费者组,组中的每个消费者消费一个Partition;这样这两者就达到了一致的目的

    3、RabbitMQ对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量还在其次。

    4、RabbitMQ 中生产的数据到队列,这之间的绑定会更灵活,可以配置强大的路由(可以实现一个消息对应到多个队列中);Kafka一次只能给一个Topic发送消息,比较有局限;

    5、RabbitMQ 集群是通过配置的,Kafka 是通过Zookeeper 管理的,需要依赖Zookeeper

    十一、Zookeeper

    脚本                            说明

    zkCleanup     清理ZooKeeper历史数据,包括事务日志文件和快照数据文件

    zkCli             ZooKeeper的一个简单客户端

    zkEnv            设置ZooKeeper的环境变量

    zkServer        ZooKeeper服务器的启动、停止和重启脚本

  • 相关阅读:
    Android开发之无线遥控器
    那些有意思的代码
    更改Android编译软件版本(make/gcc/bision)
    ubuntu16.04安装virtualbox5.1失败 gcc:error:unrecognized command line option ‘-fstack-protector-strong’
    Android编译环境折腾记
    BM25相关度打分公式
    javascript作用域
    javascript 中关于call方法的详解。
    JavaScript RegExp 对象
    vue runtime 问题
  • 原文地址:https://www.cnblogs.com/zhurong/p/9581169.html
Copyright © 2011-2022 走看看