zoukankan      html  css  js  c++  java
  • kafka 心跳和 rebalance

    kafka 的心跳是 kafka consumer 和 broker 之间的健康检查,只有当 broker coordinator 正常时,consumer 才会发送心跳。

    consumer 和 rebalance 相关的 2 个配置参数:

    参数名                --> MemberMetadata 字段
    session.timeout.ms   --> MemberMetadata.sessionTimeoutMs
    max.poll.interval.ms --> MemberMetadata.rebalanceTimeoutMs

    broker 端,sessionTimeoutMs 参数

    broker 处理心跳的逻辑在 GroupCoordinator 类中:如果心跳超期, broker coordinator 会把消费者从 group 中移除,并触发 rebalance。

     1   private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
     2     // complete current heartbeat expectation
     3     member.latestHeartbeat = time.milliseconds()
     4     val memberKey = MemberKey(member.groupId, member.memberId)
     5     heartbeatPurgatory.checkAndComplete(memberKey)
     6 
     7     // reschedule the next heartbeat expiration deadline
     8     // 计算心跳截止时刻
     9     val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
    10     val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
    11     heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
    12   } 
    13   
    14   // 心跳过期
    15   def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
    16     group.inLock {
    17       if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
    18         info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
    19         removeMemberAndUpdateGroup(group, member)
    20       }
    21     }
    22   }
    23   
    24   private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
    25     member.awaitingJoinCallback != null ||
    26       member.awaitingSyncCallback != null ||
    27       member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline

    consumer 端:sessionTimeoutMs,rebalanceTimeoutMs 参数 

    如果客户端发现心跳超期,客户端会标记 coordinator 为不可用,并阻塞心跳线程;如果超过了 poll 消息的间隔超过了 rebalanceTimeoutMs,则 consumer 告知 broker 主动离开消费组,也会触发 rebalance

    org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatThread 代码片段:

    if (coordinatorUnknown()) {
        if (findCoordinatorFuture != null || lookupCoordinator().failed())
            // the immediate future check ensures that we backoff properly in the case that no
            // brokers are available to connect to.
            AbstractCoordinator.this.wait(retryBackoffMs);
    } else if (heartbeat.sessionTimeoutExpired(now)) {
        // the session timeout has expired without seeing a successful heartbeat, so we should
        // probably make sure the coordinator is still healthy.
        markCoordinatorUnknown();
    } else if (heartbeat.pollTimeoutExpired(now)) {
        // the poll timeout has expired, which means that the foreground thread has stalled
        // in between calls to poll(), so we explicitly leave the group.
        maybeLeaveGroup();
    } else if (!heartbeat.shouldHeartbeat(now)) {
        // poll again after waiting for the retry backoff in case the heartbeat failed or the
        // coordinator disconnected
        AbstractCoordinator.this.wait(retryBackoffMs);
    } else {
        heartbeat.sentHeartbeat(now);
    
        sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                synchronized (AbstractCoordinator.this) {
                    heartbeat.receiveHeartbeat(time.milliseconds());
                }
            }
    
            @Override
            public void onFailure(RuntimeException e) {
                synchronized (AbstractCoordinator.this) {
                    if (e instanceof RebalanceInProgressException) {
                        // it is valid to continue heartbeating while the group is rebalancing. This
                        // ensures that the coordinator keeps the member in the group for as long
                        // as the duration of the rebalance timeout. If we stop sending heartbeats,
                        // however, then the session timeout may expire before we can rejoin.
                        heartbeat.receiveHeartbeat(time.milliseconds());
                    } else {
                        heartbeat.failHeartbeat();
    
                        // wake up the thread if it's sleeping to reschedule the heartbeat
                        AbstractCoordinator.this.notify();
                    }
                }
            }
        });
    }
     1 /**
     2  * A helper class for managing the heartbeat to the coordinator
     3  */
     4 public final class Heartbeat {
     5     private final long sessionTimeout;
     6     private final long heartbeatInterval;
     7     private final long maxPollInterval;
     8     private final long retryBackoffMs;
     9 
    10     private volatile long lastHeartbeatSend; // volatile since it is read by metrics
    11     private long lastHeartbeatReceive;
    12     private long lastSessionReset;
    13     private long lastPoll;
    14     private boolean heartbeatFailed;
    15 
    16     public Heartbeat(long sessionTimeout,
    17                      long heartbeatInterval,
    18                      long maxPollInterval,
    19                      long retryBackoffMs) {
    20         if (heartbeatInterval >= sessionTimeout)
    21             throw new IllegalArgumentException("Heartbeat must be set lower than the session timeout");
    22 
    23         this.sessionTimeout = sessionTimeout;
    24         this.heartbeatInterval = heartbeatInterval;
    25         this.maxPollInterval = maxPollInterval;
    26         this.retryBackoffMs = retryBackoffMs;
    27     }
    28 
    29     public void poll(long now) {
    30         this.lastPoll = now;
    31     }
    32 
    33     public void sentHeartbeat(long now) {
    34         this.lastHeartbeatSend = now;
    35         this.heartbeatFailed = false;
    36     }
    37 
    38     public void failHeartbeat() {
    39         this.heartbeatFailed = true;
    40     }
    41 
    42     public void receiveHeartbeat(long now) {
    43         this.lastHeartbeatReceive = now;
    44     }
    45 
    46     public boolean shouldHeartbeat(long now) {
    47         return timeToNextHeartbeat(now) == 0;
    48     }
    49     
    50     public long lastHeartbeatSend() {
    51         return this.lastHeartbeatSend;
    52     }
    53 
    54     public long timeToNextHeartbeat(long now) {
    55         long timeSinceLastHeartbeat = now - Math.max(lastHeartbeatSend, lastSessionReset);
    56         final long delayToNextHeartbeat;
    57         if (heartbeatFailed)
    58             delayToNextHeartbeat = retryBackoffMs;
    59         else
    60             delayToNextHeartbeat = heartbeatInterval;
    61 
    62         if (timeSinceLastHeartbeat > delayToNextHeartbeat)
    63             return 0;
    64         else
    65             return delayToNextHeartbeat - timeSinceLastHeartbeat;
    66     }
    67 
    68     public boolean sessionTimeoutExpired(long now) {
    69         return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeout;
    70     }
    71 
    72     public long interval() {
    73         return heartbeatInterval;
    74     }
    75 
    76     public void resetTimeouts(long now) {
    77         this.lastSessionReset = now;
    78         this.lastPoll = now;
    79         this.heartbeatFailed = false;
    80     }
    81 
    82     public boolean pollTimeoutExpired(long now) {
    83         return now - lastPoll > maxPollInterval;
    84     }
    85 
    86 }
    View Code

    join group 的处理逻辑:kafka.coordinator.group.GroupCoordinator#onCompleteJoin

  • 相关阅读:
    Linux 配置 SSL 证书
    freemarker 异常处理
    StringTemplateLoader的用法
    序列的重点知识小结
    Linux下安装lrzsz上传下载工具
    ajax技术
    Response对象介绍(服务器到客户端)
    Request对象介绍(客户端到服务器)
    JSP--内置对象&动作标签介绍
    JSP--常用指令
  • 原文地址:https://www.cnblogs.com/allenwas3/p/10278998.html
Copyright © 2011-2022 走看看