  • Flink

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    // only required for Kafka 0.8
    properties.setProperty("zookeeper.connect", "localhost:2181");
    properties.setProperty("group.id", "test");
    DataStream<String> stream = env
        .addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));

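    Usage is as shown above; at its core, the consumer is an implementation of SourceFunction.

    Apart from overriding createFetcher, FlinkKafkaConsumer010 inherits most of its behavior from FlinkKafkaConsumerBase.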
     

    FlinkKafkaConsumerBase

     
        public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
            CheckpointListener,
            ResultTypeQueryable<T>,
            CheckpointedFunction,
            CheckpointedRestoring<HashMap<KafkaTopicPartition, Long>>

    FlinkKafkaConsumerBase extends RichParallelSourceFunction and implements four interfaces.

    RichFunction.open

    First, a look at how FlinkKafkaConsumerBase initializes itself:

        @Override
        public void open(Configuration configuration) {
            // determine the offset commit mode
            // There are three offset commit modes: ON_CHECKPOINTS, KAFKA_PERIODIC and DISABLED.
            // With checkpointing enabled, offsets are recorded in the snapshot; otherwise they are
            // periodically written back to Kafka.
            offsetCommitMode = OffsetCommitModes.fromConfiguration(
                    getIsAutoCommitEnabled(),
                    enableCommitOnCheckpoints,
                    ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
    
            // initialize subscribed partitions
            List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics); // fetch the partition metadata for the topics
    
            subscribedPartitionsToStartOffsets = new HashMap<>(kafkaTopicPartitions.size()); // Map<KafkaTopicPartition, Long> that records the offset of each partition
    
            if (restoredState != null) { // there is state to restore from
                for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
                    if (restoredState.containsKey(kafkaTopicPartition)) { // the state contains this partition
                        subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, restoredState.get(kafkaTopicPartition)); // restore the offset from the snapshot
                    }
                }
    
            } else { // no state, so initialize subscribedPartitionsToStartOffsets from scratch
                initializeSubscribedPartitionsToStartOffsets(
                    subscribedPartitionsToStartOffsets,
                    kafkaTopicPartitions,
                    getRuntimeContext().getIndexOfThisSubtask(),
                    getRuntimeContext().getNumberOfParallelSubtasks(),
                    startupMode,
                    specificStartupOffsets);
                // startupMode is one of the following; the default is GROUP_OFFSETS
                if (subscribedPartitionsToStartOffsets.size() != 0) {
                    switch (startupMode) {
                        case EARLIEST: // start reading from the earliest offset
                            LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                subscribedPartitionsToStartOffsets.keySet());
                            break;
                        case LATEST: // start reading from the latest offset
                            LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                subscribedPartitionsToStartOffsets.keySet());
                            break;
                        case SPECIFIC_OFFSETS: // start reading from user-specified offsets
                            LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                specificStartupOffsets,
                                subscribedPartitionsToStartOffsets.keySet());
    
                            List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
                            for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                                if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                                    partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
                                }
                            }
    
                            if (partitionsDefaultedToGroupOffsets.size() > 0) { // some partitions had no offset specified, so they fall back to GROUP_OFFSET
                                LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
                                        "; their startup offsets will be defaulted to their committed group offsets in Kafka.",
                                    getRuntimeContext().getIndexOfThisSubtask(),
                                    partitionsDefaultedToGroupOffsets.size(),
                                    partitionsDefaultedToGroupOffsets);
                            }
                            break;
                        default:
                        case GROUP_OFFSETS: // start reading from the offsets committed for the Kafka consumer group
                            LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                subscribedPartitionsToStartOffsets.keySet());
                    }
                }
            }
        }
        
        protected static void initializeSubscribedPartitionsToStartOffsets(
                Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
                List<KafkaTopicPartition> kafkaTopicPartitions,
                int indexOfThisSubtask,
                int numParallelSubtasks,
                StartupMode startupMode,
                Map<KafkaTopicPartition, Long> specificStartupOffsets) {
    
            for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
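                if (i % numParallelSubtasks == indexOfThisSubtask) { // this partition belongs to this subtask; only assigned partitions get an offset entry, so this check is where partition assignment actually happens
                    if (startupMode != StartupMode.SPECIFIC_OFFSETS) { // for any mode other than SPECIFIC_OFFSETS, store the mode's sentinel constant as the offset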
                        subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
                    } else {
    
                        KafkaTopicPartition partition = kafkaTopicPartitions.get(i);
    
                        Long specificOffset = specificStartupOffsets.get(partition);
                        if (specificOffset != null) {
                            // since the specified offsets represent the next record to read, we subtract
                            // it by one so that the initial state of the consumer will be correct
                            subscribedPartitionsToStartOffsets.put(partition, specificOffset - 1); // store the configured offset for this partition; note the minus one
                        } else { // no offset configured for this partition, so fall back to GROUP_OFFSET
                            subscribedPartitionsToStartOffsets.put(partition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
                        }
                    }
                }
            }
        }
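
    The mode selection at the top of open() can be read as a small decision table. The sketch below is a stand-alone version of that table, assuming that checkpointing takes precedence over Kafka's own auto-commit setting. The class OffsetCommitModeSketch and its parameter names are hypothetical; this is not Flink's code.

    ```java
    final class OffsetCommitModeSketch {

        enum Mode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

        // Assumed decision table: the three inputs mirror the three arguments
        // passed to OffsetCommitModes.fromConfiguration in open().
        static Mode fromConfiguration(
                boolean autoCommitEnabled,
                boolean commitOnCheckpoints,
                boolean checkpointingEnabled) {
            if (checkpointingEnabled) {
                // With checkpointing on, the auto-commit flag is not consulted here.
                return commitOnCheckpoints ? Mode.ON_CHECKPOINTS : Mode.DISABLED;
            }
            // Without checkpointing, fall back to periodic commits by the Kafka client.
            return autoCommitEnabled ? Mode.KAFKA_PERIODIC : Mode.DISABLED;
        }

        public static void main(String[] args) {
            System.out.println(fromConfiguration(true, true, true));
            System.out.println(fromConfiguration(true, true, false));
            System.out.println(fromConfiguration(false, true, false));
        }
    }
    ```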

    Initialization therefore mainly restores or initializes the offset of each topic partition.
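
    To make the assignment rule concrete, here is a minimal sketch of the SPECIFIC_OFFSETS branch in plain Java. Partitions are represented as strings, and the GROUP_OFFSET value is a placeholder rather than the real constant from KafkaTopicPartitionStateSentinel; StartOffsetSketch is a hypothetical name.

    ```java
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    final class StartOffsetSketch {

        // Placeholder sentinel, chosen only for this sketch.
        static final long GROUP_OFFSET = Long.MIN_VALUE;

        static Map<String, Long> assign(
                List<String> partitions,
                int indexOfThisSubtask,
                int numParallelSubtasks,
                Map<String, Long> specificOffsets) {
            Map<String, Long> startOffsets = new LinkedHashMap<>();
            for (int i = 0; i < partitions.size(); i++) {
                if (i % numParallelSubtasks != indexOfThisSubtask) {
                    continue; // this partition belongs to another subtask
                }
                String partition = partitions.get(i);
                Long specific = specificOffsets.get(partition);
                // The configured offset is the next record to read, while the state
                // stores the last record already read, hence the minus one.
                startOffsets.put(partition, specific != null ? specific - 1 : GROUP_OFFSET);
            }
            return startOffsets;
        }

        public static void main(String[] args) {
            List<String> partitions = Arrays.asList("t-0", "t-1", "t-2", "t-3", "t-4");
            Map<String, Long> specific = new HashMap<>();
            specific.put("t-0", 100L);
            // Subtask 0 of 2 receives the partitions at list indexes 0, 2 and 4.
            System.out.println(assign(partitions, 0, 2, specific));
            System.out.println(assign(partitions, 1, 2, specific));
        }
    }
    ```

    Because the assignment depends only on the position of a partition in the list, every subtask must see the partitions in the same order for the split to be disjoint.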

    RichParallelSourceFunction

    The core run function:

        @Override
        public void run(SourceContext<T> sourceContext) throws Exception {
    
            // we need only do work, if we actually have partitions assigned
            if (!subscribedPartitionsToStartOffsets.isEmpty()) {
    
                // create the fetcher that will communicate with the Kafka brokers
                final AbstractFetcher<T, ?> fetcher = createFetcher(
                        sourceContext, // used to emit records and watermarks
                        subscribedPartitionsToStartOffsets, // partition-to-offset mapping
                        periodicWatermarkAssigner,
                        punctuatedWatermarkAssigner,
                        (StreamingRuntimeContext) getRuntimeContext(),
                        offsetCommitMode); // one of ON_CHECKPOINTS, KAFKA_PERIODIC, DISABLED
    
                // publish the reference, for snapshot-, commit-, and cancel calls
                // IMPORTANT: We can only do that now, because only now will calls to
                //            the fetchers 'snapshotCurrentState()' method return at least
                //            the restored offsets
                this.kafkaFetcher = fetcher;
                if (!running) {
                    return;
                }
    
                // (3) run the fetcher' main work method
                fetcher.runFetchLoop();
            }
        }

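    This mainly creates the Fetcher and starts it; the Fetcher does the actual work.

    Most of the arguments used to create the Fetcher are easy to understand, except for: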

    periodicWatermarkAssigner

    punctuatedWatermarkAssigner

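    These are used to generate watermarks; see Flink - watermark.

    CheckpointedFunction interface

    This mainly means implementing the initializeState and snapshotState functions.

    initializeState restores the offset state from the state backend into restoredState; that data is then used in open.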

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
    
            OperatorStateStore stateStore = context.getOperatorStateStore();
            offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); // read the state from the state backend
    
            if (context.isRestored()) {
                if (restoredState == null) {
                    restoredState = new HashMap<>();
                    for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
                        restoredState.put(kafkaOffset.f0, kafkaOffset.f1); // copy the offsetsStateForCheckpoint data into restoredState
                    }
                }
            }
        }

    snapshotState contains the snapshot logic:

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
        
            offsetsStateForCheckpoint.clear(); //transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint
    
            final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
            if (fetcher == null) {
                //...
            } else {
                HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState(); // snapshot the latest offsets from the fetcher
    
                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call can happen
                    // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets); // register the offsets as pending for this checkpoint
                }
    
                for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
                    offsetsStateForCheckpoint.add(
                            Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue())); // store the offset in the state backend
                }
            }
    
            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // truncate the map of pending offsets to commit, to prevent infinite growth
                while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) { // too many pending entries: drop the oldest
                    pendingOffsetsToCommit.remove(0);
                }
            }
        }

    CheckpointListener interface

        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
    
            final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
    
            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // only one commit operation must be in progress
                try {
                    final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId); // look up this checkpoint in pendingOffsetsToCommit
                    if (posInMap == -1) {
                        LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
                        return;
                    }
    
                    @SuppressWarnings("unchecked")
                    HashMap<KafkaTopicPartition, Long> offsets =
                        (HashMap<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap); // remove this checkpoint's entry
    
                    // remove older checkpoints in map
                    for (int i = 0; i < posInMap; i++) {
                        pendingOffsetsToCommit.remove(0); // also drop every checkpoint older than this one
                    }
    
                    if (offsets == null || offsets.size() == 0) {
                        LOG.debug("Checkpoint state was empty.");
                        return;
                    }
                    fetcher.commitInternalOffsetsToKafka(offsets); // send the offsets to Kafka for the consumer group
                } // the handler that follows this try block is omitted in this excerpt
            }
        }
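
    The bookkeeping shared by snapshotState and notifyCheckpointComplete can be sketched without Flink. The version below uses a LinkedHashMap in place of the index-based map in the excerpt, so it is an equivalent in spirit rather than the original data structure. PendingCommitsSketch and the MAX_PENDING value are hypothetical.

    ```java
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;

    final class PendingCommitsSketch {

        static final int MAX_PENDING = 100; // hypothetical cap for this sketch

        // Insertion order equals checkpoint order, oldest first.
        private final LinkedHashMap<Long, Map<String, Long>> pending = new LinkedHashMap<>();

        /** Called when a checkpoint is taken: remember its offsets, cap the map. */
        void onSnapshot(long checkpointId, Map<String, Long> offsets) {
            pending.put(checkpointId, offsets);
            Iterator<Long> oldestFirst = pending.keySet().iterator();
            while (pending.size() > MAX_PENDING) {
                oldestFirst.next();
                oldestFirst.remove();
            }
        }

        /** Returns the offsets to commit, or null for an unknown checkpoint. */
        Map<String, Long> onCheckpointComplete(long checkpointId) {
            if (!pending.containsKey(checkpointId)) {
                return null;
            }
            Map<String, Long> offsets = null;
            Iterator<Map.Entry<Long, Map<String, Long>>> it = pending.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Map<String, Long>> entry = it.next();
                long id = entry.getKey();
                offsets = entry.getValue();
                it.remove(); // drops older checkpoints and, last, the confirmed one
                if (id == checkpointId) {
                    break;
                }
            }
            return offsets;
        }

        int size() {
            return pending.size();
        }

        public static void main(String[] args) {
            PendingCommitsSketch commits = new PendingCommitsSketch();
            commits.onSnapshot(1L, Collections.singletonMap("t-0", 10L));
            commits.onSnapshot(2L, Collections.singletonMap("t-0", 20L));
            commits.onSnapshot(3L, Collections.singletonMap("t-0", 30L));
            System.out.println(commits.onCheckpointComplete(2L));
            System.out.println(commits.size());
        }
    }
    ```

    Offsets only become visible to Kafka once a checkpoint is confirmed, so what is committed to the group never runs ahead of what a restore would replay.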

    Kafka010Fetcher

    FlinkKafkaConsumer010 only overrides createFetcher.

    Each Kafka version has its own Fetcher.

    public class Kafka010Fetcher<T> extends Kafka09Fetcher<T>

    What Kafka010Fetcher does differently:

        @Override
        protected void emitRecord(
                T record,
                KafkaTopicPartitionState<TopicPartition> partition,
                long offset,
                ConsumerRecord<?, ?> consumerRecord) throws Exception {
    
            // we attach the Kafka 0.10 timestamp here
            emitRecordWithTimestamp(record, partition, offset, consumerRecord.timestamp()); // Kafka 0.10 records carry a timestamp
        }
    
        /**
         * This method needs to be overridden because Kafka broke binary compatibility between 0.9 and 0.10,
         * changing binary signatures.
         */
        @Override
        protected KafkaConsumerCallBridge010 createCallBridge() {
            return new KafkaConsumerCallBridge010(); // the call bridge hides the differences between Kafka consumer versions
        }

    KafkaConsumerCallBridge010 wraps the places where the 0.10 API for assignPartitions and seek differs from other versions.

    public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
    
        @Override
        public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
            consumer.assign(topicPartitions);
        }
    
        @Override
        public void seekPartitionToBeginning(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
            consumer.seekToBeginning(Collections.singletonList(partition));
        }
    
        @Override
        public void seekPartitionToEnd(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
            consumer.seekToEnd(Collections.singletonList(partition));
        }
    }
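
    The idea behind the call bridge can be shown with two made-up client classes whose seek methods have different signatures. OldClient, NewClient and Bridge are hypothetical stand-ins invented for this sketch; they are not Kafka or Flink types, and the real bridge deals with binary rather than source-level differences.

    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.List;

    final class CallBridgeSketch {

        /** Hypothetical older client: seek takes varargs. */
        static final class OldClient {
            final List<String> calls = new ArrayList<>();
            void seekToBeginning(String... partitions) {
                calls.add("old:" + String.join(",", partitions));
            }
        }

        /** Hypothetical newer client: seek takes a collection. */
        static final class NewClient {
            final List<String> calls = new ArrayList<>();
            void seekToBeginning(Collection<String> partitions) {
                calls.add("new:" + String.join(",", partitions));
            }
        }

        /** The bridge gives callers one signature regardless of client version. */
        interface Bridge<C> {
            void seekPartitionToBeginning(C client, String partition);
        }

        static final Bridge<OldClient> OLD = (client, p) -> client.seekToBeginning(p);
        static final Bridge<NewClient> NEW =
                (client, p) -> client.seekToBeginning(Collections.singletonList(p));

        /** Version-independent logic, written once against the bridge. */
        static <C> void rewind(Bridge<C> bridge, C client, List<String> partitions) {
            for (String partition : partitions) {
                bridge.seekPartitionToBeginning(client, partition);
            }
        }

        public static void main(String[] args) {
            OldClient oldClient = new OldClient();
            NewClient newClient = new NewClient();
            rewind(OLD, oldClient, Arrays.asList("t-0", "t-1"));
            rewind(NEW, newClient, Arrays.asList("t-0", "t-1"));
            System.out.println(oldClient.calls);
            System.out.println(newClient.calls);
        }
    }
    ```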

    Kafka09Fetcher

    The key part is runFetchLoop: it starts the KafkaConsumerThread, then takes records out of the handover, deserializes them and emits them.

        @Override
        public void runFetchLoop() throws Exception {
            try {
                final Handover handover = this.handover; // the handover passes data between the fetcher thread and the consumer thread
    
                // kick off the actual Kafka consumer
                consumerThread.start(); // a KafkaConsumerThread, the thread that actually runs the Kafka consumer
    
                while (running) {
                    // this blocks until we get the next records
                    // it automatically re-throws exceptions encountered in the fetcher thread
                    final ConsumerRecords<byte[], byte[]> records = handover.pollNext(); // take the next batch from the handover
    
                    // get the records for each topic partition
                    for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {
    
                        List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                                records.records(partition.getKafkaPartitionHandle()); // ConsumerRecords is backed by a Map<TopicPartition, List<ConsumerRecord<K, V>>>
    
                        for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
                            final T value = deserializer.deserialize(
                                    record.key(), record.value(),
                                    record.topic(), record.partition(), record.offset());
    
                            if (deserializer.isEndOfStream(value)) {
                                // end of stream signaled
                                running = false;
                                break;
                            }
    
                            // emit the actual record. this also updates offset state atomically
                            // and deals with timestamps and watermark generation
                            emitRecord(value, partition, record.offset(), record);
                        }
                    }
                }
            } // the rest of the method is omitted in this excerpt
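
    The handover used in runFetchLoop is essentially a one-slot exchange between two threads that also carries errors across. Below is a minimal sketch of such a structure built on wait/notify. HandoverSketch is a hypothetical class written for illustration; the real Handover has more features (for example wakeups) and may behave differently in edge cases.

    ```java
    final class HandoverSketch<T> {

        private final Object lock = new Object();
        private T next;          // the single slot
        private Throwable error; // failure reported by the producer side

        /** Producer side: blocks while the previous element is still unconsumed. */
        void produce(T element) throws InterruptedException {
            synchronized (lock) {
                while (next != null && error == null) {
                    lock.wait();
                }
                if (error == null) {
                    next = element;
                    lock.notifyAll();
                }
            }
        }

        /** Producer side: make the failure visible to the consuming thread. */
        void reportError(Throwable t) {
            synchronized (lock) {
                if (error == null) {
                    error = t;
                }
                lock.notifyAll();
            }
        }

        /** Consumer side: blocks for the next element, rethrows producer errors. */
        T pollNext() throws Exception {
            synchronized (lock) {
                while (next == null && error == null) {
                    lock.wait();
                }
                if (next != null) {
                    T result = next;
                    next = null;
                    lock.notifyAll();
                    return result;
                }
                throw new Exception("producer thread failed", error);
            }
        }

        public static void main(String[] args) throws Exception {
            HandoverSketch<String> handover = new HandoverSketch<>();
            Thread producer = new Thread(() -> {
                try {
                    handover.produce("batch-1");
                    handover.produce("batch-2");
                    handover.reportError(new IllegalStateException("consumer thread failed"));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();
            System.out.println(handover.pollNext());
            System.out.println(handover.pollNext());
            try {
                handover.pollNext();
            } catch (Exception e) {
                System.out.println("rethrown: " + e.getCause().getMessage());
            }
            producer.join();
        }
    }
    ```

    Keeping a single slot means the consumer thread cannot run far ahead of the thread that emits records, which gives natural backpressure.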

    An important structure here is subscribedPartitionStates.

    AbstractFetcher

            // create our partition state according to the timestamp/watermark mode
            this.subscribedPartitionStates = initializeSubscribedPartitionStates(
                    assignedPartitionsWithInitialOffsets,
                    timestampWatermarkMode,
                    watermarksPeriodic, watermarksPunctuated,
                    userCodeClassLoader);

    As you can see, all of this information is merged into subscribedPartitionStates, in particular assignedPartitionsWithInitialOffsets.

        /**
         * Utility method that takes the topic partitions and creates the topic partition state
         * holders. If a watermark generator per partition exists, this will also initialize those.
         */
        private KafkaTopicPartitionState<KPH>[] initializeSubscribedPartitionStates(
                Map<KafkaTopicPartition, Long> assignedPartitionsToInitialOffsets,
                int timestampWatermarkMode,
                SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
                SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
                ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
            switch (timestampWatermarkMode) {
    
                case NO_TIMESTAMPS_WATERMARKS: {
                     //.......
    
                case PERIODIC_WATERMARKS: {
                    @SuppressWarnings("unchecked")
                    KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions = // KafkaTopicPartitionStateWithPeriodicWatermarks is a subclass of KafkaTopicPartitionState
                            (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
                                    new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitionsToInitialOffsets.size()]; // same size as assignedPartitionsToInitialOffsets
    
                    int pos = 0;
                    for (Map.Entry<KafkaTopicPartition, Long> partition : assignedPartitionsToInitialOffsets.entrySet()) {
                        KPH kafkaHandle = createKafkaPartitionHandle(partition.getKey()); // the Kafka handle abstracts the TopicPartition and hides structural differences between versions
    
                        AssignerWithPeriodicWatermarks<T> assignerInstance = // each partition gets its own AssignerWithPeriodicWatermarks instance
                                watermarksPeriodic.deserializeValue(userCodeClassLoader);
    
                        partitions[pos] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>( // for PUNCTUATED_WATERMARKS this is KafkaTopicPartitionStateWithPunctuatedWatermarks
                                partition.getKey(), kafkaHandle, assignerInstance); // for NO_TIMESTAMPS_WATERMARKS there is no assignerInstance argument
                        partitions[pos].setOffset(partition.getValue()); // set the initial offset
     
                        pos++;
                    }
    
                    return partitions;
                }
    
                case PUNCTUATED_WATERMARKS: {
                    //......
                }

    subscribedPartitionStates thus holds, for each TopicPartition, its offset and its watermark extraction logic.
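
    A rough picture of why offset and watermark logic live together per partition: each record updates both, and a source-level watermark can then be derived from all partitions. The sketch below assumes the combined watermark is the minimum of the per-partition values, which the post does not show, and it uses the largest seen timestamp as a stand-in for a user-supplied assigner. PartitionStateSketch is a hypothetical name.

    ```java
    import java.util.Arrays;
    import java.util.List;

    final class PartitionStateSketch {

        /** Per-partition state: last emitted offset plus watermark bookkeeping. */
        static final class State {
            final String partition;
            long offset = -1L;
            long maxTimestamp = Long.MIN_VALUE;

            State(String partition) {
                this.partition = partition;
            }

            /** Updates offset and timestamp together, one record at a time. */
            void onRecord(long recordOffset, long timestamp) {
                offset = recordOffset;
                maxTimestamp = Math.max(maxTimestamp, timestamp);
            }
        }

        /** Assumed combination rule: the slowest partition bounds the watermark. */
        static long minWatermark(List<State> states) {
            long min = Long.MAX_VALUE;
            for (State state : states) {
                min = Math.min(min, state.maxTimestamp);
            }
            return min;
        }

        public static void main(String[] args) {
            State a = new State("t-0");
            State b = new State("t-1");
            a.onRecord(0L, 1000L);
            a.onRecord(1L, 1500L);
            b.onRecord(0L, 1200L);
            System.out.println(minWatermark(Arrays.asList(a, b)));
        }
    }
    ```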

    KafkaConsumerThread

        @Override
        public void run() {
    
            // this is the means to talk to FlinkKafkaConsumer's main thread
            final Handover handover = this.handover; // structure for exchanging data between threads
    
            // This method initializes the KafkaConsumer and guarantees it is torn down properly.
            // This is important, because the consumer has multi-threading issues,
            // including concurrent 'close()' calls.
            final KafkaConsumer<byte[], byte[]> consumer;
            try {
                consumer = new KafkaConsumer<>(kafkaProperties); // create the Kafka consumer
            }
            catch (Throwable t) {
                handover.reportError(t);
                return;
            }
    
            // from here on, the consumer is guaranteed to be closed properly
            try {
                // The callback invoked by Kafka once an offset commit is complete
                final OffsetCommitCallback offsetCommitCallback = new CommitCallback(); // this callback only sets commitInProgress = false to mark the commit as finished
    
                // offsets in the state may still be placeholder sentinel values if we are starting fresh, or the
                // checkpoint / savepoint state we were restored with had not completely been replaced with actual offset
                // values yet; replace those with actual offsets, according to what the sentinel value represent.
                for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates) {
                    if (partition.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) { // for EARLIEST or LATEST, first seek the consumer to the beginning or end, then update the current offset from the position Kafka reports
                        consumerCallBridge.seekPartitionToBeginning(consumer, partition.getKafkaPartitionHandle());
                        partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1);
                    } else if (partition.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
                        consumerCallBridge.seekPartitionToEnd(consumer, partition.getKafkaPartitionHandle());
                        partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1);
                    } else if (partition.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                        // the KafkaConsumer by default will automatically seek the consumer position
                        // to the committed group offset, so we do not need to do it.
    
                        partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1); // for GROUP_OFFSET, read the position directly from the group offset committed in Kafka
                    } else {
                        consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1); // otherwise use the offset the partition state already carries, e.g. one restored from state
                    }
                }
    
                // from now on, external operations may call the consumer
                this.consumer = consumer;
    
                // the latest bulk of records. may carry across the loop if the thread is woken up
                // from blocking on the handover
                ConsumerRecords<byte[], byte[]> records = null;
    
                // main fetch loop
                while (running) {
    
                    // check if there is something to commit
                    if (!commitInProgress) { // only one commit may be in flight at a time
                        // get and reset the work-to-be committed, so we don't repeatedly commit the same
                        final Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null); // set via fetcher.commitInternalOffsetsToKafka with the fetcher offsets snapshotted at checkpoint time
    
                        if (toCommit != null) {
    
                            // also record that a commit is already in progress
                            // the order here matters! first set the flag, then send the commit command.
                            commitInProgress = true;
                            consumer.commitAsync(toCommit, offsetCommitCallback); // commit the offsets asynchronously
                        }
                    }
    
                    // get the next batch of records, unless we did not manage to hand the old batch over
                    if (records == null) {
                        try {
                            records = consumer.poll(pollTimeout); // read data from Kafka
                        }
                        catch (WakeupException we) {
                            continue;
                        }
                    }
    
                    try {
                        handover.produce(records); // put the batch into the handover
                        records = null;
                    }
                    catch (Handover.WakeupException e) {
                        // fall through the loop
                    }
                }
                // end main fetch loop
            } // the handlers that follow this try block are omitted in this excerpt
        }
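
    The plus-one and minus-one adjustments in the loop above follow one convention: the partition state stores the offset of the last record already processed, while a Kafka consumer position names the next record to read. The sketch below spells that out. The sentinel values are placeholders for this example only, and StartPositionSketch with its parameters is hypothetical.

    ```java
    final class StartPositionSketch {

        // Placeholder sentinels, not the constants used by Flink.
        static final long EARLIEST = -1_000_001L;
        static final long LATEST = -1_000_002L;
        static final long GROUP = -1_000_003L;

        /**
         * Returns {consumer position, offset to store in the partition state}.
         * beginning, end and committed stand for what the broker would report.
         */
        static long[] resolve(long stateOffset, long beginning, long end, long committed) {
            long position;
            if (stateOffset == EARLIEST) {
                position = beginning;
            } else if (stateOffset == LATEST) {
                position = end;
            } else if (stateOffset == GROUP) {
                position = committed;
            } else {
                // A real offset (e.g. restored from a checkpoint) is the last
                // processed record, so reading resumes one past it.
                position = stateOffset + 1;
            }
            // The state always holds "last processed", i.e. position minus one.
            return new long[] {position, position - 1};
        }

        public static void main(String[] args) {
            long[] restored = resolve(41L, 0L, 100L, 50L);
            long[] latest = resolve(LATEST, 0L, 100L, 50L);
            System.out.println(restored[0] + " " + restored[1]);
            System.out.println(latest[0] + " " + latest[1]);
        }
    }
    ```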
  • Original post: https://www.cnblogs.com/fxjwind/p/6957844.html