zoukankan      html  css  js  c++  java
  • 消费者心跳线程

    kafka broker 对消费者进行组管理,需要知道消费者是否活着,因此客户端有心跳线程发送心跳。

    3 个相关的参数:

    session.timeout.ms
    heartbeat.interval.ms
    max.poll.interval.ms

    AbstractCoordinator.HeartbeatThread

    @Override
    public void run() {
        try {
            log.debug("Heartbeat thread started");
            while (true) {
                // 获取锁
                synchronized (AbstractCoordinator.this) {
                    if (closed)
                        return;
    
                    if (!enabled) {
                        // 若 enabled 为 false,释放锁,一直等待
                        AbstractCoordinator.this.wait();
                        continue;
                    }
    
                    if (state != MemberState.STABLE) {
                        // 置 enabled 为 false
                        disable();
                        continue;
                    }
    
                    // 触发网络 io
                    client.pollNoWakeup();
                    long now = time.milliseconds();
    
                    if (coordinatorUnknown()) {
                        if (findCoordinatorFuture != null || lookupCoordinator().failed())
                            AbstractCoordinator.this.wait(retryBackoffMs);
                    } else if (heartbeat.sessionTimeoutExpired(now)) {
                        // now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeout
                        markCoordinatorUnknown();
                    } else if (heartbeat.pollTimeoutExpired(now)) {
                        // now - lastPoll > maxPollInterval
                        // 主动离开组
                        maybeLeaveGroup();
                    } else if (!heartbeat.shouldHeartbeat(now)) {
                        AbstractCoordinator.this.wait(retryBackoffMs);
                    } else {
                        // 记录发送心跳的时间
                        heartbeat.sentHeartbeat(now);
    
                        sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
                            @Override
                            public void onSuccess(Void value) {
                                synchronized (AbstractCoordinator.this) {
                                    heartbeat.receiveHeartbeat(time.milliseconds());
                                }
                            }
    
                            @Override
                            public void onFailure(RuntimeException e) {
                                synchronized (AbstractCoordinator.this) {
                                    if (e instanceof RebalanceInProgressException) {
                                        // it is valid to continue heartbeating while the group is rebalancing. This
                                        // ensures that the coordinator keeps the member in the group for as long
                                        // as the duration of the rebalance timeout. If we stop sending heartbeats,
                                        // however, then the session timeout may expire before we can rejoin.
                                        heartbeat.receiveHeartbeat(time.milliseconds());
                                    } else {
                                        heartbeat.failHeartbeat();
    
                                        // wake up the thread if it's sleeping to reschedule the heartbeat
                                        AbstractCoordinator.this.notify();
                                    }
                                }
                            }
                        });
                    }
                }
            }
        } catch (AuthenticationException e) {
            log.error("An authentication error occurred in the heartbeat thread", e);
            this.failed.set(e);
        } catch (GroupAuthorizationException e) {
            log.error("A group authorization error occurred in the heartbeat thread", e);
            this.failed.set(e);
        } catch (InterruptedException | InterruptException e) {
            Thread.interrupted();
            log.error("Unexpected interrupt received in heartbeat thread", e);
            this.failed.set(new RuntimeException(e));
        } catch (Throwable e) {
            log.error("Heartbeat thread failed due to unexpected error", e);
            if (e instanceof RuntimeException)
                this.failed.set((RuntimeException) e);
            else
                this.failed.set(new RuntimeException(e));
        } finally {
            log.debug("Heartbeat thread has closed");
        }
    }

    心跳回调

    synchronized RequestFuture<Void> sendHeartbeatRequest() {
        log.debug("Sending Heartbeat request to coordinator {}", coordinator);
        HeartbeatRequest.Builder requestBuilder =
                new HeartbeatRequest.Builder(this.groupId, this.generation.generationId, this.generation.memberId);
        return client.send(coordinator, requestBuilder)
                .compose(new HeartbeatResponseHandler());
    }
    
    private class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
        @Override
        public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
            sensors.heartbeatLatency.record(response.requestLatencyMs());
            Errors error = heartbeatResponse.error();
            if (error == Errors.NONE) {
                log.debug("Received successful Heartbeat response");
                future.complete(null);
            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                    || error == Errors.NOT_COORDINATOR) {
                log.debug("Attempt to heartbeat since coordinator {} is either not started or not valid.",
                        coordinator());
                markCoordinatorUnknown();
                future.raise(error);
            } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                log.debug("Attempt to heartbeat failed since group is rebalancing");
                requestRejoin();
                future.raise(Errors.REBALANCE_IN_PROGRESS);
            } else if (error == Errors.ILLEGAL_GENERATION) {
                log.debug("Attempt to heartbeat failed since generation {} is not current", generation.generationId);
                resetGeneration();
                future.raise(Errors.ILLEGAL_GENERATION);
            } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                log.debug("Attempt to heartbeat failed for since member id {} is not valid.", generation.memberId);
                resetGeneration();
                future.raise(Errors.UNKNOWN_MEMBER_ID);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(groupId));
            } else {
                future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
            }
        }
    }

    如果返回正常,则记录接收心跳的时间,返回 Errors.REBALANCE_IN_PROGRESS 或 Errors.ILLEGAL_GENERATION 则需要重新加入组。

    消费者心跳和 fetch 类请求,用不同的连接

    // org.apache.kafka.clients.consumer.internals.AbstractCoordinator.FindCoordinatorResponseHandler#onSuccess
    @Override
    public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
        log.debug("Received FindCoordinator response {}", resp);
        clearFindCoordinatorFuture();
    
        FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody();
        Errors error = findCoordinatorResponse.error();
        if (error == Errors.NONE) {
            synchronized (AbstractCoordinator.this) {
                // 使用一个不同的 nodeId
                // use MAX_VALUE - node.id as the coordinator id to allow separate connections
                // for the coordinator in the underlying network client layer
                int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.node().id();
    
                AbstractCoordinator.this.coordinator = new Node(
                        coordinatorConnectionId,
                        findCoordinatorResponse.node().host(),
                        findCoordinatorResponse.node().port());
                log.info("Discovered group coordinator {}", coordinator);
                // 单独用一个连接发送心跳
                client.tryConnect(coordinator);
                heartbeat.resetTimeouts(time.milliseconds());
            }
            future.complete(null);
        } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
            future.raise(new GroupAuthorizationException(groupId));
        } else {
            log.debug("Group coordinator lookup failed: {}", error.message());
            future.raise(error);
        }
    }
    
    // org.apache.kafka.clients.consumer.internals.AbstractCoordinator#sendHeartbeatRequest
    synchronized RequestFuture<Void> sendHeartbeatRequest() {
        log.debug("Sending Heartbeat request to coordinator {}", coordinator);
        HeartbeatRequest.Builder requestBuilder =
                new HeartbeatRequest.Builder(this.groupId, this.generation.generationId, this.generation.memberId);
        // 正常的数据类请求,client.send 的第一个参数是 broker 的 id,即 server.properties 中的 broker.id
        // 而心跳类请求,和数据类型请求分开了,用另外的连接
        return client.send(coordinator, requestBuilder)
                .compose(new HeartbeatResponseHandler());
    }
  • 相关阅读:
    微信公众号接口配置
    OFBIZ:启动之ContainerLoader
    OFBIZ:启动之StartupLoader
    Capture a Screen Shot
    在 Windows 上安装Rabbit MQ 指南
    Quartz.NET管理周期性任务
    使用Topshelf创建Windows服务
    Redirecting Console.WriteLine() to Textbox
    Greenplum 备忘
    CockroachDB 备忘
  • 原文地址:https://www.cnblogs.com/allenwas3/p/11623624.html
Copyright © 2011-2022 走看看