zoukankan      html  css  js  c++  java
  • Kafka – kafka consumer

    ConsumerRecords<String, String> records = consumer.poll(100); 

     

    /**
         * Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have
         * subscribed to any topics or partitions before polling for data.
         * <p>
         * On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last
         * consumed offset can be manually set through {@link #seek(TopicPartition, long)} or automatically set as the last committed
         * offset for the subscribed list of partitions
         *
         *
         * @param timeout The time, in milliseconds, spent waiting in poll if data is not available in the buffer.
         *            If 0, returns immediately with any records that are available currently in the buffer, else returns empty.
         *            Must not be negative.
         * @return map of topic to records since the last fetch for the subscribed list of topics and partitions
         *
         * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if the offset for a partition or set of
         *             partitions is undefined or out of range and no offset reset policy has been configured
         * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
         *             function is called
         * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
         *             this function is called
         * @throws org.apache.kafka.common.errors.AuthorizationException if caller lacks Read access to any of the subscribed
         *             topics or to the configured groupId
         * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. invalid groupId or
         *             session timeout, errors deserializing key/value pairs, or any new error cases in future versions)
         * @throws java.lang.IllegalArgumentException if the timeout value is negative
         * @throws java.lang.IllegalStateException if the consumer is not subscribed to any topics or manually assigned any
         *             partitions to consume from
         */
        @Override
        public ConsumerRecords<K, V> poll(long timeout) {
            // NOTE(review): release() in the finally block has no matching acquire call visible in this
            // excerpt; confirm against the full source that the consumer is acquired before this try.
            try {
                if (timeout < 0)
                    throw new IllegalArgumentException("Timeout must not be negative");
    
                if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
                    throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
    
                // poll for new data until the timeout expires
                long start = time.milliseconds();
                long remaining = timeout;
                do {
                    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining); // one round of polling; the result may be empty
                    if (!records.isEmpty()) {
                        // before returning the fetched records, we can send off the next round of fetches
                        // and avoid block waiting for their responses to enable pipelining while the user
                        // is handling the fetched records.
                        //
                        // NOTE: since the consumed position has already been updated, we must not allow
                        // wakeups or any other errors to be triggered prior to returning the fetched records.
                        if (fetcher.sendFetches() > 0) { // to save time, queue the next round of fetches up front
                            client.pollNoWakeup();
                        }
    
                        if (this.interceptors == null)
                            return new ConsumerRecords<>(records);
                        else
                            return this.interceptors.onConsume(new ConsumerRecords<>(records)); // when interceptors are configured, let them process the records first
                    }
    
                    long elapsed = time.milliseconds() - start;
                    remaining = timeout - elapsed; // keep retrying pollOnce while still within the timeout
                } while (remaining > 0);
    
                return ConsumerRecords.empty(); // no data became ready before the timeout: return an empty result
            } finally {
                release();
            }
        }

     

    pollOnce

    /**
         * Do one round of polling. In addition to checking for new data, this does any needed offset commits
         * (if auto-commit is enabled), and offset resets (if an offset reset policy is defined).
         * @param timeout The maximum time to block in the underlying call to {@link ConsumerNetworkClient#poll(long)}.
         * @return The fetched records (may be empty)
         */
        private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
            coordinator.poll(time.milliseconds()); // heartbeat / coordination with the ConsumerCoordinator
    
            // fetch positions if we have partitions we're subscribed to that we
            // don't know the offset for
            if (!subscriptions.hasAllFetchPositions())
                updateFetchPositions(this.subscriptions.missingFetchPositions()); // sync the missing offsets
    
            // if data is available already, return it immediately
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); // records from fetches that have already completed
            if (!records.isEmpty())
                return records; // return them directly without touching the network
    
            // send any new fetches (won't resend pending fetches)
            fetcher.sendFetches(); // no data ready yet, so issue fetch requests
    
            long now = time.milliseconds();
            // never block past the coordinator's next scheduled poll or past the caller's timeout
            long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
    
            client.poll(pollTimeout, now, new PollCondition() {
                @Override
                public boolean shouldBlock() {
                    // since a fetch might be completed by the background thread, we need this poll condition
                    // to ensure that we do not block unnecessarily in poll()
                    return !fetcher.hasCompletedFetches();
                }
            });
    
            // after the long poll, we should check whether the group needs to rebalance
            // prior to returning data so that the group can stabilize faster
            if (coordinator.needRejoin())
                return Collections.emptyMap();
    
            return fetcher.fetchedRecords();
        }

     

    看下fetcher

    public Fetcher(ConsumerNetworkClient client,
                       int minBytes,
                       int maxBytes,
                       int maxWaitMs,
                       int fetchSize,
                       int maxPollRecords,
                       boolean checkCrcs,
                       Deserializer<K> keyDeserializer,
                       Deserializer<V> valueDeserializer,
                       Metadata metadata,
                       SubscriptionState subscriptions,
                       Metrics metrics,
                       String metricGrpPrefix,
                       Time time,
                       long retryBackoffMs) {

    创建时,

    // Builds the Fetcher; the trailing notes name the Fetcher constructor parameter each argument fills
    // (matched by position against the constructor's parameter list).
    this.fetcher = new Fetcher<>(this.client,
                        config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG), // minBytes
                        config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), // maxBytes
                        config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), // maxWaitMs
                        config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), // fetchSize
                        config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), // maxPollRecords
                        config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG), // checkCrcs
                        this.keyDeserializer,
                        this.valueDeserializer,
                        this.metadata,
                        this.subscriptions,
                        metrics,
                        metricGrpPrefix,
                        this.time,
                        this.retryBackoffMs);
    可以看出对应的配置

     

    fetcher.fetchedRecords

    /**
         * Return the fetched records, empty the record buffer and update the consumed position.
         *
         * NOTE: returning empty records guarantees the consumed position are NOT updated.
         *
         * @return The fetched records per partition
         * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
         *         the defaultResetPolicy is NONE
         */
        public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
            Map<TopicPartition, List<ConsumerRecord<K, V>>> result = new HashMap<>();
            // never hand back more than maxPollRecords records from a single call
            int budget = maxPollRecords;

            while (budget > 0) {
                boolean needNextFetch = nextInLineRecords == null || nextInLineRecords.isDrained();
                if (needNextFetch) {
                    // nothing left in the current batch: pull the next completed fetch off the queue
                    CompletedFetch nextFetch = completedFetches.poll();
                    if (nextFetch == null)
                        break;

                    nextInLineRecords = parseFetchedData(nextFetch);
                    continue;
                }

                TopicPartition tp = nextInLineRecords.partition;
                // take at most `budget` records from the batch currently in line
                List<ConsumerRecord<K, V>> batch = drainRecords(nextInLineRecords, budget);
                if (batch.isEmpty())
                    continue;

                List<ConsumerRecord<K, V>> existing = result.get(tp);
                if (existing == null) {
                    result.put(tp, batch);
                } else {
                    // this case shouldn't usually happen because we only send one fetch at a time per partition,
                    // but it might conceivably happen in some rare cases (such as partition leader changes).
                    // we have to copy to a new list because the old one may be immutable
                    List<ConsumerRecord<K, V>> merged = new ArrayList<>(batch.size() + existing.size());
                    merged.addAll(existing);
                    merged.addAll(batch);
                    result.put(tp, merged);
                }
                budget -= batch.size();
            }

            return result;
        }

    可以看到fetchedRecords只是从已经完成的fetch中读取数据

     

    fetcher.sendFetches

    先看

    createFetchRequests
    /**
         * Create fetch requests for all nodes for which we have assigned partitions
         * that have no existing requests in flight.
         */
        private Map<Node, FetchRequest> createFetchRequests() {
            // group the per-partition fetch info by the node that leads each partition
            Cluster currentCluster = metadata.fetch();
            Map<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> perNode = new LinkedHashMap<>();
            for (TopicPartition tp : fetchablePartitions()) {
                Node leader = currentCluster.leaderFor(tp);
                if (leader == null) {
                    // leader unknown: ask for a metadata refresh and skip this partition for now
                    metadata.requestUpdate();
                    continue;
                }

                // the check is per node: any pending request to the leader postpones the fetch
                if (this.client.pendingRequestCount(leader) != 0) {
                    log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", tp, leader);
                    continue;
                }

                // there is a leader and no in-flight requests, so add this partition to the node's fetch
                LinkedHashMap<TopicPartition, FetchRequest.PartitionData> nodeFetch = perNode.get(leader);
                if (nodeFetch == null) {
                    nodeFetch = new LinkedHashMap<>();
                    perNode.put(leader, nodeFetch);
                }

                // position = offset to start reading from; fetchSize = how much to read for this partition
                long fetchOffset = this.subscriptions.position(tp);
                nodeFetch.put(tp, new FetchRequest.PartitionData(fetchOffset, this.fetchSize));
                log.trace("Added fetch request for partition {} at offset {}", tp, fetchOffset);
            }

            // wrap each node's partition map in a single FetchRequest
            Map<Node, FetchRequest> built = new HashMap<>();
            for (Map.Entry<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> nodeEntry : perNode.entrySet()) {
                FetchRequest nodeRequest = new FetchRequest(this.maxWaitMs, this.minBytes, this.maxBytes, nodeEntry.getValue());
                built.put(nodeEntry.getKey(), nodeRequest);
            }
            return built;
        }

     

       /**
         * Set-up a fetch request for any node that we have assigned partitions for which doesn't already have
         * an in-flight fetch or pending fetch data.
         * @return number of fetches sent
         */
        public int sendFetches() {
            Map<Node, FetchRequest> fetchRequestMap = createFetchRequests(); // build one fetch request per target node
            for (Map.Entry<Node, FetchRequest> fetchEntry : fetchRequestMap.entrySet()) {
                final FetchRequest request = fetchEntry.getValue();
                final Node fetchTarget = fetchEntry.getKey();
    
                client.send(fetchTarget, ApiKeys.FETCH, request) // hand the request to the client and get its future
                        .addListener(new RequestFutureListener<ClientResponse>() {
                            @Override
                            public void onSuccess(ClientResponse resp) { // invoked when the fetch succeeds
                                FetchResponse response = (FetchResponse) resp.responseBody();
                                // NOTE(review): `partitions` is never read in this excerpt and `metricAggregator`
                                // (used below) is not declared here; presumably a line that builds the
                                // aggregator from `partitions` was dropped. Confirm against the full source.
                                Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
    
                                for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                                    TopicPartition partition = entry.getKey();
                                    // the offset that was requested for this partition
                                    long fetchOffset = request.fetchData().get(partition).offset;
                                    FetchResponse.PartitionData fetchData = entry.getValue();
                                    completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator)); // wrap fetchData in a CompletedFetch and append it to completedFetches
                                }
    
                                sensors.fetchLatency.record(resp.requestLatencyMs());
                                sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
                            }
    
                            @Override
                            public void onFailure(RuntimeException e) {
                                // failures are only logged here; nothing is added to completedFetches
                                log.debug("Fetch request to {} failed", fetchTarget, e);
                            }
                        });
            }
            return fetchRequestMap.size();
        }

     

    client.send

    ConsumerNetworkClient
    /**
         * Send a new request. Note that the request is not actually transmitted on the
         * network until one of the {@link #poll(long)} variants is invoked. At this
         * point the request will either be transmitted successfully or will fail.
         * Use the returned future to obtain the result of the send. Note that there is no
         * need to check for disconnects explicitly on the {@link ClientResponse} object;
         * instead, the future will be failed with a {@link DisconnectException}.
         * @param node The destination of the request
         * @param api The Kafka API call
         * @param request The request payload
         * @return A future which indicates the result of the send.
         */
        public RequestFuture<ClientResponse> send(Node node,
                                                  ApiKeys api,
                                                  AbstractRequest request) {
            // use the latest version that ProtoUtils reports for this API
            short latestVersion = ProtoUtils.latestVersion(api.id);
            return send(node, api, latestVersion, request);
        }
    
        private RequestFuture<ClientResponse> send(Node node,
                                                   ApiKeys api,
                                                   short version,
                                                   AbstractRequest request) {
            long createdMs = time.milliseconds();
            RequestFutureCompletionHandler handler = new RequestFutureCompletionHandler();
            RequestHeader requestHeader = client.nextRequestHeader(api, version);

            // wrap everything in a ClientRequest and queue it; nothing is transmitted here
            ClientRequest queued = new ClientRequest(node.idString(), createdMs, true, requestHeader, request, handler);
            put(node, queued);

            // wakeup the client in case it is blocking in poll so that we can send the queued request
            client.wakeup();

            return handler.future;
        }
    
        private synchronized void put(Node node, ClientRequest request) {
            // append to the node's list of unsent requests, creating the list on first use
            List<ClientRequest> pendingForNode = unsent.get(node);
            if (pendingForNode == null) {
                pendingForNode = new ArrayList<>();
                unsent.put(node, pendingForNode);
            }
            pendingForNode.add(request);
        }

     

    NetworkClient.wakeup

    /**
         * Interrupt the client if it is blocked waiting on I/O.
         */
        @Override
        public void wakeup() {
            // delegate straight to the selector's wakeup
            selector.wakeup();
        }

    wakeup就是让client从selector的block等待中,被唤醒,可以处理其他的请求

     

    这里说了,只有当poll被调用的时候,才会真正的将request发送出去,poll是在哪儿被调用的?

     

    在上面pollOnce的时候,有这样的逻辑

            // Poll the network client with pollTimeout as the upper bound; the condition below decides
            // whether the client should actually block.
            client.poll(pollTimeout, now, new PollCondition() {
                @Override
                public boolean shouldBlock() {
                    // since a fetch might be completed by the background thread, we need this poll condition
                    // to ensure that we do not block unnecessarily in poll()
                    return !fetcher.hasCompletedFetches();
                }
            });

    意思是调用poll的超时是pollTimeout,
    PollCondition.shouldBlock,意思是何时我们需要block等待,当hasCompletedFetches时,是不需要等数据的,所以只有当没有现成的数据的时候,才需要等

     

    ConsumerNetworkClient.poll

    /**
         * Poll for any network IO.
         * @param timeout timeout in milliseconds
         * @param now current time in milliseconds
         */
        public void poll(long timeout, long now, PollCondition pollCondition) {
            // there may be handlers which need to be invoked if we woke up the previous call to poll
            firePendingCompletedRequests();
    
            synchronized (this) {
                // send all the requests we can send now
                trySend(now);
    
                // check whether the poll is still needed by the caller. Note that if the expected completion
                // condition becomes satisfied after the call to shouldBlock() (because of a fired completion
                // handler), the client will be woken up.
                if (pollCondition == null || pollCondition.shouldBlock()) {
                    // if there are no requests in flight, do not block longer than the retry backoff
                    if (client.inFlightRequestCount() == 0)
                        timeout = Math.min(timeout, retryBackoffMs);
                    // the blocking poll is additionally capped at MAX_POLL_TIMEOUT_MS
                    client.poll(Math.min(MAX_POLL_TIMEOUT_MS, timeout), now);
                    // re-read the clock after the poll so the steps below see the current time
                    now = time.milliseconds();
                } else {
                    // the condition says not to block: poll with a zero timeout
                    client.poll(0, now);
                }
    
                // handle any disconnects by failing the active requests. note that disconnects must
                // be checked immediately following poll since any subsequent call to client.ready()
                // will reset the disconnect status
                checkDisconnects(now);
    
                // trigger wakeups after checking for disconnects so that the callbacks will be ready
                // to be fired on the next call to poll()
                maybeTriggerWakeup();
                
                // throw InterruptException if this thread is interrupted
                maybeThrowInterruptException();
    
                // try again to send requests since buffer space may have been
                // cleared or a connect finished in the poll
                trySend(now);
    
                // fail requests that couldn't be sent if they have expired
                failExpiredRequests(now);
            }
    
            // called without the lock to avoid deadlock potential if handlers need to acquire locks
            firePendingCompletedRequests();
        }

     

    trySend

    private boolean trySend(long now) {
            // walk every request that send() queued in `unsent`, grouped by destination node
            boolean sentAny = false;
            for (Map.Entry<Node, List<ClientRequest>> perNode : unsent.entrySet()) {
                Node target = perNode.getKey();
                for (Iterator<ClientRequest> pending = perNode.getValue().iterator(); pending.hasNext(); ) {
                    ClientRequest queued = pending.next();
                    // ready() begins connecting to the node and returns true only when it can be sent to
                    if (!client.ready(target, now))
                        continue; // not ready yet: leave the request queued

                    client.send(queued, now);
                    pending.remove();
                    sentAny = true;
                }
            }
            return sentAny;
        }

     

    NetworkClient.send

    /**
         * Queue up the given request for sending. Requests can only be sent out to ready nodes.
         * @param request The request
         * @param now The current timestamp
         */
        @Override
        public void send(ClientRequest request, long now) {
            // requests arriving through this entry point are not flagged as internal
            final boolean internalRequest = false;
            doSend(request, internalRequest, now);
        }

     

    private void doSend(ClientRequest request, boolean isInternalRequest, long now) {
            String nodeId = request.destination();

            // API_VERSIONS requests have their own readiness check; everything else uses the general one
            boolean apiVersionsRequest = request.header().apiKey() == ApiKeys.API_VERSIONS.id;
            boolean sendable = apiVersionsRequest ? canSendApiVersionsRequest(nodeId) : canSendRequest(nodeId);
            if (!sendable) {
                throw new IllegalStateException(apiVersionsRequest
                        ? "Attempt to send API Versions request to node " + nodeId + " which is not ready."
                        : "Attempt to send a request to node " + nodeId + " which is not ready.");
            }

            Send send = request.body().toSend(nodeId, request.header());
            InFlightRequest inFlight = new InFlightRequest(
                    request.header(),
                    request.createdTimeMs(),
                    request.destination(),
                    request.callback(),
                    request.expectResponse(),
                    isInternalRequest,
                    send,
                    now);

            // track the request as in flight, then pass its Send to the selector
            this.inFlightRequests.add(inFlight);
            selector.send(inFlight.send);
        }

    最终用selector.send来发送Send

    /**
         * Queue the given request for sending in the subsequent {@link #poll(long)} calls
         * @param send The request to send
         */
        public void send(Send send) {
            String connectionId = send.destination();

            // a connection that is already closing cannot take new sends: record the failure and stop
            if (closingChannels.containsKey(connectionId)) {
                this.failedSends.add(connectionId);
                return;
            }

            // look up the channel registered for this connection id
            KafkaChannel target = channelOrFail(connectionId, false);
            try {
                target.setSend(send);
            } catch (CancelledKeyException e) {
                this.failedSends.add(connectionId);
                close(target, false);
            }
        }

    KafkaChannel.setSend

    public void setSend(Send send) {
            // only one send may be pending on the channel at a time
            boolean sendInProgress = this.send != null;
            if (sendInProgress) {
                throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
            }

            this.send = send;
            // register interest in OP_WRITE on the transport layer
            this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
        }

    可以看到selector.send也只是把send放到channel中,

    真正发送要等到调用NetworkClient.poll

    在ConsumerNetworkClient.poll中,

                if (pollCondition == null || pollCondition.shouldBlock()) {
                    // if there are no requests in flight, do not block longer than the retry backoff
                    if (client.inFlightRequestCount() == 0)
                        timeout = Math.min(timeout, retryBackoffMs);
                    // the blocking poll is additionally capped at MAX_POLL_TIMEOUT_MS
                    client.poll(Math.min(MAX_POLL_TIMEOUT_MS, timeout), now);
                    now = time.milliseconds(); // re-read the clock after the poll
                } else {
                    // the condition says not to block: poll with a zero timeout
                    client.poll(0, now);
                }

    如果需要block或没有pollCondition,选择block timeout来等待数据

    否则调用client.poll(0, now),意思是没有数据即刻返回

    Selector.poll(由NetworkClient.poll调用;下面的代码是Selector中的实现)

    /**
     * Runs one select pass with the given timeout, processes the selected and the immediately-connected
     * keys, and records the time spent in select and in I/O.
     *
     * @param timeout upper bound for the select call; must not be negative
     * @throws IllegalArgumentException if {@code timeout} is negative
     */
    @Override
        public void poll(long timeout) throws IOException {
            if (timeout < 0)
                throw new IllegalArgumentException("timeout should be >= 0");
    
            clear();
    
            // work is already waiting (staged receives or immediately-connected keys), so do not block in select
            if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
                timeout = 0;
    
            /* check ready keys */
            long startSelect = time.nanoseconds();
            int readyKeys = select(timeout);
            long endSelect = time.nanoseconds();
            // time spent inside select, in nanoseconds
            this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
    
            if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
                pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
                pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
            }
    
            addToCompletedReceives();
    
            long endIo = time.nanoseconds();
            // time spent processing keys after select returned, in nanoseconds
            this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
    
            // we use the time at the end of select to ensure that we don't close any connections that
            // have just been processed in pollSelectionKeys
            maybeCloseOldestConnection(endSelect);
        }

     

    select

        private int select(long ms) throws IOException {
            if (ms < 0L) {
                throw new IllegalArgumentException("timeout should be >= 0");
            }

            // zero means "do not wait": use selectNow(); otherwise select with the given timeout
            return ms == 0L ? this.nioSelector.selectNow() : this.nioSelector.select(ms);
        }

     

    pollSelectionKeys

        /**
         * Processes each selection key once: completes pending connections, finishes channel preparation,
         * reads any available receives and writes the channel's pending send.
         * <p>
         * A failure on one channel is logged and that channel is closed; the remaining keys are still
         * processed. Previously the exception was swallowed silently and the broken channel stayed open.
         *
         * @param selectionKeys the keys to process; each key is removed from the iterable as it is handled
         * @param isImmediatelyConnected true when the keys belong to channels that connected immediately
         * @param currentTimeNanos time at the end of select, in nanoseconds (not used in this excerpt)
         */
        private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                                       boolean isImmediatelyConnected,
                                       long currentTimeNanos) {
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                KafkaChannel channel = channel(key);

                try {

                    /* complete any connections that have finished their handshake (either normally or immediately) */
                    if (isImmediatelyConnected || key.isConnectable()) {
                        if (channel.finishConnect()) {
                            this.connected.add(channel.id());
                            this.sensors.connectionCreated.record();
                            SocketChannel socketChannel = (SocketChannel) key.channel();
                            log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
                                    socketChannel.socket().getReceiveBufferSize(),
                                    socketChannel.socket().getSendBufferSize(),
                                    socketChannel.socket().getSoTimeout(),
                                    channel.id());
                        } else
                            continue;
                    }

                    /* if channel is not ready finish prepare */
                    if (channel.isConnected() && !channel.ready())
                        channel.prepare();

                    /* if channel is ready read from any connections that have readable data */
                    if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                        NetworkReceive networkReceive;
                        while ((networkReceive = channel.read()) != null)
                            addToStagedReceives(channel, networkReceive);
                    }

                    /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
                    if (channel.ready() && key.isWritable()) {
                        Send send = channel.write(); // this is where the bytes are actually written out
                        if (send != null) {
                            this.completedSends.add(send);
                        }
                    }

                    /* cancel any defunct sockets */
                    if (!key.isValid())
                        close(channel, true);

                } catch (Exception e) {
                    // never swallow the failure: an I/O error normally means the peer went away, anything
                    // else is unexpected. Either way the channel is unusable, so log it and close it.
                    if (e instanceof IOException)
                        log.debug("Connection with {} disconnected", channel.id(), e);
                    else
                        log.warn("Unexpected error from {}; closing connection", channel.id(), e);
                    close(channel, true);
                }
            }
        }

     

    直接用NIO写应用,是需要勇气的

  • 相关阅读:
    redis 储存对象
    redis key 查看器
    c# 控制台程序编写RabbitMQ 生产者
    C# 使用Topshelf 构建 基于 window 服务的 RabbitMQ消费端
    asp.net webapi 使用定时任务Hangfire
    asp.net webpi 中使用 ClientHelper 发起HTTP请求
    SQL Server 导入和导出向导 未在本地计算机上注册Mircrosoft.ACE.OLEDB.12.0 提供程序
    c# 使用Linq 表达式 对查询结果分组,保留价格最低的一条
    Asp.Net s请求报传输流收到意外的 EOF 或 0 个字节
    asp.net webapi 中使用rdlc 报表
  • 原文地址:https://www.cnblogs.com/fxjwind/p/6495818.html
Copyright © 2011-2022 走看看