zoukankan      html  css  js  c++  java
  • kafka-clients 1.0 高阶API消费消息(未完)

    消费消息的请求(按序)

    • org/apache/kafka/common/requests/RequestHeader
    • org/apache/kafka/common/requests/ApiVersionsRequest
    • org/apache/kafka/common/requests/MetadataRequest 批量查询topic的元数据信息
    • org/apache/kafka/common/requests/FindCoordinatorRequest 从拿到的topic的元数据中取出leader节点 作为组协调者
    • org/apache/kafka/common/requests/JoinGroupRequest
    • org/apache/kafka/common/requests/SyncGroupRequest
    • org/apache/kafka/common/requests/OffsetFetchRequest
    • org/apache/kafka/common/requests/ListOffsetRequest
    • org/apache/kafka/common/requests/FetchRequest
    • org/apache/kafka/common/requests/HeartbeatRequest

    请求接口文档参考
    响应接口文档参考

    RequestHeader

    请求头

    nametypedefaultValuedocString
    api_keyINT16null请求接口编号
    api_versionINT16nullapi版本
    correlation_idINT32null用户提供的一个整数id,用于响应时由响应体带回来
    client_idNULLABLE_STRINGnull用户提供的client id

    ApiVersionsRequest

    查询API版本信息

    请求 version:1

    仅仅有请求头

    响应 version:1

    nametypedefaultValuedocString
    error_codeINT16null错误码
    api_versionsARRAY({api_key:INT16,min_version:INT16,max_version:INT16})nullbroker能支持的api各版本列表。含最低版本,最高版本.
    throttle_time_msINT320由于配额冲突而阻止请求的持续时间(毫秒)(如果请求未违反任何配额,则为零

    虽是请求broker端,但是实际还是用的client中的API完成的逻辑:
    ApiVersionsResponse.apiVersionsResponse
    根据messageFormatVersion 消息格式版本推导出各API版本情况。
    API版本 最小的是0 。写的固定的。 最大的是 requestSchemas的length -1 即requestSchemas最大版本。
    此处不仅返回每个API的最小版本与最大版本,还返回能支持的API列表。如因版本问题不能支持的API是不会返回的。
    能否支持的判断依据是,API依赖的最小消息格式版本小于当前的消息格式版本,那么就支持。

    for (ApiKeys apiKey : ApiKeys.values()) {
        if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
            versionList.add(neApiVersionsResponse.ApiVersion(apiKey));
        }
    }
    

    MetadataRequest

    批量查询topic的元数据信息

    请求 version:5

    nametypedefaultValuedocString
    topicsARRAY(STRING)null需要查元数据的topic的列表,如果不送则查所有topic的元数据
    allow_auto_topic_creationBOOLEANnull在broker配置了允许自动创建topic时是否自动创建topic

    响应 version:4

    nametypedefaultValuedocString
    throttle_time_msINT320由于配额冲突而阻止请求的持续时间(毫秒)(如果请求未违反任何配额,则为零)
    brokersARRAY({node_id:INT32,host:STRING,port:INT32,rack:NULLABLE_STRING})null所有 活着的 broker的id ip port的信息
    cluster_idNULLABLE_STRINGnull集群id
    controller_idINT32nullcontroller角色的broker的id
    topic_metadataARRAY({error_code:INT16,topic:STRING,is_internal:BOOLEAN,partition_metadata:ARRAY({error_code:INT16,partition:INT32,leader:INT32,replicas:ARRAY(INT32),isr:ARRAY(INT32)})})nulltopic元数据,分区数、leader broker的id、副本所在broker id列表、isr broker id列表

    broker端处理
    在broker端

    1. 过滤出授权的topics KafkaApis.handleTopicMetadataRequest
    2. 查询出授权topics的元数据 KafkaApis.getTopicMetadata
      2.1 从缓存中拿,拿到(跟topics的size相同)即返回
      2.2 处理没拿到的topic
      2.2.1 允许创建topic的,就按默认副本数和默认分区数创建,不能创建的或者创建出错的返回出错信息。创建topic前提是协调者可用。否则COORDINATOR_NOT_AVAILABLE。
      2.2.2 返回创建后的metadata
      元数据信息有缓存 kafka.server.MetadataCache.cache:topic <--> [partitionNo <--> 分区状态封装]
      MetadataCache中一系列getxxx方法都是用来读取检索的。
      元数据缓存的更新参见 《MetadataCache更新》

    FindCoordinatorRequest

    查询协调者

    请求 version:1

    nametypedefaultValuedocString
    coordinator_keySTRINGnull组协调时是组id
    事务协调时是事务id
    coordinator_typeINT8null协调类型(0 = group, 1 = transaction)

    响应 version:1

    nametypedefaultValuedocString
    throttle_time_msINT320Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota)
    error_codeINT16nullResponse error code
    error_messageNULLABLE_STRINGnullResponse error message
    coordinator{node_id:INT32,host:STRING,port:INT32}null协调者broker的id ip port

    组协调与事务协调都用这个请求
    coordinatorKey 组协调是组id 事务协调时是事务id
    分区对应的leader节点就是组协调者

     val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
        .find(_.partition == partition)
        .map(_.leader) // SimonNote: leader节点作为协调者
    
    

    这个请求的响应也就是将协调者信息(node_id,host,port)返回去

    JoinGroupRequest

    加入消费组的请求

    请求 version:2

    nametypedefaultValuedocString
    group_idSTRINGnull唯一的组标志
    session_timeoutINT32null会话时间,超过这个时间没收到心跳,协调者就认为这个消费者挂了
    rebalance_timeoutINT32null协调者在重新平衡组时等待每个成员重新加入的最长时间
    member_idSTRINGnull由组协调者分配的成员ID,如果是第一次加入,则为空。
    protocol_typeSTRINGnull组协调协议实现类的唯一名称
    group_protocolsARRAY({protocol_name:STRING,protocol_metadata:BYTES})null组成员能支持的组协调协议列表

    响应 version:2

    nametypedefaultValuedocString
    throttle_time_msINT320Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota)
    error_codeINT16nullResponse error code
    generation_idINT32null组的年代?
    group_protocolSTRINGnull协调者选中的组协议
    leader_idSTRINGnull组中的leader
    member_idSTRINGnull第一次加入的时候组协调者给分的成员id
    membersARRAY({member_id:STRING,member_metadata:BYTES})null组内成员?

    kafka.coordinator.group.GroupCoordinator.handleJoinGroup

    一系列的check:

    1. 协调者是否可用
    2. 是否是本分区的协调者
    3. 消费组id是否合法(是否为空)
    4. 是否协调者正在load中,GroupMetadataManager会管理当前partition是否在load中
    5. sessionTimeoutMs是否在组配置的最大最小范围内

    groupManager加入新建的GroupMetadata实例(如果没有的话,有就直接下一步了),GroupMetadata有哪些东西,下面注释写了一部分,但是还包含事务消息用一些offset

    /**
     * Group contains the following metadata:
     *
     *  Membership metadata:
     *  1. Members registered in this group
     *  2. Current protocol assigned to the group (e.g. partition assignment strategy for consumers)
     *  3. Protocol metadata associated with group members
     *
     *  State metadata:
     *  1. group state
     *  2. generation id
     *  3. leader id
     */
    

    doJoinGroup

    一系列的check后,根据group.currentState做相应处理
    group.currentState

    GroupMetadata.scala中有对group状态定义及action及走向到哪的明确详细描述,非常重要

    SyncGroupRequest

    请求 version:1

    nametypedefaultValuedocString
    group_idSTRINGnullgroup唯一标志
    generation_idINT32null代的标志?
    member_idSTRINGnull第一次加入的时候组协调者给分的成员id
    group_assignmentARRAY({member_id:STRING,member_assignment:BYTES})nullnull

    响应 version:1

    nametypedefaultValuedocString
    throttle_time_msINT320Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota)
    error_codeINT16nullResponse error code
    member_assignmentBYTESnullnull

    OffsetFetchRequest

    请求 version:3

    nametypedefaultValuedocString
    group_idSTRINGnullgroup id
    topicsARRAY({topic:STRING,partitions:ARRAY({partition:INT32})})nulltopic列表,支持多个topic

    响应 version:3

    nametypedefaultValuedocString
    throttle_time_msINT320Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota)
    responsesARRAY({topic:STRING,partition_responses:ARRAY({partition:INT32,offset:INT64,metadata:NULLABLE_STRING,error_code:INT16})})null列表:topic-[{分区号-offset,元数据信息}]
    error_codeINT16nullResponse error code

    ListOffsetRequest

    请求 version:2

    nametypedefaultValuedocString
    replica_idINT32nullfollower的broker的id. 正常消费用-1.
    isolation_levelINT8null事务消息可见性设置。 使用 READ_UNCOMMITTED (isolation_level = 0)能看到所有消息. 使用 READ_COMMITTED (isolation_level = 1), 非事务消息和已经提交的消息能被看到. 更具体一点, READ_COMMITTED 返回比当前 LSO (last stable offset)小的offset, 并允许返回已经取消的事务
    topicsARRAY({topic:STRING,partitions:ARRAY({partition:INT32,timestamp:INT64})})null列表:topic,partitions{分区号,时间戳}

    响应 version:2

    nametypedefaultValuedocString
    throttle_time_msINT320Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota)
    responsesARRAY({topic:STRING,partition_responses:ARRAY({partition:INT32,error_code:INT16,timestamp:INT64,offset:INT64})})null列表:topic 分区号 错误码 时间戳 offset

    FetchRequest

    请求 version:6

    nametypedefaultValuedocString
    replica_idINT32nullfollower的broker的id. 正常消费用-1
    max_wait_timeINT32null等待响应的最大时间 单位ms.
    min_bytesINT32null最小字节
    max_bytesINT32null最大字节. 单条消息如果超过这个大小也将返回
    isolation_levelINT8null事务隔离级别
    topicsARRAY({topic:STRING,partitions:ARRAY({partition:INT32,fetch_offset:INT64,log_start_offset:INT64,max_bytes:INT32})})null列表: topic 分区号 取的offset log开始的 offset?? 最大字节.

    响应 version:6

    nametypedefaultValuedocString
    throttle_time_msINT320Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota)
    responsesARRAY({topic:STRING,partition_responses:ARRAY({partition_header:{partition:INT32,error_code:INT16,high_watermark:INT64,last_stable_offset:INT64,log_start_offset:INT64,aborted_transactions:ARRAY({producer_id:INT64,first_offset:INT64})},record_set:RECORDS})})null列表: topic 分区头: 分区号 高水位值 LSO(上次稳定offset), log开始offset,取消事务:生产者id 第一个offset。 消息记录集

    HeartbeatRequest

    请求 version:1

    nametypedefaultValuedocString
    group_idSTRINGnullgroup id
    generation_idINT32nullgroup的年代
    member_idSTRINGnull第一次加入的时候组协调者给分的成员id

    响应 version:1

    nametypedefaultValuedocString
    throttle_time_msINT320Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota)
    error_codeINT16null响应码
  • 相关阅读:
    CF-1111 (2019/2/7 补)
    CF-1096C Polygon for the Angle
    CF-1100 E Andrew and Taxi
    CF-1099 D. Sum in the tree
    sscanf的使用
    CF-1082(渣渣只做了前三个)
    UVA-10817- Headmaster's Headache(状压DP)
    UVA-1220-Party at Hali-Bula && UVA-1218-Perfect Service(树形DP)
    CF-1072-C. Cram Time(贪心,数学)
    CF-1027-B. Curiosity Has No Limits
  • 原文地址:https://www.cnblogs.com/simoncook/p/11809460.html
Copyright © 2011-2022 走看看