  • Kafka consumer: auto-committing offsets

    org.apache.kafka.clients.consumer.KafkaConsumer#pollOnce

    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        client.maybeTriggerWakeup();
    
        long startMs = time.milliseconds();
        // auto-commit is triggered inside this call
        coordinator.poll(startMs, timeout);
    
        // Lookup positions of assigned partitions
        boolean hasAllFetchPositions = updateFetchPositions();
    
        // update the position for the records that were fetched
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty())
            return records;
    
        // send the fetch requests
        fetcher.sendFetches();
    
        long nowMs = time.milliseconds();
        long remainingTimeMs = Math.max(0, timeout - (nowMs - startMs));
        long pollTimeout = Math.min(coordinator.timeToNextPoll(nowMs), remainingTimeMs);
    
        // We do not want to be stuck blocking in poll if we are missing some positions
        // since the offset lookup may be backing off after a failure
        if (!hasAllFetchPositions && pollTimeout > retryBackoffMs)
            pollTimeout = retryBackoffMs;
    
        client.poll(pollTimeout, nowMs, new PollCondition() {
            @Override
            public boolean shouldBlock() {
                // since a fetch might be completed by the background thread, we need this poll condition
                // to ensure that we do not block unnecessarily in poll()
                return !fetcher.hasCompletedFetches();
            }
        });
    
        // after the long poll, we should check whether the group needs to rebalance
        // prior to returning data so that the group can stabilize faster
        if (coordinator.needRejoin())
            return Collections.emptyMap();
    
        return fetcher.fetchedRecords();
    }

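    Conclusion: after the consumer fetches messages, it updates the position it keeps locally. Before the next fetch, if the auto-commit interval has elapsed, coordinator.poll commits that position to the broker.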
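
    The ordering described above can be sketched with a toy model. This is only an illustration under stated assumptions, not Kafka's implementation: ToyConsumer and all of its members are hypothetical names, offsets are plain long values, and the "broker" is just a field. The 5000 ms interval mirrors the default of the real auto.commit.interval.ms setting, which applies when enable.auto.commit is true.

```java
import java.util.ArrayList;
import java.util.List;

public class AutoCommitSketch {

    /** Toy stand-in for a consumer. None of these names are Kafka APIs. */
    static class ToyConsumer {
        private final long autoCommitIntervalMs;
        private long nextAutoCommitDeadlineMs;
        private long position = 0;          // next offset to fetch, kept in memory
        private long committedOffset = -1;  // last offset sent to the "broker"; -1 means none yet

        ToyConsumer(long autoCommitIntervalMs, long nowMs) {
            this.autoCommitIntervalMs = autoCommitIntervalMs;
            this.nextAutoCommitDeadlineMs = nowMs + autoCommitIntervalMs;
        }

        /** One poll: maybe commit first, then hand out records and advance the position. */
        List<Long> poll(long nowMs, int maxRecords) {
            // Step 1 (plays the role of coordinator.poll): commit the position
            // saved by earlier polls if the auto-commit deadline has passed.
            if (nowMs >= nextAutoCommitDeadlineMs) {
                committedOffset = position;
                nextAutoCommitDeadlineMs = nowMs + autoCommitIntervalMs;
            }
            // Step 2 (plays the role of fetcher.fetchedRecords): return records
            // and move the in-memory position past them.
            List<Long> records = new ArrayList<>();
            for (int i = 0; i < maxRecords; i++) {
                records.add(position++);
            }
            return records;
        }

        long position() { return position; }

        long committedOffset() { return committedOffset; }
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer(5000, 0);
        long[] pollTimesMs = {0, 1000, 5000};
        for (long nowMs : pollTimesMs) {
            List<Long> records = consumer.poll(nowMs, 3);
            System.out.println("t=" + nowMs + " records=" + records
                    + " position=" + consumer.position()
                    + " committed=" + consumer.committedOffset());
        }
    }
}
```

    In this sketch the records handed out by one poll are committed only by a later poll, once the interval has elapsed. If the process stopped between the two polls, the position of those records would never reach the broker.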

  • Original source: https://www.cnblogs.com/allenwas3/p/11473827.html