zoukankan      html  css  js  c++  java
  • 初识kafka

    一.kafka简介

    1. ApacheKafka®是一个分布式流媒体平台,是消息中间件的一种。

    栗子:生产者消费者,生产者生产鸡蛋,消费者消费鸡蛋,生产者生产一个鸡蛋,消费者就消费一个鸡蛋,假设消费者消费鸡蛋的时候噎住了(系统宕机了),生产者还在生产鸡蛋,那新生产的鸡蛋就丢失了。再比如生产者很强劲(大交易量的情况),生产者1秒钟生产100个鸡蛋,消费者1秒钟只能吃50个鸡蛋,那要不了一会,消费者就吃不消了(消息堵塞,最终导致系统超时),消费者拒绝再吃了,”鸡蛋“又丢失了,这个时候我们放个篮子在它们中间,生产出来的鸡蛋都放到篮子里,消费者去篮子里拿鸡蛋,这样鸡蛋就不会丢失了,都在篮子里,而这个篮子就是”kafka“。
    鸡蛋其实就是“数据流”,系统之间的交互都是通过“数据流”来传输的(就是tcp、https什么的),也称为报文,也叫“消息”。
    消息队列满了,其实就是篮子满了,”鸡蛋“ 放不下了,那赶紧多放几个篮子,其实就是kafka的扩容。
     
    名词理解
    • producer:生产者,就是它来生产“鸡蛋”的。
    • consumer:消费者,生出的“鸡蛋”它来消费。
    • topic:你把它理解为标签,生产者每生产出来一个鸡蛋就贴上一个标签(topic),消费者可不是谁生产的“鸡蛋”都吃的,这样不同的生产者生产出来的“鸡蛋”,消费者就可以选择性的“吃”了。
    • broker:就是篮子了。

    从技术角度,topic标签实际就是队列,生产者把所有“鸡蛋(消息)”都放到对应的队列里了,消费者到指定的队列里取。

     2. 流媒体平台有三个关键功能:

    • 发布和订阅记录流,类似于消息队列或企业消息传递系统。
    • 以容错的持久方式存储记录流。
    • 记录发生时处理流。

    3. Kafka通常用于两大类应用:

    • 构建可在系统或应用程序之间可靠获取数据的实时流数据管道
    • 构建转换或响应数据流的实时流应用程序

    4. kafka的几个概念:

    • Kafka作为一个集群运行在一个或多个可跨多个数据中心的服务器上。
    • Kafka集群以称为主题的类别存储记录
    • 每条记录由一个键,一个值和一个时间戳组成。

    5. kafka有四个核心API:

    • 应用程序使用 Producer API 发布消息到1个或多个topic(主题)。
    • 应用程序使用 Consumer API 来订阅一个或多个topic,并处理产生的消息。
    • 应用程序使用 Streams API 充当一个流处理器,从1个或多个topic消费输入流,并生产一个输出流到1个或多个输出topic,有效地将输入流转换到输出流。
    • Connector API允许构建或运行可重复使用的生产者或消费者,将topic连接到现有的应用程序或数据系统。例如,一个关系数据库的连接器可捕获每一个变化。

    6.Kafka所使用的基本术语:

    Topic

    Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic).

    Producer

    发布消息的对象称之为主题生产者(Kafka topic producer)

    Consumer

    订阅消息并处理发布的消息的种子的对象称之为主题消费者(consumers)

    Broker

    已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker). 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

    7.主题和日志

    主题是发布记录的类别或订阅源名称。Kafka的主题总是多用户; 也就是说,一个主题可以有零个,一个或多个消费者订阅写入它的数据。

    对于每个主题,Kafka群集都维护一个分区日志,如下所示:

    每个分区都是一个有序的,不可变的记录序列,不断附加到结构化的提交日志中。分区中的记录每个都被分配一个称为偏移的顺序ID号,它唯一地标识分区中的每个记录。

    Kafka集群持久地保留所有已发布的记录 - 无论它们是否已被消耗 - 使用可配置的保留期。例如,如果保留策略设置为两天,则在发布记录后的两天内,它可供使用,之后将被丢弃以释放空间。Kafka的性能在数据大小方面实际上是恒定的,因此长时间存储数据不是问题。

    事实上,基于每个消费者保留的唯一元数据是该消费者在日志中的偏移或位置。这种偏移由消费者控制:通常消费者在读取记录时会线性地提高其偏移量,但事实上,由于消费者控制位置,它可以按照自己喜欢的任何顺序消费记录。例如,消费者可以重置为较旧的偏移量以重新处理过去的数据,或者跳到最近的记录并从“现在”开始消费。

    这些功能组合意味着Kafka消费者非常便宜 - 他们可以来来往往对集群或其他消费者没有太大影响。例如,您可以使用我们的命令行工具“拖尾”任何主题的内容,而无需更改任何现有使用者所消耗的内容。

    日志中的分区有多种用途。首先,它们允许日志扩展到超出适合单个服务器的大小。每个单独的分区必须适合托管它的服务器,但主题可能有许多分区,因此它可以处理任意数量的数据。其次,它们充当了并行性的单位 - 更多的是在一点上。

    8.生产者

    生产者将数据发布到他们选择的主题。生产者负责选择分配给主题中哪个分区的记录。这可以通过循环方式完成,只是为了平衡负载,或者可以根据一些语义分区功能(例如基于记录中的某些键)来完成。更多关于在一秒钟内使用分区的信息!

    9.消费者

    消费者使用消费者组名称标记自己,并且发布到主题的每个记录被传递到每个订阅消费者组中的一个消费者实例。消费者实例可以在单独的进程中,也可以在不同的机器

    如果所有使用者实例具有相同的使用者组,则记录将有效地在使用者实例上进行负载平衡。

    如果所有消费者实例具有不同的消费者组,则每个记录将广播到所有消费者进程。

    两个服务器Kafka群集,托管四个分区(P0-P3),包含两个使用者组。消费者组A有两个消费者实例,B组有四个消费者实例。

    然而,更常见的是,我们发现主题具有少量的消费者群体,每个“逻辑订户”一个。每个组由许多用于可伸缩性和容错的消费者实例组成。这只不过是发布 - 订阅语义,其中订阅者是消费者群集而不是单个进程。

    在Kafka中实现消费的方式是通过在消费者实例上划分日志中的分区,以便每个实例在任何时间点都是分配的“公平份额”的独占消费者。维护组中成员资格的过程由Kafka协议动态处理。如果新实例加入该组,他们将从该组的其他成员接管一些分区; 如果实例死亡,其分区将分发给其余实例。

    Kafka仅提供区内记录的总订单,而不是主题中不同分区之间的记录对于大多数应用程序而言,按分区排序与按键分区数据的能力相结合就足够了。但是,如果您需要对记录进行总订单,则可以使用仅包含一个分区的主题来实现,但这将意味着每个使用者组只有一个使用者进程。

    10.卡夫卡作为消息系统

    Kafka的流概念与传统的企业邮件系统相比如何?

    消息传统上有两种模型:排队发布 - 订阅在队列中,消费者池可以从服务器读取并且每个记录转到其中一个; 在发布 - 订阅中,记录被广播给所有消费者。这两种模型中的每一种都有优点和缺点。排队的优势在于它允许您在多个消费者实例上划分数据处理,从而可以扩展您的处理。不幸的是,一旦一个进程读取它已经消失的数据,队列就不是​​多用户。发布 - 订阅允许您将数据广播到多个进程,但由于每条消息都发送给每个订阅者,因此无法进行扩展处理。

    卡夫卡的消费者群体概念概括了这两个概念。与队列一样,使用者组允许您将处理划分为一组进程(使用者组的成员)。与发布 - 订阅一样,Kafka允许您向多个消费者组广播消息。

    Kafka模型的优势在于每个主题都具有这些属性 - 它可以扩展处理并且也是多用户 - 不需要选择其中一个。

    与传统的消息系统相比,Kafka具有更强的订购保证。

    传统队列在服务器上按顺序保留记录,如果多个消费者从队列中消耗,则服务器按照存储顺序分发记录。但是,虽然服务器按顺序分发记录,但是记录是异步传递给消费者的,因此它们可能会在不同的消费者上无序传送。这实际上意味着在存在并行消耗的情况下丢失记录的顺序。消息传递系统通常通过具有“独占消费者”的概念来解决这个问题,该概念只允许一个进程从队列中消耗,但这当然意味着处理中没有并行性。

    卡夫卡做得更好。通过在主题中具有并行性概念 - 分区 - ,Kafka能够在消费者流程池中提供订购保证和负载平衡。这是通过将主题中的分区分配给使用者组中的使用者来实现的,以便每个分区仅由该组中的一个使用者使用。通过这样做,我们确保使用者是该分区的唯一读者并按顺序使用数据。由于有许多分区,这仍然可以平衡许多消费者实例的负载。但请注意,消费者组中的消费者实例不能超过分区。

    11.Kafka作为存储系统

    任何允许发布与消费它们分离的消息的消息队列实际上充当了正在进行的消息的存储系统。Kafka的不同之处在于它是一个非常好的存储系统。

    写入Kafka的数据将写入磁盘并进行复制以实现容错。Kafka允许生产者等待确认,以便在完全复制之前写入不被认为是完整的,并且即使写入的服务器失败也保证写入仍然存在。

    磁盘结构Kafka很好地使用了规模 - 无论服务器上有50 KB还是50 TB的持久数据,Kafka都会执行相同的操作。

    由于认真对待存储并允许客户端控制其读取位置,您可以将Kafka视为一种专用于高性能,低延迟提交日志存储,复制和传播的专用分布式文件系统。

    12.Kafka用于流处理

    仅仅读取,写入和存储数据流是不够的,目的是实现流的实时处理。

    在Kafka中,流处理器是指从输入主题获取连续数据流,对此输入执行某些处理以及生成连续数据流以输出主题的任何内容。

    例如,零售应用程序可能会接收销售和发货的输入流,并输出重新排序流和根据此数据计算的价格调整。

    可以使用生产者和消费者API直接进行简单处理。但是,对于更复杂的转换,Kafka提供了完全集成的Streams API这允许构建执行非平凡处理的应用程序,这些应用程序可以计算流的聚合或将流连接在一起。

    此工具有助于解决此类应用程序面临的难题:处理无序数据,在代码更改时重新处理输入,执行有状态计算等。

    流API构建在Kafka提供的核心原语上:它使用生产者和消费者API进行输入,使用Kafka进行有状态存储,并在流处理器实例之间使用相同的组机制来实现容错。

    二.常用命令

    管理

    ## 创建主题(4个分区,2个副本)
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
    

      

    查询

    ## 查询集群描述
    bin/kafka-topics.sh --describe --zookeeper 
    
    ## 消费者列表查询
    bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
    
    ## 新消费者列表查询(支持0.9版本+)
    bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
    
    ## 显示某个消费组的消费详情(仅支持offset存储在zookeeper上的)
    bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
    
    ## 显示某个消费组的消费详情(支持0.9版本+)
    bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group
    

      

    发送和消费

    ## 生产者
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    
    ## 消费者
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
    
    ## 新生产者(支持0.9版本+)
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties
    
    ## 新消费者(支持0.9版本+)
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties
    
    ## 高级点的用法
    bin/kafka-simple-consumer-shell.sh --brist localhost:9092 --topic test --partition 0 --offset 1234  --max-messages 10
    

      

    平衡leader

    bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
    

      

    kafka自带压测命令

    bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100  --producer-props bootstrap.servers=localhost:9092
    

      

    增加副本

    1. 创建规则json

      cat > increase-replication-factor.json <<EOF
      {"version":1, "partitions":[
      {"topic":"__consumer_offsets","partition":0,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":1,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":2,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":3,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":4,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":5,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":6,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":7,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":8,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":9,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":10,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":11,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":12,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":13,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":14,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":15,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":16,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":17,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":18,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":19,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":20,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":21,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":22,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":23,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":24,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":25,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":26,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":27,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":28,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":29,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":30,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":31,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":32,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":33,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":34,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":35,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":36,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":37,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":38,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":39,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":40,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":41,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":42,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":43,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":44,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":45,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":46,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":47,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":48,"replicas":[0,1]},
      {"topic":"__consumer_offsets","partition":49,"replicas":[0,1]}]
      }
      EOF
      

        

    2. 执行

      bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
      

        

    3. 验证

      bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.jso
    来源:http://kafka.apache.org/intro.html#intro_topics和https://www.orchome.com/454
  • 相关阅读:
    s s r 多用户 简单配置
    iptables vsftp timeout
    透明控件的通用解决方案
    一个带有可选自定义框架的透明对话框类
    把你的框架窗口一个影子
    WinForms形成皮肤
    WPF加载启动画面
    酷,半透明和形状对话框与标准的控制Windows 2000及以上
    在MFC应用程序中创建web风格的GUI
    画在WinForms控制
  • 原文地址:https://www.cnblogs.com/mlan/p/11084526.html
Copyright © 2011-2022 走看看