zoukankan      html  css  js  c++  java
  • kafka consumer 自动提交 offset

    org.apache.kafka.clients.consumer.KafkaConsumer#pollOnce

    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        client.maybeTriggerWakeup();
    
        long startMs = time.milliseconds();
        // 这里面触发自动提交
        coordinator.poll(startMs, timeout);
    
        // Lookup positions of assigned partitions
        boolean hasAllFetchPositions = updateFetchPositions();
    
        // 对拉取到的数据,更新 position
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty())
            return records;
    
        // 发送拉取数据请求
        fetcher.sendFetches();
    
        long nowMs = time.milliseconds();
        long remainingTimeMs = Math.max(0, timeout - (nowMs - startMs));
        long pollTimeout = Math.min(coordinator.timeToNextPoll(nowMs), remainingTimeMs);
    
        // We do not want to be stuck blocking in poll if we are missing some positions
        // since the offset lookup may be backing off after a failure
        if (!hasAllFetchPositions && pollTimeout > retryBackoffMs)
            pollTimeout = retryBackoffMs;
    
        client.poll(pollTimeout, nowMs, new PollCondition() {
            @Override
            public boolean shouldBlock() {
                // since a fetch might be completed by the background thread, we need this poll condition
                // to ensure that we do not block unnecessarily in poll()
                return !fetcher.hasCompletedFetches();
            }
        });
    
        // after the long poll, we should check whether the group needs to rebalance
        // prior to returning data so that the group can stabilize faster
        if (coordinator.needRejoin())
            return Collections.emptyMap();
    
        return fetcher.fetchedRecords();
    }

    结论就是:consumer 拉取到消息后,会更新保存的位点信息,下次拉取消息前,若自动提交的时间到了,就会把位点信息提交到 broker。

  • 相关阅读:
    vue中的ref,refs使用
    setTimeout 为0的作用
    click 和 mousedown 以及map地图的pointerdown
    electron图标
    websocket
    居中
    一键生成vue模板
    IntelliJ IDEA 安装破解及汉化详解
    基础的一些东西
    git 合作开发
  • 原文地址:https://www.cnblogs.com/allenwas3/p/11473827.html
Copyright © 2011-2022 走看看