# [Source code] How Flink commits Kafka consumer group offsets

When Flink consumes Kafka data, there are three modes for committing the consumer group offset:

1. Checkpointing enabled: offsets are committed when a checkpoint completes.
2. Checkpointing enabled but commit-on-checkpoint disabled: the consumer group offset is never committed.
3. Checkpointing disabled: committing relies on the Kafka client's auto-commit.

The interesting case, of course, is how the consumer group offset gets committed when checkpointing is enabled.

A simple Flink job: read from one Kafka topic and write the data to another.

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.runtime.state.filesystem.FsStateBackend
    import org.apache.flink.streaming.api.CheckpointingMode
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // enable checkpointing: one checkpoint per minute, exactly-once mode
    val stateBackend = new FsStateBackend("file:///out/checkpoint")
    env.setStateBackend(stateBackend)
    env.enableCheckpointing(1 * 60 * 1000, CheckpointingMode.EXACTLY_ONCE)

    val prop = Common.getProp
    // only relevant when checkpointing is disabled (mode 3):
    // prop.setProperty("enable.auto.commit", "true")
    // prop.setProperty("auto.commit.interval.ms", "15000")
    val kafkaSource = new FlinkKafkaConsumer[String]("kafka_offset", new SimpleStringSchema(), prop)
    // disables committing offsets on checkpoints (mode 2):
    // kafkaSource.setCommitOffsetsOnCheckpoints(false)

    val kafkaProducer = new FlinkKafkaProducer[String]("kafka_offset_out", new SimpleStringSchema(), prop)
    // kafkaProducer.setWriteTimestampToKafka(true)

    env.addSource(kafkaSource)
      .setParallelism(1)
      .map(node => node.toString + ",flinkx")
      .addSink(kafkaProducer)

    // execute job
    env.execute("KafkaToKafka")

## 1 Checkpointing enabled

With checkpointing enabled, the default commit mode for the consumer group offset is ON_CHECKPOINTS.

The commit mode, offsetCommitMode, is determined when FlinkKafkaConsumerBase.open() runs:

    @Override
    public void open(Configuration configuration) throws Exception {
      // determine the offset commit mode
      this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
          getIsAutoCommitEnabled(),
          enableCommitOnCheckpoints,  // default value: true
          ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
      ...
    }

The corresponding fromConfiguration code:

    public static OffsetCommitMode fromConfiguration(
          boolean enableAutoCommit,
          boolean enableCommitOnCheckpoint,
          boolean enableCheckpointing) {
    
      if (enableCheckpointing) {
        // if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
        return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
      } else {
        // else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
        return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
      }
    }
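
To tie this back to the three cases at the top of the post, here is a small illustrative sketch (not Flink's code; Flink's real enum is OffsetCommitMode in the Kafka connector) that walks the same decision table:

    // Illustrative re-implementation of the decision table above.
    sealed trait CommitMode
    case object OnCheckpoints extends CommitMode
    case object KafkaPeriodic extends CommitMode
    case object Disabled      extends CommitMode

    def commitMode(autoCommit: Boolean, commitOnCheckpoint: Boolean, checkpointing: Boolean): CommitMode =
      if (checkpointing) { if (commitOnCheckpoint) OnCheckpoints else Disabled }
      else { if (autoCommit) KafkaPeriodic else Disabled }

    // the three cases from the start of the post:
    assert(commitMode(autoCommit = false, commitOnCheckpoint = true,  checkpointing = true)  == OnCheckpoints) // 1
    assert(commitMode(autoCommit = false, commitOnCheckpoint = false, checkpointing = true)  == Disabled)      // 2
    assert(commitMode(autoCommit = true,  commitOnCheckpoint = true,  checkpointing = false) == KafkaPeriodic) // 3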

When a checkpoint completes, Flink calls the notifyCheckpointComplete method of every operator in turn; for the Kafka source the call lands in FlinkKafkaConsumerBase.notifyCheckpointComplete.

Note: FlinkKafkaConsumerBase is the parent class of FlinkKafkaConsumer.

    @Override
    public final void notifyCheckpointComplete(long checkpointId) throws Exception {
      ....
    
      if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
        // only one commit operation must be in progress
        ...
    
        try {
          // find the pending offsets entry for this checkpoint id
          final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
          if (posInMap == -1) {
            LOG.warn("Consumer subtask {} received confirmation for unknown checkpoint id {}",
              getRuntimeContext().getIndexOfThisSubtask(), checkpointId);
            return;
          }
          // fetch the offsets by index; remove() returns the entry and drops it from the pending list
          @SuppressWarnings("unchecked")
          Map<KafkaTopicPartition, Long> offsets =
            (Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);
          
          ....
    
          // hand the offsets to the fetcher's commitInternalOffsetsToKafka
          fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
        
        ....
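
Where did pendingOffsetsToCommit come from? It is filled on the snapshot side: snapshotState() stores the current offsets under the checkpoint id, and notifyCheckpointComplete() retrieves them once that checkpoint is confirmed. An illustrative sketch of the bookkeeping (Flink itself uses a LinkedMap keyed by checkpoint id; partitions are simplified to Int here):

    import scala.collection.mutable

    // checkpoint id -> offsets captured when that checkpoint was snapshotted
    val pendingOffsetsToCommit = mutable.LinkedHashMap[Long, Map[Int, Long]]()

    // snapshot side (snapshotState): remember the offsets under the checkpoint id
    def onSnapshot(checkpointId: Long, currentOffsets: Map[Int, Long]): Unit =
      pendingOffsetsToCommit += (checkpointId -> currentOffsets)

    // completion side (notifyCheckpointComplete): take the offsets out, and drop
    // anything older, since a newer complete checkpoint supersedes it
    def onComplete(checkpointId: Long): Option[Map[Int, Long]] = {
      val offsets = pendingOffsetsToCommit.remove(checkpointId)
      pendingOffsetsToCommit.keys.filter(_ < checkpointId).toList
        .foreach(pendingOffsetsToCommit.remove)
      offsets
    }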

This finally calls AbstractFetcher.commitInternalOffsetsToKafka:

    public final void commitInternalOffsetsToKafka(
        Map<KafkaTopicPartition, Long> offsets,
        @Nonnull KafkaCommitCallback commitCallback) throws Exception {
      // Ignore sentinels. They might appear here if snapshot has started before actual offsets values
      // replaced sentinels
      doCommitInternalOffsetsToKafka(filterOutSentinels(offsets), commitCallback);
    }
    
    protected abstract void doCommitInternalOffsetsToKafka(
        Map<KafkaTopicPartition, Long> offsets,
        @Nonnull KafkaCommitCallback commitCallback) throws Exception;

AbstractFetcher.doCommitInternalOffsetsToKafka is abstract; the implementation that runs here is KafkaFetcher.doCommitInternalOffsetsToKafka.

It converts the internal Map<KafkaTopicPartition, Long> offsets into the Map<TopicPartition, OffsetAndMetadata> offsetsToCommit that the Kafka client expects.

Note: the committed value is offset + 1, i.e. the position of the next record to consume; if the last processed record had offset 41, the value committed is 42.

    @Override
    protected void doCommitInternalOffsetsToKafka(
      Map<KafkaTopicPartition, Long> offsets,
      @Nonnull KafkaCommitCallback commitCallback) throws Exception {
    
      @SuppressWarnings("unchecked")
      List<KafkaTopicPartitionState<T, TopicPartition>> partitions = subscribedPartitionStates();
    
      Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.size());
    
      for (KafkaTopicPartitionState<T, TopicPartition> partition : partitions) {
        Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
        if (lastProcessedOffset != null) {
          checkState(lastProcessedOffset >= 0, "Illegal offset value to commit");
    
          // committed offsets through the KafkaConsumer need to be 1 more than the last processed offset.
          // This does not affect Flink's checkpoints/saved state.
          long offsetToCommit = lastProcessedOffset + 1;
    
          offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit));
          partition.setCommittedOffset(offsetToCommit);
        }
      }
    
      // record the work to be committed by the main consumer thread and make sure the consumer notices that
      consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback);
    }

This in turn calls KafkaConsumerThread.setOffsetsToCommit, which stashes the offsets in the consumer thread's nextOffsetsToCommit field, to be committed on the next pass of the consume loop.

    void setOffsetsToCommit(
          Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
          @Nonnull KafkaCommitCallback commitCallback) {
    
        // stash offsetsToCommit in nextOffsetsToCommit for the Kafka consumer thread to pick up;
        // a non-null previous value from getAndSet means the previous batch was never committed
        // record the work to be committed by the main consumer thread and make sure the consumer notices that
        if (nextOffsetsToCommit.getAndSet(Tuple2.of(offsetsToCommit, commitCallback)) != null) {
          log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
              "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
              "This does not compromise Flink's checkpoint integrity.");
        }
    
        // if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
        handover.wakeupProducer();
    
        synchronized (consumerReassignmentLock) {
          if (consumer != null) {
            consumer.wakeup();
          } else {
            // the consumer is currently isolated for partition reassignment;
            // set this flag so that the wakeup state is restored once the reassignment is complete
            hasBufferedWakeup = true;
          }
        }
      }
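
The handoff between the checkpoint thread and the consumer thread is just an AtomicReference used as a single-slot mailbox. A minimal sketch of the pattern (names and types are illustrative, not Flink's):

    import java.util.concurrent.atomic.AtomicReference

    // single-slot mailbox: the checkpoint thread deposits the newest offsets,
    // the consumer thread drains them at the top of its poll loop
    val nextOffsetsToCommit = new AtomicReference[Map[Int, Long]]()

    // checkpoint thread: overwrite whatever is there; a non-null previous value
    // means the consumer never picked up the last batch (it is skipped, as the
    // warning above explains)
    def deposit(offsets: Map[Int, Long]): Unit =
      if (nextOffsetsToCommit.getAndSet(offsets) != null)
        println("previous offsets were superseded before being committed")

    // consumer thread: take and clear in one atomic step, so the same batch
    // is never committed twice
    def drain(): Option[Map[Int, Long]] =
      Option(nextOffsetsToCommit.getAndSet(null))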

That brings us to the Kafka consumer thread, KafkaConsumerThread.run: this is where data is polled from Kafka, and also where the consumer group offset is committed.

    @Override
      public void run() {
        ...
    
          this.consumer = getConsumer(kafkaProperties);
    
        ....
          // main fetch loop: keep polling data from Kafka
          while (running) {
            // this is where the offsets get committed
            // check if there is something to commit
            if (!commitInProgress) {
    
              // nextOffsetsToCommit holds the offsets deposited by setOffsetsToCommit
    
              // get and reset the work-to-be committed, so we don't repeatedly commit the same
              final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =
                  nextOffsetsToCommit.getAndSet(null);
    
              // if a batch is pending, commit it to Kafka asynchronously
              if (commitOffsetsAndCallback != null) {
                log.debug("Sending async offset commit request to Kafka broker");
    
                // also record that a commit is already in progress
                // the order here matters! first set the flag, then send the commit command.
                commitInProgress = true;
                consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
              }
            }
    
           ... 
            // get the next batch of records, unless we did not manage to hand the old batch over
            if (records == null) {
              try {
                records = consumer.poll(pollTimeout);
              }
              catch (WakeupException we) {
                continue;
              }
            }
    
            ...
          }

At this point we can see how Flink's offsets end up committed to Kafka.
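
To double-check from outside the job, you can ask the group coordinator for the committed offset with a plain Kafka client. A hedged sketch: the broker address and group id are assumptions and must match whatever Common.getProp configures:

    import java.util.Properties
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")  // assumed broker address
    props.setProperty("group.id", "kafka_offset_group")       // assumed: same group as the Flink job
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    // committed() queries the group coordinator; null means nothing committed yet
    val committed = consumer.committed(new TopicPartition("kafka_offset", 0))
    println(s"committed offset: ${Option(committed).map(_.offset())}")
    consumer.close()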


## 2 Checkpointing enabled, commit on checkpoint disabled

Committing on checkpoints is the default behavior when checkpointing is enabled. Now let's look at turning that off, starting by disabling commitOffsetsOnCheckpoints:

    val kafkaSource = new FlinkKafkaConsumer[String]("kafka_offset", new SimpleStringSchema(), Common.getProp)
    kafkaSource.setCommitOffsetsOnCheckpoints(false)

The corresponding method:

    public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints) {
      // enableCommitOnCheckpoints defaults to true
      this.enableCommitOnCheckpoints = commitOnCheckpoints;
      return this;
    }

Warning: with checkpointing enabled but CommitOffsetsOnCheckpoints disabled, the consumer group offset is never committed to Kafka; in other words, the group's offset never changes.

As shown below, CURRENT-OFFSET does not move (output of kafka-consumer-groups.sh --describe):

    TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
    kafka_offset    0          4172            4691            519             -               -               -

Official docs: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration

## 3 Checkpointing disabled

With checkpointing disabled, the Flink Kafka Consumer relies on the periodic auto-commit of the Kafka client it uses internally. To enable or disable it, set enable.auto.commit and auto.commit.interval.ms to appropriate values in the provided Properties:

    prop.setProperty("enable.auto.commit", "true")
    prop.setProperty("auto.commit.interval.ms", "15000")

The internals of the Kafka client's auto-commit are beyond the scope of this post, so we'll stop here.

O(∩_∩)O haha~

Feel free to follow the Flink菜鸟 WeChat official account for occasional posts on Flink development.
