zoukankan      html  css  js  c++  java
  • Kafka 转载

    转载自:https://fangyeqing.github.io/2016/10/28/kafka---%E4%BB%8B%E7%BB%8D/

    kafka---介绍


    Kafka是一种分布式的消息系统。本文基于0.9.0版本,新版kafka加入了流处理组件kafka stream,最新的官方文档又自称分布式流处理平台。

    概念

    • Broker
      Kafka的节点。kafka集群包含一个或多个broker
    • Producer
      消息的生产者。负责发布消息到Kafka broker
    • Consumer
      消息的消费者。每个consumer属于一个特定的consumer group(若不指定group id则属于默认的group)。使用consumer high level API时,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。
    • Topic
      消息主题。例如pv日志、click日志、转化日志都可以作为topic。
    • Partition
      topic物理上的分组。每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition是一个有序的队列,对应于一个文件夹,该文件夹下存储该partition的数据和索引文件。在发送一条消息时,生产者可以指定这条消息的key和分区机制来发送到不同的分区。
    • offset
      每个partition中的每条消息被分配的有序id,是消息的唯一标识。每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。

      架构

      image
      producers(生产者)通过网络将不同topic的messages(消息)发送到Kafka 集群,consumers(消费者)在集群订阅自己想要消费的topic。

    一个典型的kafka集群中包含若干producer(可以是web前端产生的page view,或者是服务器日志,系统CPU、memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干consumer group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在consumer group发生变化时进行rebalance。producer使用push模式将消息发布到broker,consumer使用pull模式从broker订阅并消费消息。  

    topic&&partitions

    对于每个Topic,Kafka会为其维护一个如下图所示的分区的日志文件
    image
    每个partition(分区)是一个有序的、不可修改的消息组成的队列;这些消息是被不断的appended到这个commit log(提交日志文件)上的。在这些patitions之中的每个消息都会被赋予一个叫做offset的顺序id编号,用来在partition之中唯一性的标示这个消息。

    Kafka集群会保存一个时间段内所有被发布出来的信息,无论这个消息是否已经被消费过。

    partition内有序

    Kafka仅仅提供提供partition之内的消息的全局有序,在不同的partition之间不能担保。partition的消息有序性加上可以按照指定的key划分消息的partition,这基本上满足了大部分应用的需求。如果你必须要实现一个全局有序的消息队列,那么可以采用Topic只划分1个partition来实现。但是这就意味着你的每个消费组只能有唯一的一个消费者进程。

    Consumer Group

    每一个consumer实例都属于一个consumer group,每一条消息都会被所有订阅了该topic的consumer group消费。通过group id指定consumer group。相当于同一个consumer group的消费者会瓜分所有的分区,每个consumer会消费一个或多个分区。

    使用high-level consumer时,同一个consumer group里只有一个consumer能消费到该消息。

    因为high level不用client关心offset, 会自动的读zookeeper中该Consumer group的last offset,相当于所有consumer都公用这个offset。当其中一个consumer消费一条消息时,offset就移动到下一条。

    image

    不同形式的消息播发

    订阅模式:每个Consumer都采用不同的group,每一条消息都会发送给所有消费者
    消息队列模式:所有的Consumer在同一个Group里,消费者之间负载均衡

    Producer&&Consumer&&partitions

    新建topic时,通过–partitions 可以设置分区数。可以指定partitions数为broker的整数倍,这样,每个broker会对应相同个数的partitions。

    生产者在生产数据的时候,可以为每条消息指定Key,这样消息被发送到broker时,会根据分区规则选择被存储到哪一个partitions中。如果分区规则设置的合理,那么所有的消息将会被均匀的分布到不同的partitions中,这样就实现了负载均衡和水平扩展。

    Kafka保证同一consumer group中只有一个consumer会消费某条消息,实际上,Kafka保证的是稳定状态下每一个consumer实例只会消费某一个或多个特定partition的数据,而某个partition的数据只会被某一个特定的consumer实例所消费。其中consumer和partition数量关系如下表所示:

    consumer和partition数量关系消费情况
    小于 至少有一个consumer会消费多个partition的数据
    相等 一个consumer消费一个partition的数据
    大于 部分consumer无法消费该topic下任何一条消息,浪费

    增减consumer,broker,partition会导致rebalance,rebalance后consumer对应的partition会发生变化,在后面的实例中也可以看到。

    测试

    利用kafka中自带的生产者和消费者例子来做个简单的测试。具体步骤在另一篇kakfa部署中。

    kafka集群有3个broker节点。新建一个partitions数量为3的topic。启动一个A终端为生产者,启动B、C、D、E终端为消费者。C、D、E终端为一个consumer group,B为单独的一个consumer group。

    在producer终端输入消息从1-20。可以看到B终端会输出1-20全部消息,图中B所示。而C、D、E终端由于属于同一个Consumer Group,partitions数量等于consumer,每个consumer消费了一个partition里的消息。图中为C、D、E。

    将C终端断开,剩下B、D、E去消费消息。B终端还是会输出1-20全部消息,图中为B1所示。而D、E属于同一个Consumer Group,且consumer数量少于partition数,可以看到D消费了两个partition中的数据,见图中D1所示。
    image

    Replication&&broker节点故障处理

    Replication

    replication策略是基于partition。kafka通过创建topic时可以通过–replication-factor配置partition副本数。配置副本之后,每个partition都有一个唯一的leader,有0个或多个follower。

    所有的读写操作都在leader上完成,leader批量从leader上pull数据。followers从leader消费消息来复制message,就跟普通的consumer消费消息一样。

    一般情况下partition的数量大于等于broker的数量,并且所有partition的leader均匀分布在broker上。

    follower故障处理

    broker是否alive包含两个条件:

    • 一是它必须维护与Zookeeper的session(这个通过Zookeeper的heartbeat机制来实现)。
    • 二是follower必须能够及时将leader的writing复制过来,不能“落后太多”。

    leader会track“in sync”的node list。如果一个follower宕机,或者落后太多,leader将把它从”in sync” list中移除。

    一条消息只有被“in sync” list里的所有follower都从leader复制过去才会被认为已提交。这样就避免了部分数据被写进了leader,还没来得及被任何follower复制就宕机了,而造成数据丢失(consumer无法消费这些数据)

    而对于producer而言,它可以选择是否等待消息commit,这可以通过producer的“acks”来设置。默认为acks=all ,这意味着leader将等待所有follower复制完消息。

    leader故障处理

    leader挂掉后,怎样在follower中选举出新的leader?

    Kafka在Zookeeper中动态维护了一个ISR(in-sync replicas) set,这个set里的所有replica都跟上了leader,只有ISR里的成员才有被选为leader的可能。

    如果某一个partition的所有replica都挂了,就无法保证数据不丢失了。这种情况下有两种可行的方案:

    • 一致性高:等待ISR中的任一个replica“活”过来,并且选它作为leader。可能会等待比较长的时间
    • 可用性高:选择第一个“活”过来的replica(不一定是ISR中的)作为leader。有可能会丢失数据

    kafka采用第二种方案,可以通过配置unclean.leader.election.enable来关闭这种方案。

    测试

    kafka集群有3个broker节点。具体部署在另一篇kafka部署中。

    做个简单的测试,创建一个3分区的topic,不指定副本数,可以看到默认一个副本,Partition均匀分布在各broker。

    1
    2
    3
    4
    5
    6
    [fangyeqing@xxxx kafka_2.11-0.9.0.0]$bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 3 --topic test3partitions
    [fangyeqing@xxxx kafka_2.11-0.9.0.0]$bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test3partitions
    Topic:test3partitions PartitionCount:3 ReplicationFactor:1 Configs:
    Topic: test3partitions Partition: 0 Leader: 1 Replicas: 1 Isr: 1
    Topic: test3partitions Partition: 1 Leader: 2 Replicas: 2 Isr: 2
    Topic: test3partitions Partition: 2 Leader: 0 Replicas: 0 Isr: 0

    创建一个3分区2副本的topic,可以看到Replicas和Isr中有1个follower。例如Partitions:0的Leader为broker:1,follower为broker:2,并且2在Isr中,理论上当Leader挂掉之后,2会顶上。

    1
    2
    3
    4
    5
    6
    [fangyeqing@xxxx kafka_2.11-0.9.0.0]$bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 3 --topic test3partition2replication --replication-factor 2
    [fangyeqing@xxxx kafka_2.11-0.9.0.0]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test3partition2replication
    Topic:test3partition2replication PartitionCount:3 ReplicationFactor:2 Configs:
    Topic: test3partition2replication Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
    Topic: test3partition2replication Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
    Topic: test3partition2replication Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1

    创建一个3分区2副本的topic,可以看到Replicas和Isr中有2个follower

    1
    2
    3
    4
    5
    6
    [fangyeqing@xxxx kafka_2.11-0.9.0.0]$bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 3 --topic test3partition3replication --replication-factor 3
    [fangyeqing@xxxx kafka_2.11-0.9.0.0]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test3partition3replication
    Topic:test3partition3replication PartitionCount:3 ReplicationFactor:3 Configs:
    Topic: test3partition3replication Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
    Topic: test3partition3replication Partition: 1 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
    Topic: test3partition3replication Partition: 2 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1

    Message delivery guarantees

    目前有下面几种消息确保机制:

    • At most once 消息可能会丢,但绝不会重复传输
    • At least one 消息绝不会丢,但可能会重复传输
    • Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的。

    Kafka默认保证At least once,并且允许通过设置producer异步提交来实现At most once。下面分阶段分析:

    Producer向broker发送消息

    当Producer向broker发送消息,由上述Replication的分析可知,一旦这条消息已经被commit,如果这个topic有多个replication(副本),某个broker挂掉也不会丢失消息。

    Producer发送数据给broker的过程中,如果遇到网络问题而造成通信中断:

    • At least once:Producer就无法判断该条消息是否已经commit,再重复提交时就会是At least once。
    • Exactly once:在以后的版本中producer可以生成一种类似于primary key的东西,发生故障时幂等性的retry多次。
    • At most once:Producer异步提交来实现At most once

    Consumer从broker消费消息

    当Consumer从broker消费消息时,consumer如果在消费消息时crash:

    • At least once:读完消息先处理再commit消费状态(保存offset)
    • At most once:读完消息先commit消费状态(保存offset)再处理消息
    • Exactly once:需要协调offset和实际操作的输出,目前比较麻烦。离线数据可以做到去重,利用Camus或者Gobbin将kafka topic落地到HDFS,然后做去重即可。其中Camus可以参考我的另一篇博客Camus介绍
  • 相关阅读:
    聊聊关于性能优化和其他(一)
    JavaScript 事件循环及异步原理(完全指北)
    SPA路由机制详解(看不懂不要钱~~)
    Web安全系列(三):XSS 攻击进阶(挖掘漏洞)
    Web安全系列(二):XSS 攻击进阶(初探 XSS Payload)
    浅谈Generator和Promise原理及实现
    Kubernetes 服务目录
    Kubernetes 网络模型
    个人开源贡献记录
    【转载】DTO – 服务实现中的核心数据
  • 原文地址:https://www.cnblogs.com/x-x-736880382/p/11733814.html
Copyright © 2011-2022 走看看