zoukankan      html  css  js  c++  java
  • Kafka 0.10 KafkaConsumer流程简述

    ConsumerConfig.scala 储存Consumer的配置

    按照我的理解,0.10的Kafka没有专门的SimpleConsumer,仍然是沿用0.8版本的。

    1.从poll开始

    消费的规则如下:

    • 一个partition只能被同一个ConsumersGroup的一个线程所消费.
    • 线程数小于partition数,某些线程会消费多个partition.
    • 线程数等于partition数,一个线程正好消费一个线程.
    • 当添加消费者线程时,会触发rebalance,partition的分配发送变化.
    • 同一个partition的offset保证消费有序,不同的partition消费不保证顺序.

    Consumers编程的用法:

    private final KafkaConsumer<Long, String> consumer; // 与Kafka进行通信的consumer
    ...
    consumer = new KafkaConsumer<Long, String>(props);
    consumer.subscribe(Collections.singletonList(this.topic));
    ConsumerRecords<Long, String> records = consumer.poll(512);
    ...
    

    consumer,是一个纯粹的单线程程序,后面所讲的所有机制(包括coordinator,rebalance, heartbeat等),都是在这个单线程的poll函数里面完成的。也因此,在consumer的代码内部,没有锁的出现。

    1.1包括的组件

    从KafkaConsumer的构造函数可以看出,KafkaConsumer有以下几个核心部件:

    • Metadata: 存储Topic/Partion与broker的映射关系
    • NetworkClient:网络层 A network client for asynchronous request/response network i/o.
    • ConsumerNetworkClient: Higher level consumer access to the network layer //对NetworkClient的封装,非线程安全
    • ConsumerCoordinator:只是client端的类,只是和服务端的GroupCoordinator通信的介质。(broker端的Coordinator 负责reblance、Offset提交、心跳)
    • SubscriptionState: consumer的Topic、Partition的offset状态维护
    • Fetcher: manage the fetching process with the brokers. //获取消息

    后面会分组件讲解Consumers的工作流程

    1.2 Consumer消费者的工作过程:

    1. 在consumer启动时或者coordinator节点故障转移时,consumer发送ConsumerMetadataRequest给任意一个brokers。在ConsumerMetadataResponse中,它接收对应的Consumer Group所属的Coordinator的位置信息。
    2. Consumer连接Coordinator节点,并发送HeartbeatRequest。如果返回的HeartbeatResponse中返回IllegalGeneration错误码,说明协调节点已经在初始化平衡。消费者就会停止抓取数据,提交offsets,发送JoinGroupRequest给协调节点。在JoinGroupResponse,它接收消费者应该拥有的topic-partitions列表以及当前Consumer Group的新的generation编号。这个时候Consumer Group管理已经完成,Consumer就可以开始fetch数据,并为它拥有的partitions提交offsets。
    3. 如果HeartbeatResponse没有错误返回,Consumer会从它上次拥有的partitions列表继续抓取数据,这个过程是不会被中断的。

    2 设计

    2.0 MetaData

    见Producer里面的分析。

    补充一下,KafkaConsumer、KafkaProducer都是在构造函数中获取metadata信息,通过调用metadata.update方法来获取信息。

    2.1 coordinator 为什么,做什么

    1.去zookeeper依赖 -- 为什么

    • 在0.9以前的client api中,consumer是要依赖Zookeeper的。因为同一个consumer group中的所有consumer需要进行协同,这与后面要讲的rebalance有关。(ConsumerConnector、KafkaStream、ConsumerIterator) -- package kafka.consumer
    • 0.9之后新的consumer不依赖与Zookeeper,一个consumerGroup内的consumer由Coordinator管理.(KafkaConsumer) -- package org.apache.kafka.clients.consumer

    为什么?后面讲

    提问:为什么在一个group内部,1个parition只能被1个consumer拥有?

    2.coordinator协议/partition分配问题

    给定一个topic,有4个partition: p0, p1, p2, p3, 一个group有3个consumer: c0, c1, c2。

    • 那么,如果按RangeAssignor策略,分配结果是:
      c0: p0, c1: p1, c2: p2, p3
    • 如果按RoundRobinAssignor策略:
      c0: p1, p3, c1: p1, c2: p2
    • partition.assignment.strategy=RangeAssignor,默认值

    (到底是哪种分配状态呢)
    那这整个分配过程是如何进行的呢?见下图所示:
    image

    3步分配过程

    1. 步骤1:对于每1个consumer group,Kafka集群为其从broker集群中选择一个broker作为其coordinator。因此,第1步就是找到这个coordinator。(1个consumer group对应一个coordinattor)

    GroupCoordinatorRequest: GCR,由ConsumerNetworkClient发送请求去寻找coordinator。

    2. 步骤2:找到coordinator之后,发送JoinGroup请求
    consumer在这里会被划分leader、follower(无责任的说:选择第一个consumer)

    • leader作用:perform the leader synchronization and send back the assignment for the group(负责发送partition分配的结果)
    • follower作用:send follower's sync group with an empty assignment

    3. 步骤3:JoinGroup返回之后,发送SyncGroup,得到自己所分配到的partition
    SyncGroupRequest

    • consumer leader发送 SyncGroupRequest给Coordinator,Coordinator回给它null
    • follower发送 null的 SyncGroupRequest 给Coordinator,Coordinator回给它partition分配的结果。

    注意,在上面3步中,有一个关键点:

    • partition的分配策略和分配结果其实是由client决定的,而不是由coordinator决定的。什么意思呢?在第2步,所有consumer都往coordinator发送JoinGroup消息之后,coordinator会指定其中一个consumer作为leader,其他consumer作为follower。
    • 然后由这个leader进行partition分配。
    • 然后在第3步,leader通过SyncGroup消息,把分配结果发给coordinator,其他consumer也发送SyncGroup消息,获得这个分配结果。

    接下来就到Fetcher拉取数据了

    2.2 Fetcher

    四个步骤
    0. 步骤0:获取consumer的offset

    1. 步骤1:生成FetchRequest,并放入发送队列
    2. 步骤2:网络poll
    3. 步骤3:获取结果

    1.获取consumer的offset

    当consumer初次启动的时候,面临的一个首要问题就是:从offset为多少的位置开始消费。

    poll之前,给集群发送请求,让集群告知客户端,当前该TopicPartition的offset是多少。通过SubscriptionState来实现, 通过ConsumerCoordinator

    if (!subscriptions.hasAllFetchPositions())
                updateFetchPositions(this.subscriptions.missingFetchPositions());
    

    核心是:向Coordinator发了一个OffsetFetchRequest,并且是同步调用,直到获取到初始的offset,再开始接下来的poll.(也就是说Offset的信息如果存在Kafka里,是存在GroupCoordinator里面)

    consumer的每个TopicPartition都有了初始的offset,接下来就可以进行不断循环取消息了,这也就是Fetch的过程:

    2.生成FetchRequest,并放入发送队列 -- fetcher.initFetches(cluster)

    核心就是生成FetchRequest: 假设一个consumer订阅了3个topic: t0, t1, t2,为其分配的partition分别是: t0: p0; t1: p1, p2; t2: p2

    即总共4个TopicPartition,即t0p0, t0p1, t1p1, t2p2。这4个TopicPartition可能分布在2台机器n0, n1上面: n0: t0p0, t1p1 n1: t0p1, t2p2

    则会分别针对每台机器生成一个FetchRequest,即Map<Node, FetchRequest>。所以会有一个方法把所有属于同一个Node的TopicPartition放在一起,生成一个FetchRequest。

    3.网络poll

    调用ConsumerNetworkClient.poll发送网络请求。向服务器发 送响应请求和获取服务器的响应。(默认值:executeDelayedTasks=true)

    4.获取结果 -- fetcher.fetchedRecords()

    获取Broker返回的Response,里面包含了List<ConsumerRecord> records

    2.3 offset确认机制

    • 是否自动消费确认:由参数auto.xxx.commit=true控制
    • 手动消费:用于自定义Consumers的消费控制

    下面从自动消费确认来分析,Offset自动确认是由ConsumerCoordinatorAutoCommitTask来实现的。

    其调用在ConsumerNetworkClientDelayedTaskQueue delayedTasks里面,然后被周期性的调用。 周期性的发送确认消息,类似HeartBeat,其实现机制也就是前面所讲的DelayedQueue + DelayedTask.

    确认一次:offset的提交

    poll函数中的注释:
    // execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records

    • 可以这样理解:第二次poll调用的时候,提交上一次poll的offset和心跳发送
    • 先提交offset,再去拉取record。那么这次Offset其实是上一次poll的Record的offset。
    • 因此,当你把按照下面的逻辑写程序的时候,可能会导致Consumer与Coordinator的心跳超时。
    while(true) {
        consumer.poll();
        do process message // 假如这个耗时过长,那么这个consumer就无法发送心跳给coordinator,导致它错误认为这个consumer失去联系了,引起不必要的rebalance。槽糕的情况下,会丢重复消费数据。
    }
    

    因此,有必要把offset的提交单独拿出来做一个线程。

    到这里,就把整个Consumer的流程走完了。

    2.4 rebalance机制-- 作为一种补充机制,谈谈原理就好

  • 相关阅读:
    Brain network involved in autonomic functions 与自主功能相关的大脑网络
    Brief summary of classical components of ERP 事件相关成分(ERP)经典成分小结
    ICA & Percentage Variance Account For (PVAF)
    数据处理中白化Whitening的作用图解分析
    Loadings vs eigenvectors in PCA 主成分分析(PCA)中的负荷和特征向量
    主成分分析(PCA)和独立成分分析(ICA)相关资料
    Sketch of heart and QRS complex 心脏及QRS波群简图
    Brain Network visulation in EEG 脑电网络可视化
    Phase Locking Value (PLV) 神经信号的锁相值
    ubuntu16.04下的一些基本操作笔记
  • 原文地址:https://www.cnblogs.com/byrhuangqiang/p/6372600.html
Copyright © 2011-2022 走看看