zoukankan      html  css  js  c++  java
  • kafka 消费者拉取消息

    本文只跟踪消费者拉取消息的流程。对于 Java 客户端,Kafka 的生产者和消费者复用同一个网络 IO 类 NetworkClient。

    入口在 KafkaConsumer#pollOnce 中,抽出主要步骤:

    // Build the FetchRequest objects and put them into the unsent collection, where they wait to be sent
    fetcher.sendFetches();
    
    // Take the requests out of unsent, then call NetworkClient#send and NetworkClient#poll
    client.poll(pollTimeout, nowMs, new PollCondition() {
        @Override
        public boolean shouldBlock() {
            // since a fetch might be completed by the background thread, we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasCompletedFetches();
        }
    });
    
    // Hand the fetched data back to the user
    return fetcher.fetchedRecords();

    Fetcher#sendFetches

    /**
     * Prepares one fetch request per target node and hands each of them to the consumer network client.
     *
     * For every entry returned by {@code prepareFetchRequests()} a {@code FetchRequest.Builder} is created and
     * passed to {@code client.send}; a listener is attached to the returned future. On success the listener
     * lets the node's {@code FetchSessionHandler} process the response and then adds one {@code CompletedFetch}
     * per returned partition to {@code completedFetches}. On failure the exception is forwarded to the node's
     * {@code FetchSessionHandler}, if one exists. Both callbacks run while holding the lock on this Fetcher.
     *
     * @return the number of entries in the prepared request map (one entry per target node)
     */
    public synchronized int sendFetches() {
        // Build the fetch requests: which node, which partitions, and from which offset to fetch
        Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
        for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
            final Node fetchTarget = entry.getKey();
            final FetchSessionHandler.FetchRequestData data = entry.getValue();
            // 1. Build the FetchRequest object through its Builder
            final FetchRequest.Builder request = FetchRequest.Builder
                    .forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
                    .isolationLevel(isolationLevel)
                    .setMaxBytes(this.maxBytes)
                    .metadata(data.metadata())
                    .toForget(data.toForget());
            if (log.isDebugEnabled()) {
                log.debug("Sending {} {} to broker {}", isolationLevel, data.toString(), fetchTarget);
            }
    
            
            client.send(fetchTarget, request)
                    // 4. Attach a RequestFutureListener to RequestFutureCompletionHandler.future
                    .addListener(new RequestFutureListener<ClientResponse>() {
                        @Override
                        public void onSuccess(ClientResponse resp) {
                            synchronized (Fetcher.this) {
                                FetchResponse response = (FetchResponse) resp.responseBody();
                                FetchSessionHandler handler = sessionHandler(fetchTarget.id());
                                if (handler == null) {
                                    log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
                                        fetchTarget.id());
                                    return;
                                }
                                // The response is dropped when the session handler rejects it
                                if (!handler.handleResponse(response)) {
                                    return;
                                }
    
                                Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
                                FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
    
                                for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                                    TopicPartition partition = entry.getKey();
                                    // NOTE(review): get(partition) is dereferenced without a null check; presumably
                                    // handleResponse() above guarantees the partition is part of the session - confirm
                                    long fetchOffset = data.sessionPartitions().get(partition).fetchOffset;
                                    FetchResponse.PartitionData fetchData = entry.getValue();
    
                                    log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
                                            isolationLevel, fetchOffset, partition, fetchData);
                                    // 10. Put the data into completedFetches; it is eventually returned to the user
                                    completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
                                            resp.requestHeader().apiVersion()));
                                }
    
                                sensors.fetchLatency.record(resp.requestLatencyMs());
                            }
                        }
    
                        @Override
                        public void onFailure(RuntimeException e) {
                            synchronized (Fetcher.this) {
                                FetchSessionHandler handler = sessionHandler(fetchTarget.id());
                                if (handler != null) {
                                    handler.handleError(e);
                                }
                            }
                        }
                    });
        }
        return fetchRequestMap.size();
    }

    ConsumerNetworkClient#send

    /**
     * Queues a request for the given node and returns the future tied to its completion handler.
     *
     * Nothing is written to the network here: the request is only stored in {@code unsent} and the
     * underlying client is woken up.
     *
     * @param node the node the request is destined for
     * @param requestBuilder builder of the request to queue
     * @return the future held by the completion handler created for this request
     */
    public RequestFuture<ClientResponse> send(Node node, AbstractRequest.Builder<?> requestBuilder) {
        final long createdAtMs = time.milliseconds();
        // 2. A RequestFutureCompletionHandler serves as the callback of the request
        final RequestFutureCompletionHandler handler = new RequestFutureCompletionHandler();
        // 3. The request goes into the unsent collection
        unsent.put(node, client.newClientRequest(node.idString(), requestBuilder, createdAtMs, true, handler));

        // wake the client up in case it is blocked in poll, so that the queued request can be sent
        client.wakeup();
        return handler.future;
    }

    ConsumerNetworkClient#poll

    // 5. Send the requests held in unsent; no network IO happens at this point
    trySend(now);
    
    // The actual network writes and reads
    // 6. Send the requests
    // 7. Receive the responses
    // 8. Trigger the RequestFutureCompletionHandler callback
    client.poll(0, now);
    
    // 9. Trigger the callbacks registered in RequestFutureListener
    firePendingCompletedRequests();

    NetworkClient#handleCompletedReceives

    /**
     * Turns every completed receive reported by the selector into a parsed response.
     *
     * Internal metadata and api-versions responses are dispatched to their dedicated handlers; every
     * other response is appended to {@code responses}.
     *
     * @param responses the list that collects the completed client responses
     * @param now the current time in milliseconds
     */
    private void handleCompletedReceives(List<ClientResponse> responses, long now) {
        for (NetworkReceive completedReceive : this.selector.completedReceives()) {
            String nodeId = completedReceive.source();
            InFlightRequest inFlight = inFlightRequests.completeNext(nodeId);
            Struct parsedStruct = parseStructMaybeUpdateThrottleTimeMetrics(completedReceive.payload(), inFlight.header,
                throttleTimeSensor, now);
            if (log.isTraceEnabled()) {
                log.trace("Completed receive from node {} for {} with correlation id {}, received {}", inFlight.destination,
                    inFlight.header.apiKey(), inFlight.header.correlationId(), parsedStruct);
            }
            AbstractResponse parsedBody = AbstractResponse.parseResponse(inFlight.header.apiKey(), parsedStruct);

            if (inFlight.isInternalRequest && parsedBody instanceof MetadataResponse) {
                metadataUpdater.handleCompletedMetadataResponse(inFlight.header, now, (MetadataResponse) parsedBody);
            } else if (inFlight.isInternalRequest && parsedBody instanceof ApiVersionsResponse) {
                handleApiVersionsResponse(responses, inFlight, now, (ApiVersionsResponse) parsedBody);
            } else {
                // This is where elements get added to responses.
                // completed(...) amounts to:
                //   return new ClientResponse(header, callback, destination, createdTimeMs, timeMs, false, null, response);
                // i.e. the callback of the request is handed straight to the response.
                // For a producer send, the callback is the one the user passed in as an argument.
                // For a consumer fetch, the callback is the RequestFutureCompletionHandler
                // assigned in ConsumerNetworkClient#send.
                responses.add(inFlight.completed(parsedBody, now));
            }
        }
    }

    NetworkClient#completeResponses

    /**
     * Invokes the completion hook of every collected response.
     *
     * An exception thrown by one response is logged and does not prevent the remaining
     * responses from being completed.
     *
     * @param responses the responses whose completion hooks are invoked
     */
    private void completeResponses(List<ClientResponse> responses) {
        for (ClientResponse completed : responses) {
            try {
                // per the walkthrough, ClientResponse#onComplete amounts to: callback.onComplete(this)
                completed.onComplete();
            } catch (Exception failure) {
                log.error("Uncaught error in request completion:", failure);
            }
        }
    }

    RequestFutureCompletionHandler#onComplete

    /**
     * Stores the response on this handler and then adds the handler to {@code pendingCompletion}.
     *
     * @param response the response received for the request this handler belongs to
     */
    public void onComplete(ClientResponse response) {
        // Store the response before queueing, so it is already set when the handler is taken from the queue
        this.response = response;
        pendingCompletion.add(this);
    }

    ConsumerNetworkClient#firePendingCompletedRequests

    /**
     * Drains {@code pendingCompletion}, firing the completion of every handler found in it,
     * and wakes the client up when at least one handler was fired.
     */
    private void firePendingCompletedRequests() {
        boolean firedAny = false;
        RequestFutureCompletionHandler pending;
        while ((pending = pendingCompletion.poll()) != null) {
            pending.fireCompletion();
            firedAny = true;
        }

        // wake the client up in case it is blocked in poll waiting for one of these futures to complete
        if (firedAny) {
            client.wakeup();
        }
    }

    ConsumerNetworkClient.RequestFutureCompletionHandler#fireCompletion

    /**
     * Resolves the future of this handler from the recorded outcome.
     *
     * The checks are made in this order: a recorded exception, a disconnected response,
     * a version mismatch; if none applies the future is completed with the response.
     */
    public void fireCompletion() {
        if (e != null) {
            future.raise(e);
            return;
        }

        if (response.wasDisconnected()) {
            RequestHeader header = response.requestHeader();
            int correlationId = header.correlationId();
            log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
                    header.apiKey(), header, correlationId, response.destination());
            future.raise(DisconnectException.INSTANCE);
            return;
        }

        if (response.versionMismatch() != null) {
            future.raise(response.versionMismatch());
            return;
        }

        future.complete(response);
    }

    RequestFuture#complete

    /**
     * Completes this future successfully with the given value and notifies the success listeners.
     *
     * The completion latch is counted down in every case, including when an exception is thrown.
     *
     * @param value the value to complete the future with
     * @throws IllegalArgumentException if the value is an instance of RuntimeException
     * @throws IllegalStateException if the future has already been completed
     */
    public void complete(T value) {
        try {
            if (value instanceof RuntimeException) {
                throw new IllegalArgumentException("The argument to complete can not be an instance of RuntimeException");
            }

            boolean firstCompletion = result.compareAndSet(INCOMPLETE_SENTINEL, value);
            if (!firstCompletion) {
                throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
            }

            fireSuccess();
        } finally {
            completedLatch.countDown();
        }
    }
    
    /**
     * Drains the listener queue, passing the completed value to each listener's success callback.
     */
    private void fireSuccess() {
        T completedValue = value();
        for (RequestFutureListener<T> next = listeners.poll(); next != null; next = listeners.poll()) {
            // this is where the RequestFutureListener finally gets invoked
            next.onSuccess(completedValue);
        }
    }

    如果不考虑心跳线程,consumer 第一次 poll 通常不会返回数据:此时请求刚刚发出去,响应还没有回来;要等到后续的 poll 读到响应之后,数据才会返回给用户。

    跟完之后,个人觉得调用链还是挺长的。一点感受:全程只有一个线程,但是每次走的分支都不一样。给人的启发就是,单线程只要不阻塞等待,速度也可以很快。

  • 相关阅读:
    【python笔记】类
    【Marva Collins' Way】第八章
    【Marva Collins' Way】第七章
    【python笔记】包
    【python笔记】模块
    【Marva Collins' Way】第六章
    【Marva Collins' Way】第九章
    【python笔记】异常
    Axios跨域&封装接口
    js更新数据
  • 原文地址:https://www.cnblogs.com/allenwas3/p/11617514.html
Copyright © 2011-2022 走看看