  • Source code walkthrough: topic partition assignment in the flink-connector-kafka consumer

    When reposting, please credit the original address: http://www.cnblogs.com/dongxiao-yang/p/7200599.html 

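    Flink officially provides a connector implementation for Kafka. While debugging, I found that some of its consumption behavior did not match my expectations, so I needed to dig into the source code.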

    flink-connector-kafka currently has implementations for three Kafka versions: 0.8, 0.9, and 0.10. This post uses the FlinkKafkaConsumer010 code as the example.

    The inheritance chain of FlinkKafkaConsumer010 is shown below; FlinkKafkaConsumerBase contains most of the implementation.

    FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> 

    Each version of the FlinkKafkaConsumer also implements a corresponding AbstractFetcher that pulls data from Kafka. Its inheritance chain is as follows:

    Kafka010Fetcher<T> extends Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> 

    FlinkKafkaConsumerBase is defined as follows. It extends RichParallelSourceFunction and implements interfaces such as CheckpointedFunction.

    public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements 
            CheckpointListener,
            ResultTypeQueryable<T>,
            CheckpointedFunction,
            CheckpointedRestoring<HashMap<KafkaTopicPartition, Long>> {

    Execution details of the methods inside FlinkKafkaConsumer

    initializeState

        public void initializeState(FunctionInitializationContext context) throws Exception {
    
            OperatorStateStore stateStore = context.getOperatorStateStore();
            offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
    
            if (context.isRestored()) {
                if (restoredState == null) {
                    restoredState = new HashMap<>();
                    for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
                        restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
                    }
    
                    LOG.info("Setting restore state in the FlinkKafkaConsumer.");
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Using the following offsets: {}", restoredState);
                    }
                }
                if (restoredState != null && restoredState.isEmpty()) {
                    restoredState = null;
                }
            } else {
                LOG.info("No restore state for FlinkKafkaConsumer.");
            }
        }

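    According to the runtime logs, initializeState is the first method called when the FlinkKafkaConsumer is initialized. Through the FunctionInitializationContext, it calls getOperatorStateStore and getSerializableListState to obtain the state object stored in the checkpoint. If the task is being restored, for example after a failure, context.isRestored() returns true, and the method tries to read from the Flink checkpoint the Kafka partitions that were previously assigned to it together with the last offset committed for each of them.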
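The restore step can be illustrated without any Flink dependency. The following is only a minimal sketch: the class RestoreSketch is hypothetical, a String such as "orders-0" stands in for KafkaTopicPartition, and a plain List stands in for the list state. It also leaves out the case where restoredState was already populated before initializeState ran.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the restore logic in initializeState (not Flink code).
class RestoreSketch {

    // Returns the restored offsets, or null when there is nothing to restore.
    static Map<String, Long> restore(boolean isRestored, List<Map.Entry<String, Long>> listState) {
        if (!isRestored) {
            // fresh start: open() will compute the assignment itself
            return null;
        }
        Map<String, Long> restored = new HashMap<>();
        for (Map.Entry<String, Long> entry : listState) {
            restored.put(entry.getKey(), entry.getValue());
        }
        // an empty restored map is treated the same as "no state"
        return restored.isEmpty() ? null : restored;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> state = new ArrayList<>();
        state.add(new SimpleEntry<>("orders-0", 41L));
        state.add(new SimpleEntry<>("orders-3", 17L));

        System.out.println(restore(true, state));              // two restored partitions
        System.out.println(restore(true, new ArrayList<>()));  // null: restored but empty
        System.out.println(restore(false, state));             // null: not a restore
    }
}
```

The null result matters because open checks restoredState != null to decide whether to reuse the restored partitions or to compute a fresh assignment.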

    open

        public void open(Configuration configuration) {
            // determine the offset commit mode
            offsetCommitMode = OffsetCommitModes.fromConfiguration(
                    getIsAutoCommitEnabled(),
                    enableCommitOnCheckpoints,
                    ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
    
            switch (offsetCommitMode) {
                case ON_CHECKPOINTS:
                    LOG.info("Consumer subtask {} will commit offsets back to Kafka on completed checkpoints.",
                            getRuntimeContext().getIndexOfThisSubtask());
                    break;
                case KAFKA_PERIODIC:
                    LOG.info("Consumer subtask {} will commit offsets back to Kafka periodically using the Kafka client's auto commit.",
                            getRuntimeContext().getIndexOfThisSubtask());
                    break;
                default:
                case DISABLED:
                    LOG.info("Consumer subtask {} has disabled offset committing back to Kafka." +
                            " This does not compromise Flink's checkpoint integrity.",
                            getRuntimeContext().getIndexOfThisSubtask());
            }
    
            // initialize subscribed partitions
            List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics);
            Preconditions.checkNotNull(kafkaTopicPartitions, "TopicPartitions must not be null.");
    
            subscribedPartitionsToStartOffsets = new HashMap<>(kafkaTopicPartitions.size());
    
            if (restoredState != null) {
                for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
                    if (restoredState.containsKey(kafkaTopicPartition)) {
                        subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, restoredState.get(kafkaTopicPartition));
                    }
                }
    
                LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
                    getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
            } else {
                initializeSubscribedPartitionsToStartOffsets(
                    subscribedPartitionsToStartOffsets,
                    kafkaTopicPartitions,
                    getRuntimeContext().getIndexOfThisSubtask(),
                    getRuntimeContext().getNumberOfParallelSubtasks(),
                    startupMode,
                    specificStartupOffsets);
    
                if (subscribedPartitionsToStartOffsets.size() != 0) {
                    switch (startupMode) {
                        case EARLIEST:
                            LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                subscribedPartitionsToStartOffsets.keySet());
                            break;
                        case LATEST:
                            LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                subscribedPartitionsToStartOffsets.keySet());
                            break;
                        case SPECIFIC_OFFSETS:
                            LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                specificStartupOffsets,
                                subscribedPartitionsToStartOffsets.keySet());
    
                            List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
                            for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                                if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                                    partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
                                }
                            }
    
                            if (partitionsDefaultedToGroupOffsets.size() > 0) {
                                LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
                                        "; their startup offsets will be defaulted to their committed group offsets in Kafka.",
                                    getRuntimeContext().getIndexOfThisSubtask(),
                                    partitionsDefaultedToGroupOffsets.size(),
                                    partitionsDefaultedToGroupOffsets);
                            }
                            break;
                        default:
                        case GROUP_OFFSETS:
                            LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                subscribedPartitionsToStartOffsets.keySet());
                    }
                }
            }
        }

    open is called after initializeState finishes. Its main logic has the following steps:

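    1 Determine offsetCommitMode. It is derived from the combination of three settings: Kafka's auto commit, the value of setCommitOffsetsOnCheckpoints() (true by default), and whether checkpointing is enabled in the Flink runtime. offsetCommitMode has three modes: ON_CHECKPOINTS commits offsets after a checkpoint completes; KAFKA_PERIODIC uses the Kafka consumer's built-in periodic commit; DISABLED does not commit.

    2 Assign Kafka partitions. If initializeState already obtained the partitions stored in state, the consumer simply continues reading those partitions. If this is a first-time initialization, it calls initializeSubscribedPartitionsToStartOffsets to compute the list of partitions for the current task: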

      
        protected static void initializeSubscribedPartitionsToStartOffsets(
                Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
                List<KafkaTopicPartition> kafkaTopicPartitions,
                int indexOfThisSubtask,
                int numParallelSubtasks,
                StartupMode startupMode,
                Map<KafkaTopicPartition, Long> specificStartupOffsets) {
    
            for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
                if (i % numParallelSubtasks == indexOfThisSubtask) {
                    if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
                        subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
                    } else {
                        if (specificStartupOffsets == null) {
                            throw new IllegalArgumentException(
                                "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
                                    ", but no specific offsets were specified");
                        }
    
                        KafkaTopicPartition partition = kafkaTopicPartitions.get(i);
    
                        Long specificOffset = specificStartupOffsets.get(partition);
                        if (specificOffset != null) {
                            // since the specified offsets represent the next record to read, we subtract
                            // it by one so that the initial state of the consumer will be correct
                            subscribedPartitionsToStartOffsets.put(partition, specificOffset - 1);
                        } else {
                            subscribedPartitionsToStartOffsets.put(partition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
                        }
                    }
                }
            }
        }

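    As you can see, Flink assigns partitions by taking each partition's index in the list modulo the number of parallel Flink subtasks: if i % numParallelSubtasks == indexOfThisSubtask, then partition i belongs to the current subtask. Note that i is the position in kafkaTopicPartitions, which is not necessarily the Kafka partition ID.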
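To make the modulo rule concrete, here is a small standalone sketch. The class AssignmentSketch and its method ownedIndices are hypothetical names used only for illustration; the loop condition is the one from initializeSubscribedPartitionsToStartOffsets.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the modulo assignment rule (not Flink code).
class AssignmentSketch {

    // Returns the list indices a subtask would own under the rule
    // i % numParallelSubtasks == indexOfThisSubtask.
    static List<Integer> ownedIndices(int numPartitions, int numParallelSubtasks, int indexOfThisSubtask) {
        List<Integer> owned = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            if (i % numParallelSubtasks == indexOfThisSubtask) {
                owned.add(i);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        // 5 partitions spread over 3 parallel subtasks
        for (int subtask = 0; subtask < 3; subtask++) {
            System.out.println("subtask " + subtask + " -> " + ownedIndices(5, 3, subtask));
        }
        // prints:
        // subtask 0 -> [0, 3]
        // subtask 1 -> [1, 4]
        // subtask 2 -> [2]
    }
}
```

When there are more subtasks than partitions, some subtasks get an empty list, for example ownedIndices(2, 4, 3). In the run method those subtasks take the else branch: they emit a Long.MAX_VALUE watermark and wait until the source is canceled.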

    The assignment result is stored in the private field Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets and is used later to initialize the consumer.
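Going back to step 1 of open, the way the three flags combine into an offsetCommitMode can be sketched as below. This reflects my reading of OffsetCommitModes.fromConfiguration and is an assumption worth checking against the Flink version you run; the class OffsetCommitModeSketch and its nested Mode enum are hypothetical stand-ins.

```java
// Hypothetical sketch of how the commit mode is chosen (assumed logic, not copied from Flink).
class OffsetCommitModeSketch {

    enum Mode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    static Mode fromConfiguration(
            boolean autoCommitEnabled,
            boolean commitOnCheckpoints,
            boolean checkpointingEnabled) {
        if (checkpointingEnabled) {
            // with checkpointing on, Kafka's own auto commit setting is not consulted
            return commitOnCheckpoints ? Mode.ON_CHECKPOINTS : Mode.DISABLED;
        }
        // without checkpointing, fall back to the Kafka client's periodic auto commit
        return autoCommitEnabled ? Mode.KAFKA_PERIODIC : Mode.DISABLED;
    }

    public static void main(String[] args) {
        System.out.println(fromConfiguration(true, true, true));    // ON_CHECKPOINTS
        System.out.println(fromConfiguration(true, false, true));   // DISABLED
        System.out.println(fromConfiguration(true, true, false));   // KAFKA_PERIODIC
        System.out.println(fromConfiguration(false, true, false));  // DISABLED
    }
}
```

Under this reading, a job with checkpointing enabled never relies on Kafka's auto commit, which may explain offset commit behavior that looks surprising from the Kafka side.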

    run

        @Override
        public void run(SourceContext<T> sourceContext) throws Exception {
            if (subscribedPartitionsToStartOffsets == null) {
                throw new Exception("The partitions were not set for the consumer");
            }
    
            // we need only do work, if we actually have partitions assigned
            if (!subscribedPartitionsToStartOffsets.isEmpty()) {
    
                // create the fetcher that will communicate with the Kafka brokers
                final AbstractFetcher<T, ?> fetcher = createFetcher(
                        sourceContext,
                        subscribedPartitionsToStartOffsets,
                        periodicWatermarkAssigner,
                        punctuatedWatermarkAssigner,
                        (StreamingRuntimeContext) getRuntimeContext(),
                        offsetCommitMode);
    
                // publish the reference, for snapshot-, commit-, and cancel calls
                // IMPORTANT: We can only do that now, because only now will calls to
                //            the fetchers 'snapshotCurrentState()' method return at least
                //            the restored offsets
                this.kafkaFetcher = fetcher;
                if (!running) {
                    return;
                }
                
                // (3) run the fetcher' main work method
                fetcher.runFetchLoop();
            }
            else {
                // this source never completes, so emit a Long.MAX_VALUE watermark
                // to not block watermark forwarding
                sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
    
                // wait until this is canceled
                final Object waitLock = new Object();
                while (running) {
                    try {
                        //noinspection SynchronizationOnLocalVariableOrMethodParameter
                        synchronized (waitLock) {
                            waitLock.wait();
                        }
                    }
                    catch (InterruptedException e) {
                        if (!running) {
                            // restore the interrupted state, and fall through the loop
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }
        }

    As you can see, the computed subscribedPartitionsToStartOffsets is passed into the AbstractFetcher instance, which owns the consumerThread. KafkaConsumerThread calls consumerCallBridge.assignPartitions(consumer, convertKafkaPartitions(subscribedPartitionStates)); which ends up calling consumer.assign(topicPartitions); and thereby manually assigns the topic partitions to the consumer instance.

    References:

    Working with State
