zoukankan      html  css  js  c++  java
  • kafka consumer 自动提交 offset

    org.apache.kafka.clients.consumer.KafkaConsumer#pollOnce

    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        client.maybeTriggerWakeup();
    
        long startMs = time.milliseconds();
        // 这里面触发自动提交
        coordinator.poll(startMs, timeout);
    
        // Lookup positions of assigned partitions
        boolean hasAllFetchPositions = updateFetchPositions();
    
        // 对拉取到的数据,更新 position
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty())
            return records;
    
        // 发送拉取数据请求
        fetcher.sendFetches();
    
        long nowMs = time.milliseconds();
        long remainingTimeMs = Math.max(0, timeout - (nowMs - startMs));
        long pollTimeout = Math.min(coordinator.timeToNextPoll(nowMs), remainingTimeMs);
    
        // We do not want to be stuck blocking in poll if we are missing some positions
        // since the offset lookup may be backing off after a failure
        if (!hasAllFetchPositions && pollTimeout > retryBackoffMs)
            pollTimeout = retryBackoffMs;
    
        client.poll(pollTimeout, nowMs, new PollCondition() {
            @Override
            public boolean shouldBlock() {
                // since a fetch might be completed by the background thread, we need this poll condition
                // to ensure that we do not block unnecessarily in poll()
                return !fetcher.hasCompletedFetches();
            }
        });
    
        // after the long poll, we should check whether the group needs to rebalance
        // prior to returning data so that the group can stabilize faster
        if (coordinator.needRejoin())
            return Collections.emptyMap();
    
        return fetcher.fetchedRecords();
    }

    结论就是:consumer 拉取到消息后,会更新保存的位点信息,下次拉取消息前,若自动提交的时间到了,就会把位点信息提交到 broker。

  • 相关阅读:
    《网络是怎样连接的》读书笔记一
    移植mplayer到开发板
    解决ubuntu安装ssh服务无法打开解析包问题
    嵌入式软件工程师面经
    c语言-数组、指针面试题
    Linux命令- echo、grep 、重定向、1>&2、2>&1的介绍
    回调函数的作用
    数据结构-单链表回环函数判断
    算法-一步步教你如何用c语言实现堆排序(非递归)
    算法-快速排序
  • 原文地址:https://www.cnblogs.com/allenwas3/p/11473827.html
Copyright © 2011-2022 走看看