zoukankan      html  css  js  c++  java
  • 关于Kafka java consumer管理TCP连接的讨论

      本篇是《关于Kafka producer管理TCP连接的讨论》的续篇,主要讨论Kafka java consumer是如何管理TCP连接。实际上,这两篇大部分的内容是相同的,即consumer也是把TCP连接的管理交由底层的Selector类(org.apache.kafka.common.network)来维护。我们依然以“何时创建/创建多少/何时关闭/潜在问题/总结”的顺序来讨论。和上一篇一样,本文将无差别地混用名词TCP和Socket。

    一、何时创建TCP连接

      首先明确的是,在构建KafkaConsumer实例时是不会创建任何TCP连接的;另外在调用诸如subscribe或assign的时候也不会创建任何TCP连接。那么TCP连接是在什么时候创建的呢?严格来说有几个可能的时间点。从粗粒度层面来说,我们可以安全地认为Socket连接是在调用consumer.poll()创建的;从细粒度层面来说,TCP连接创建的时机有3个:1. 请求METADATA时;2. 进行组协调时;3. 发送数据时。

    二、创建多少个TCP连接

      对于每台broker而言,kafka consumer实例通常会创建3个TCP连接,第一个TCP连接是consumer请求集群元数据时创建的,之后consumer会使用这个Socket继续请求元数据以及寻找group对应的coordinator,如下列日志所示:

    [2019-01-01 17:38:22,301] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)
    ...
    [2019-01-01 17:38:22,360] TRACE [Consumer clientId=consumer-1, groupId=test] Sending METADATA {topics=[bar,foo],allow_auto_topic_creation=true} with correlation id 2 to node -1 (org.apache.kafka.clients.NetworkClient:492)
    ...
    [2019-01-01 17:38:22,360] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FIND_COORDINATOR {coordinator_key=test,coordinator_type=0} with correlation id 0 to node -1 (org.apache.kafka.clients.NetworkClient:492)

     至于这里为什么是node -1是因为首次请求元数据时尚不确定broker.id,所以只能先用-1替代。

      第二个TCP连接供consumer执行组协调操作使用,这里的组协调操作包括:JOIN_GROUP(加入组)、SYNC_GROUP(等待组分配方案)、HEARTBEAT(心跳请求)、OFFSET_FETCH(获取位移)、OFFSET_COMMIT(提交位移)以及其他请求(比如LEAVE_GROUP,但本例中没有演示组成员退出的情形,故日志中没有出现这个请求类型),如下列日志所示:

    [2019-01-01 17:38:22,379] TRACE [Consumer clientId=consumer-1, groupId=test] Sending JOIN_GROUP {group_id=test,session_timeout=10000,rebalance_timeout=300000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=20 cap=20]}]} with correlation id 3 to node 2147483647 (org.apache.kafka.clients.NetworkClient:492)
    [2019-01-01 17:38:22,382] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node 2147483647 for JOIN_GROUP with correlation id 3, received {throttle_time_ms=0,error_code=0,generation_id=9,group_protocol=range,leader_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,member_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,members=[{member_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=20 cap=20]}]} (org.apache.kafka.clients.NetworkClient:810)
    [2019-01-01 17:38:22,386] TRACE [Consumer clientId=consumer-1, groupId=test] Sending SYNC_GROUP {group_id=test,generation_id=9,member_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,group_assignment=[{member_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=36 cap=36]}]} with correlation id 5 to node 2147483647 (org.apache.kafka.clients.NetworkClient:492)
    [2019-01-01 17:38:22,388] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node 2147483647 for SYNC_GROUP with correlation id 5, received {throttle_time_ms=0,error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=36 cap=36]} (org.apache.kafka.clients.NetworkClient:810)
    [2019-01-01 17:38:22,396] TRACE [Consumer clientId=consumer-1, groupId=test] Sending OFFSET_FETCH {group_id=test,topics=[{topic=bar,partitions=[{partition=0}]},{topic=foo,partitions=[{partition=0}]}]} with correlation id 6 to node 2147483647 (org.apache.kafka.clients.NetworkClient:492)
    [2019-01-03 17:38:22,397] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node 2147483647 for OFFSET_FETCH with correlation id 6, received {throttle_time_ms=0,responses=[{topic=bar,partition_responses=[{partition=0,offset=0,leader_epoch=-1,metadata=,error_code=0}]},{topic=foo,partition_responses=[{partition=0,offset=0,leader_epoch=-1,metadata=,error_code=0}]}],error_code=0} (org.apache.kafka.clients.NetworkClient:810)
    ...
    [2019-01-01 17:38:23,401] TRACE [Consumer clientId=consumer-1, groupId=test] Sending OFFSET_COMMIT {group_id=test,generation_id=9,member_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,topics=[{topic=bar,partitions=[{partition=0,offset=0,leader_epoch=-1,metadata=}]},{topic=foo,partitions=[{partition=0,offset=0,leader_epoch=-1,metadata=}]}]} with correlation id 10 to node 2147483647 (org.apache.kafka.clients.NetworkClient:492)
    [2019-01-01 17:38:23,403] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node 2147483647 for OFFSET_COMMIT with correlation id 10, received {throttle_time_ms=0,responses=[{topic=bar,partition_responses=[{partition=0,error_code=0}]},{topic=foo,partition_responses=[{partition=0,error_code=0}]}]} (org.apache.kafka.clients.NetworkClient:810)

      上面标红的节点ID看上去有些奇怪,实际上它是由Integer.MAX_VALUE - coordinator的broker.id计算得来的,因为我的测试环境中只有一台broker且该id是0,所以这个Socket连接的节点ID就是Integer.MAX_VALUE,即2147483647。针对这个node ID的计算方式,Kafka代码是故意为之的,目的就是要让组协调请求和真正的数据获取请求使用不同的Socket连接。
      第三个Socket连接就非常好理解了,就是用于发送FETCH请求的。当consumer代码使用第一个Socket连接获取到集群元数据之后,每个broker的真实ID已经缓存在consumer本地的内存中,因此此时代码会使用真实的ID创建第三个Socket连接并用于消息获取,如下列日志所示:

    [2019-01-01 17:38:23,424] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FETCH {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=104064890,session_epoch=2,topics=[],forgotten_topics_data=[]} with correlation id 11 to node 0 (org.apache.kafka.clients.NetworkClient:492)
    [2019-01-01 17:38:23,927] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node 0 for FETCH with correlation id 11, received {throttle_time_ms=0,error_code=0,session_id=104064890,responses=[]} (org.apache.kafka.clients.NetworkClient:810)
    [2019-01-01 17:38:23,928] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FETCH {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=104064890,session_epoch=3,topics=[],forgotten_topics_data=[]} with correlation id 12 to node 0 (org.apache.kafka.clients.NetworkClient:492)
    ...

    上面标红的节点0是真实的broker.id,可见consumer是使用这个Socket进行消息获取操作的。值得一提的是,当这个Socket连接成功建立之后,第一个Socket连接就会被废弃掉,之后所有的元数据请求都通过第三个Socket发送。

    三、何时关闭TCP连接

      和Producer原理相同,consumer关闭Socket也分为主动关闭和Kafka自动关闭。主动关闭依然是由用户发起,显式调用consumer.close()以及类似方法亦或是kill -9;而Kafka自动关闭同样由connections.max.idle.ms参数值控制。和producer有些不同的是,如果用户写consumer程序时使用了循环的方式来poll消息,那么上面提到的所有请求都会不断地发送到broker,故这些Socket连接上总是能保证有请求在发送,因此实现了“长连接”的效果。

    四、可能的问题?

      Consumer端和producer端的问题是一样的,即第一个Socket连接仅仅是为了首次(最多也就是几次)获取元数据之用,后面就会被废弃掉。根本的原因在于它使用了“假”的broker id去注册,当 后面consumer获取了真实的broker id之后它无法区分哪个broker id对应这个假ID,所以只能重新创建另外的Socket连接。

    五、总结

      最后总结一下当前的结论,针对最新版本Kafka(2.1.0)而言,Java consumer端管理TCP连接的方式是:

    1.  KafkaConsumer实例创建时不会创建任何Socket连接,实例创建之后首次请求元数据时会创建第一个Socket连接

    2. KafkaConsumer实例拿到元数据信息之后随机寻找其中一个broker去发现对应的coordinator,然后向coordinator所在broker创建第二个Socket连接。之后所有的组协调请求处理都经由该Socket

    3. 步骤1中创建的TCP连接只用于首次获取元数据信息,后面会被废弃掉

    4. 如果设置consumer端connections.max.idle.ms参数大于0,则步骤1中创建的TCP连接会被自动关闭;如果设置该参数=-1,那么步骤1中创建的TCP连接将成为“僵尸”连接

    5. 当前consumer判断是否存在与某broker的TCP连接依靠的是broker id,这是有问题的,依靠<host, port>对可能是更好的方式

  • 相关阅读:
    leetcode 13. Roman to Integer
    python 判断是否为有效域名
    leetcode 169. Majority Element
    leetcode 733. Flood Fill
    最大信息系数——检测变量之间非线性相关性
    leetcode 453. Minimum Moves to Equal Array Elements
    leetcode 492. Construct the Rectangle
    leetcode 598. Range Addition II
    leetcode 349. Intersection of Two Arrays
    leetcode 171. Excel Sheet Column Number
  • 原文地址:https://www.cnblogs.com/huxi2b/p/10215956.html
Copyright © 2011-2022 走看看