Flink

     

    First, let's look at

    AbstractFetcher
    This can be understood as the component in the consumer that actually reads data from Kafka; one fetcher can read from multiple partitions at the same time. Let's take a look:
    /**
     * Base class for all fetchers, which implement the connections to Kafka brokers and
     * pull records from Kafka partitions.
     * 
     * <p>This fetcher base class implements the logic around emitting records and tracking offsets,
     * as well as around the optional timestamp assignment and watermark generation. 
     * 
     * @param <T> The type of elements deserialized from Kafka's byte records, and emitted into
     *            the Flink data streams.
     * @param <KPH> The type of topic/partition identifier used by Kafka in the specific version.
     */
    public abstract class AbstractFetcher<T, KPH> {
        
        /** The source context to emit records and watermarks to */
        private final SourceContext<T> sourceContext; // the context used to emit records and watermarks
    
        /** All partitions (and their state) that this fetcher is subscribed to */
        private final KafkaTopicPartitionState<KPH>[] allPartitions; // tracks each topic partition's state, e.g. its current offset
    
    
        // ------------------------------------------------------------------------
        
        protected AbstractFetcher(
                SourceContext<T> sourceContext,
                List<KafkaTopicPartition> assignedPartitions,
                SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
                SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
                StreamingRuntimeContext runtimeContext) throws Exception
        {
            // create our partition state according to the timestamp/watermark mode 
            this.allPartitions = initializePartitions(
                    assignedPartitions,
                    timestampWatermarkMode,
                    watermarksPeriodic, watermarksPunctuated,
                    runtimeContext.getUserCodeClassLoader());
            
            // if we have periodic watermarks, kick off the interval scheduler
            if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
                KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts = 
                        (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
                
                PeriodicWatermarkEmitter periodicEmitter = 
                        new PeriodicWatermarkEmitter(parts, sourceContext, runtimeContext);
                periodicEmitter.start();  // periodically emit watermarks
            }
        }
    
        // ------------------------------------------------------------------------
        //  Core fetcher work methods
        // ------------------------------------------------------------------------
    
        public abstract void runFetchLoop() throws Exception; // the core method; each Kafka version must implement it
        
        // ------------------------------------------------------------------------
        //  Kafka version specifics
        // ------------------------------------------------------------------------
        
        /**
         * Creates the Kafka version specific representation of the given
         * topic partition.
         * 
         * @param partition The Flink representation of the Kafka topic partition.
         * @return The specific Kafka representation of the Kafka topic partition.
         */
        public abstract KPH createKafkaPartitionHandle(KafkaTopicPartition partition); // creates the KafkaPartitionHandle, which is Kafka's own representation of a partition
    
        /**
         * Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for
         * older Kafka versions).
         * 
         * @param offsets The offsets to commit to Kafka.
         * @throws Exception This method forwards exceptions.
         */
        public abstract void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception; // commits the given Kafka offsets, e.g. by writing them to ZooKeeper
    
        // ------------------------------------------------------------------------
        //  snapshot and restore the state
        // ------------------------------------------------------------------------
    
        /**
         * Takes a snapshot of the partition offsets.
         * 
         * <p>Important: This method must be called under the checkpoint lock.
         * 
         * @return A map from partition to current offset.
         */
        public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() { // snapshots the state of all partitions, mainly their offsets
            // this method assumes that the checkpoint lock is held
            assert Thread.holdsLock(checkpointLock);
    
            HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
            for (KafkaTopicPartitionState<?> partition : subscribedPartitions()) {
                if (partition.isOffsetDefined()) {
                    state.put(partition.getKafkaTopicPartition(), partition.getOffset());
                }
            }
            return state;
        }
    
        /**
         * Restores the partition offsets.
         * 
         * @param snapshotState The offsets for the partitions 
         */
        public void restoreOffsets(HashMap<KafkaTopicPartition, Long> snapshotState) { // restores the offsets from a checkpoint
            for (KafkaTopicPartitionState<?> partition : allPartitions) {
                Long offset = snapshotState.get(partition.getKafkaTopicPartition());
                if (offset != null) {
                    partition.setOffset(offset);
                }
            }
        }
        
        // ------------------------------------------------------------------------
        //  emitting records
        // ------------------------------------------------------------------------
    
        /**
         * Emits a record and updates the partition's offset state.
         * <p>Implementation Note: This method is kept brief to be JIT inlining friendly.
         * That makes the fast path efficient, the extended paths are called as separate methods.
         * 
         * @param record The record to emit
         * @param partitionState The state of the Kafka partition from which the record was fetched
         * @param offset The offset from which the record was fetched
         */
        protected final void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) { // the method that actually emits a record
            if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
                // fast path logic, in case there are no watermarks
    
                // emit the record, using the checkpoint lock to guarantee
                // atomicity of record emission and offset state update
                synchronized (checkpointLock) {
                    sourceContext.collect(record); // emit the record
                    partitionState.setOffset(offset); // update the local offset
                }
            }
            else if (timestampWatermarkMode == PERIODIC_WATERMARKS) { // periodic watermarks are configured
                emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset);
            }
            else {
                emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset);
            }
        }
    
        /**
         * Record emission, if a timestamp will be attached from an assigner that is
         * also a periodic watermark generator.
         */
        private void emitRecordWithTimestampAndPeriodicWatermark(
                T record, KafkaTopicPartitionState<KPH> partitionState, long offset)
        {
            @SuppressWarnings("unchecked")
            final KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> withWatermarksState =
                    (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState; // the partition state carries the periodic watermark assigner
            // extract timestamp - this accesses/modifies the per-partition state inside the
            // watermark generator instance, so we need to lock the access on the
            // partition state. concurrent access can happen from the periodic emitter
            final long timestamp;
            //noinspection SynchronizationOnLocalVariableOrMethodParameter
            synchronized (withWatermarksState) {
                timestamp = withWatermarksState.getTimestampForRecord(record); // invokes the assigner's extractTimestamp to obtain the record's event time
            }
    
            // emit the record with timestamp, using the usual checkpoint lock to guarantee
            // atomicity of record emission and offset state update 
            synchronized (checkpointLock) {
                sourceContext.collectWithTimestamp(record, timestamp); // this variant emits the record together with its event-time timestamp
                partitionState.setOffset(offset);
            }
        }
    
     
        
        // ------------------------------------------------------------------------
        
        /**
         * The periodic watermark emitter. In its given interval, it checks all partitions for
         * the current event time watermark, and possibly emits the next watermark.
         */
        private static class PeriodicWatermarkEmitter implements Triggerable {
    
            private final KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions;
            
            private final SourceContext<?> emitter;
            
            private final StreamingRuntimeContext triggerContext;
    
            private final long interval;
            
            private long lastWatermarkTimestamp;
            
            //-------------------------------------------------
    
            PeriodicWatermarkEmitter(
                    KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions,
                    SourceContext<?> emitter,
                    StreamingRuntimeContext runtimeContext)
            {
                this.allPartitions = checkNotNull(allPartitions);
                this.emitter = checkNotNull(emitter);
                this.triggerContext = checkNotNull(runtimeContext);
                this.interval = runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
                this.lastWatermarkTimestamp = Long.MIN_VALUE;
            }
    
            //-------------------------------------------------
            
            public void start() {
                triggerContext.registerTimer(System.currentTimeMillis() + interval, this); // register the timer
            }
            
            @Override
            public void trigger(long timestamp) throws Exception {
                // sanity check
                assert Thread.holdsLock(emitter.getCheckpointLock());
                
                long minAcrossAll = Long.MAX_VALUE;
                for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state : allPartitions) {
                    
                    // we access the current watermark for the periodic assigners under the state
                    // lock, to prevent concurrent modification to any internal variables
                    final long curr;
                    //noinspection SynchronizationOnLocalVariableOrMethodParameter
                    synchronized (state) {
                        curr = state.getCurrentWatermarkTimestamp(); // read this partition's current watermark
                    }
                    
                    minAcrossAll = Math.min(minAcrossAll, curr);
                }
                
                // emit next watermark, if there is one
                if (minAcrossAll > lastWatermarkTimestamp) {
                    lastWatermarkTimestamp = minAcrossAll;
                    emitter.emitWatermark(new Watermark(minAcrossAll)); // emit the watermark
                }
                
                // schedule the next watermark
                triggerContext.registerTimer(System.currentTimeMillis() + interval, this); // register the timer again for the next interval
            }
        }
    }
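
    To see what the periodic mode plugs into, here is a minimal sketch of an AssignerWithPeriodicWatermarks implementation (MyEvent, its getTimestamp() accessor, and the 3-second bound are assumptions for illustration, not code from the connector). The fetcher calls extractTimestamp for every record under the per-partition lock, and the PeriodicWatermarkEmitter calls getCurrentWatermark on each partition's copy and emits the minimum across partitions:

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    public class BoundedLatenessAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {

        private static final long MAX_OUT_OF_ORDERNESS = 3000; // assumed 3-second bound

        // start far in the past, shifted to avoid underflow in getCurrentWatermark()
        private long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS;

        @Override
        public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
            long ts = element.getTimestamp(); // assumes the event carries its own timestamp
            currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
            return ts;
        }

        @Override
        public Watermark getCurrentWatermark() {
            // the watermark trails the largest timestamp seen by the allowed lateness
            return new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDERNESS);
        }
    }

    Because trigger() takes the minimum across all partitions, a single idle or slow partition holds back the watermark of the whole source.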
     
    Kafka08Fetcher

    The fetcher implementation based on Kafka 0.8:

    public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition>

    The core method it overrides is

    runFetchLoop
    @Override
        public void runFetchLoop() throws Exception {
            // the map from broker to the thread that is connected to that broker
            final Map<Node, SimpleConsumerThread<T>> brokerToThread = new HashMap<>(); // caches the mapping from each leader broker node to the SimpleConsumerThread connected to it
    
            // the offset handler handles the communication with ZooKeeper, to commit externally visible offsets
            final ZookeeperOffsetHandler zookeeperOffsetHandler = new ZookeeperOffsetHandler(kafkaConfig); // ZooKeeper handler, used to read/write offset data from/to ZooKeeper
            this.zookeeperOffsetHandler = zookeeperOffsetHandler;
    
            PeriodicOffsetCommitter periodicCommitter = null;
            try {
                // read offsets from ZooKeeper for partitions that did not restore offsets
                {
                    List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
                    for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
                        if (!partition.isOffsetDefined()) {  // walk the partitions; any whose offset was not restored from a checkpoint goes into partitionsWithNoOffset
                            partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
                        }
                    }
                    
                    // this step applies only to partitionsWithNoOffset, i.e. partitions whose offsets were not read from a checkpoint
                    Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getOffsets(partitionsWithNoOffset); // read those partitions' offsets from ZooKeeper
                    for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
                        Long offset = zkOffsets.get(partition.getKafkaTopicPartition());
                        if (offset != null) {
                            partition.setOffset(offset); // set the offset read from ZooKeeper on the partition
                        }
                    }
                }
    
                // start the periodic offset committer thread, if necessary
                if (autoCommitInterval > 0) { // periodically commit offsets, e.g. to ZooKeeper under the path topic_groupid + "/" + partition
                    periodicCommitter = new PeriodicOffsetCommitter(zookeeperOffsetHandler, 
                            subscribedPartitions(), errorHandler, autoCommitInterval);
                    periodicCommitter.setName("Periodic Kafka partition offset committer");
                    periodicCommitter.setDaemon(true);
                    periodicCommitter.start();
                }
    
                // Main loop polling elements from the unassignedPartitions queue to the threads
                while (running) {
                    
                    // wait for max 5 seconds trying to get partitions to assign
                    // if threads shut down, this poll returns earlier, because the threads inject the
                    // special marker into the queue
                    List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign = 
                            unassignedPartitionsQueue.getBatchBlocking(5000);
                    partitionsToAssign.remove(MARKER); // the MARKER is a sentinel that consumer threads inject to wake up the blocking poll above (e.g. when a thread shuts down); it is not a real partition, so drop it
    
                    if (!partitionsToAssign.isEmpty()) {
                        LOG.info("Assigning {} partitions to broker threads", partitionsToAssign.size());
                        Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeaders =  // look up the leader broker for each partition; the result maps each leader to its list of partitions
                                findLeaderForPartitions(partitionsToAssign, kafkaConfig);
    
                        // assign the partitions to the leaders (maybe start the threads)
                        for (Map.Entry<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeader : 
                                partitionsWithLeaders.entrySet())
                        {
                            final Node leader = partitionsWithLeader.getKey(); // the leader node
                            final List<KafkaTopicPartitionState<TopicAndPartition>> partitions = partitionsWithLeader.getValue(); // the partitions readable from this leader node
                            SimpleConsumerThread<T> brokerThread = brokerToThread.get(leader); // find the consumer thread for this leader node
    
                            if (brokerThread == null || !brokerThread.getNewPartitionsQueue().isOpen()) {
                                // start new thread
                                brokerThread = createAndStartSimpleConsumerThread(partitions, leader, errorHandler); // no consumer thread for this broker yet: create and start one
                                brokerToThread.put(leader, brokerThread);
                            }
                            else {
                                // put elements into queue of thread
                                ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> newPartitionsQueue =
                                        brokerThread.getNewPartitionsQueue();
                                
                                for (KafkaTopicPartitionState<TopicAndPartition> fp : partitions) {
                                    if (!newPartitionsQueue.addIfOpen(fp)) {
                                        // we were unable to add the partition to the broker's queue
                                        // the broker has closed in the meantime (the thread will shut down)
                                        // create a new thread for connecting to this broker
                                        List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions = new ArrayList<>();
                                        seedPartitions.add(fp);
                                        brokerThread = createAndStartSimpleConsumerThread(seedPartitions, leader, errorHandler);
                                        brokerToThread.put(leader, brokerThread);
                                        newPartitionsQueue = brokerThread.getNewPartitionsQueue(); // update queue for the subsequent partitions
                                    }
                                }
                            }
                        }
                    }
                }
            }
            catch (InterruptedException e) {
               //......
            }
            finally {
               //......
            }
        }
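
    One detail worth calling out: the MARKER above is how a shutting-down consumer thread unblocks the main loop's getBatchBlocking(5000) before the timeout expires; since the marker is not a real partition, the main loop filters it out. A minimal sketch of the same wake-up pattern using a standard BlockingQueue (all names here are mine, not the connector's):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class WakeUpMarkerDemo {
        // a sentinel object; equality is enough to recognize it
        static final String MARKER = "<marker>";

        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>();

            // a worker thread that shuts down injects the marker to wake the main loop early
            new Thread(() -> queue.add(MARKER)).start();

            // main loop: block for up to 5 seconds, then drain whatever else arrived
            List<String> batch = new ArrayList<>();
            String first = queue.poll(5, TimeUnit.SECONDS); // returns early because of the marker
            if (first != null) {
                batch.add(first);
                queue.drainTo(batch);
            }
            batch.remove(MARKER); // the marker is not a real work item, so drop it
            System.out.println("work items: " + batch);
        }
    }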

     

    Some other interface implementations:

    // ------------------------------------------------------------------------
        //  Kafka 0.8 specific class instantiation
        // ------------------------------------------------------------------------
    
        @Override
        public TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
            return new TopicAndPartition(partition.getTopic(), partition.getPartition()); // for Kafka 0.8, the KafkaPartitionHandle is simply TopicAndPartition
        }
    
        // ------------------------------------------------------------------------
        //  Offset handling
        // ------------------------------------------------------------------------
    
        @Override
        public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
            ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
            if (zkHandler != null) {
                zkHandler.writeOffsets(offsets); // committing offsets means writing them to ZooKeeper
            }
        }
    
        // ------------------------------------------------------------------------
        //  Utilities
        // ------------------------------------------------------------------------
    
        private SimpleConsumerThread<T> createAndStartSimpleConsumerThread(
                List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions,
                Node leader,
                ExceptionProxy errorHandler) throws IOException, ClassNotFoundException
        {
            // each thread needs its own copy of the deserializer, because the deserializer is
            // not necessarily thread safe
            final KeyedDeserializationSchema<T> clonedDeserializer =
                    InstantiationUtil.clone(deserializer, userCodeClassLoader);
    
            // seed thread with list of fetch partitions (otherwise it would shut down immediately again)
            SimpleConsumerThread<T> brokerThread = new SimpleConsumerThread<>(
                    this, errorHandler, kafkaConfig, leader, seedPartitions, unassignedPartitionsQueue, 
                    clonedDeserializer, invalidOffsetBehavior);
    
            brokerThread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
                    taskName, leader.id(), leader.host(), leader.port()));
            brokerThread.setDaemon(true);
            brokerThread.start(); // create and start the SimpleConsumerThread
    
            LOG.info("Starting thread {}", brokerThread.getName());
            return brokerThread;
        }

     

    Next, let's look at SimpleConsumerThread:

    class SimpleConsumerThread<T> extends Thread

    Its core method is run, which essentially keeps fetching data in a loop:

    // these are the actual configuration values of Kafka + their original default values.
        this.soTimeout = getInt(config, "socket.timeout.ms", 30000); // various Kafka configuration values
        this.minBytes = getInt(config, "fetch.min.bytes", 1);
        this.maxWait = getInt(config, "fetch.wait.max.ms", 100);
        this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
        this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
        this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
       
        // ------------------------------------------------------------------------
        //  main work loop
        // ------------------------------------------------------------------------
        
        @Override
        public void run() {
            try {
                // create the Kafka consumer that we actually use for fetching
                consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId); // create the SimpleConsumer
                
                // make sure that all partitions have some offsets to start with
                // those partitions that do not have an offset from a checkpoint need to get
                // their start offset from ZooKeeper
                getMissingOffsetsFromKafka(partitions);  // for partitions without offset info, reset the offset to latest or earliest
    
                // Now, the actual work starts :-)
                int offsetOutOfRangeCount = 0; // bookkeeping counters: invalid offsets and reconnect attempts
                int reconnects = 0;
                while (running) {
    
                    // ----------------------------------- partitions list maintenance ----------------------------
    
                    // check queue for new partitions to read from:
                    List<KafkaTopicPartitionState<TopicAndPartition>> newPartitions = newPartitionsQueue.pollBatch(); // handle newly assigned partitions, mainly by adding them to partitions
                    if (newPartitions != null) {
                        // found some new partitions for this thread's broker
                        
                        // check if the new partitions need an offset lookup
                        getMissingOffsetsFromKafka(newPartitions); // reset the offsets for the new partitions
                        
                        // add the new partitions (and check they are not already in there)
                        for (KafkaTopicPartitionState<TopicAndPartition> newPartition: newPartitions) {
                            partitions.add(newPartition);
                        }
                    }
    
                    if (partitions.size() == 0) { // partitions is empty, i.e. there is nothing left to consume
                        if (newPartitionsQueue.close()) { // if the queue can be closed now, no new partitions can ever arrive, so this thread has no reason to keep running
                            // close succeeded. Closing thread
                            running = false; // stop the thread
                            
                            LOG.info("Consumer thread {} does not have any partitions assigned anymore. Stopping thread.", 
                                    getName());
    
                            // add the wake-up marker into the queue to make the main thread
                            // immediately wake up and termination faster
                            unassignedPartitions.add(MARKER);
    
                            break;
                        } else { // the queue could not be closed, so new partitions are on the way; wait for them and continue
                            // close failed: fetcher main thread concurrently added new partitions into the queue.
                            // go to top of loop again and get the new partitions
                            continue; 
                        }
                    }
    
                    // ----------------------------------- request / response with kafka ----------------------------
    
                    FetchRequestBuilder frb = new FetchRequestBuilder(); // create the FetchRequestBuilder
                    frb.clientId(clientId);
                    frb.maxWait(maxWait);
                    frb.minBytes(minBytes);
    
                    for (KafkaTopicPartitionState<?> partition : partitions) {
                        frb.addFetch(
                                partition.getKafkaTopicPartition().getTopic(),
                                partition.getKafkaTopicPartition().getPartition(),
                                partition.getOffset() + 1, // request the next record
                                fetchSize);
                    }
                    
                    kafka.api.FetchRequest fetchRequest = frb.build(); // build the FetchRequest; one request can read several partitions at once, depending on the ratio of partitions to consumers
    
                    FetchResponse fetchResponse;
                    try {
                        fetchResponse = consumer.fetch(fetchRequest); // fetch from Kafka; the FetchResponse may contain data from several partitions
                    }
                    catch (Throwable cce) {
                        //noinspection ConstantConditions
                        if (cce instanceof ClosedChannelException) { // the connection to Kafka failed
                           
                            // we don't know if the broker is overloaded or unavailable.
                            // retry a few times, then return ALL partitions for new leader lookup
                            if (++reconnects >= reconnectLimit) {  // the reconnect limit has been reached; the broker really is unreachable
                                LOG.warn("Unable to reach broker after {} retries. Returning all current partitions", reconnectLimit);
                                for (KafkaTopicPartitionState<TopicAndPartition> fp: this.partitions) {
                                    unassignedPartitions.add(fp); // return the owned partitions to unassignedPartitions, marking them as currently unhandled
                                }
                                this.partitions.clear(); // clear the partitions list
                                continue; // jump to top of loop: this falls into the partitions.size() == 0 logic above, which will close the thread or subscribe to new partitions
                            }
                            try { // otherwise close the consumer, recreate it, and retry
                                consumer.close();
                            } catch (Throwable t) {
                                LOG.warn("Error while closing consumer connection", t);
                            }
                            // delay & retry
                            Thread.sleep(100);
                            consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
                            continue; // retry
                        } else {
                            throw cce;
                        }
                    }
                    reconnects = 0;
    
                    // ---------------------------------------- error handling ----------------------------
    
                    if (fetchResponse == null) {
                        throw new IOException("Fetch from Kafka failed (request returned null)");
                    }
                    
                    if (fetchResponse.hasError()) { // the fetch response reported errors
                        String exception = "";
                        List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>();
                        
                        // iterate over partitions to get individual error codes
                        Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator();
                        boolean partitionsRemoved = false;
                        
                        while (partitionsIterator.hasNext()) {
                            final KafkaTopicPartitionState<TopicAndPartition> fp = partitionsIterator.next();
                            short code = fetchResponse.errorCode(fp.getTopic(), fp.getPartition()); // the error code for this partition
    
                            if (code == ErrorMapping.OffsetOutOfRangeCode()) { // invalid offset: this partition's offset needs to be re-initialized
                                // we were asked to read from an out-of-range-offset (maybe set wrong in Zookeeper)
                                // Kafka's high level consumer is resetting the offset according to 'auto.offset.reset'
                                partitionsToGetOffsetsFor.add(fp);
                            }
                            else if (code == ErrorMapping.NotLeaderForPartitionCode() ||  // the partition cannot be read from this broker because the leader/broker is unavailable
                                    code == ErrorMapping.LeaderNotAvailableCode() ||
                                    code == ErrorMapping.BrokerNotAvailableCode() ||
                                    code == ErrorMapping.UnknownCode())
                            {
                                // the broker we are connected to is not the leader for the partition.
                                LOG.warn("{} is not the leader of {}. Reassigning leader for partition", broker, fp);
                                LOG.debug("Error code = {}", code);
    
                                unassignedPartitions.add(fp); // put the partition back into unassignedPartitions so it gets reassigned
    
                                partitionsIterator.remove(); // unsubscribe the partition ourselves: remove it from the current partitions list
                                partitionsRemoved = true;
                            }
                            else if (code != ErrorMapping.NoError()) {
                                exception += "\nException for " + fp.getTopic() + ":" + fp.getPartition() + ": " +
                                        StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
                            }
                        }
                        if (partitionsToGetOffsetsFor.size() > 0) {
                            // safeguard against an infinite loop.
                            if (offsetOutOfRangeCount++ > 3) { // if offsets are still invalid after three resets, throw, to avoid an infinite loop
                                throw new RuntimeException("Found invalid offsets more than three times in partitions "
                                        + partitionsToGetOffsetsFor + " Exceptions: " + exception);
                            }
                            // get valid offsets for these partitions and try again.
                            LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor);
                            getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior); // reset these partitions' offsets; depending on configuration they are reset to earliest or latest
                            
                            LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor);
                            continue; // jump back to create a new fetch request. The offset has not been touched.
                        }
                        else if (partitionsRemoved) {
                            continue; // create new fetch request
                        }
                        else {
                            // partitions failed on an error
                            throw new IOException("Error while fetching from broker '" + broker +"': " + exception);
                        }
                    } else {
                        // successful fetch, reset offsetOutOfRangeCount.
                        offsetOutOfRangeCount = 0;
                    }
    
                    // ----------------------------------- process fetch response ----------------------------
    
                    int messagesInFetch = 0;
                    int deletedMessages = 0;
                    Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator();
                    
                    partitionsLoop:
                    while (partitionsIterator.hasNext()) {
                        final KafkaTopicPartitionState<TopicAndPartition> currentPartition = partitionsIterator.next();
                        
                        final ByteBufferMessageSet messageSet = fetchResponse.messageSet( // extract this partition's data from the fetchResponse, wrapped as a ByteBufferMessageSet
                                currentPartition.getTopic(), currentPartition.getPartition());
    
                        for (MessageAndOffset msg : messageSet) { // for each message
                            if (running) {
                                messagesInFetch++;
                                final ByteBuffer payload = msg.message().payload(); // read the message payload
                                final long offset = msg.offset();  // read the message offset
                                
                                if (offset <= currentPartition.getOffset()) { // already-seen data: ignore it
                                    // we have seen this message already
                                    LOG.info("Skipping message with offset " + msg.offset()
                                            + " because we have seen messages until (including) "
                                            + currentPartition.getOffset()
                                            + " from topic/partition " + currentPartition.getTopic() + '/'
                                            + currentPartition.getPartition() + " already");
                                    continue;
                                }
    
                                // If the message value is null, this represents a delete command for the message key.
                                // Log this and pass it on to the client who might want to also receive delete messages.
                                byte[] valueBytes;
                                if (payload == null) {
                                    deletedMessages++;
                                    valueBytes = null;
                                } else {
                                    valueBytes = new byte[payload.remaining()];
                                    payload.get(valueBytes); // copy the payload into valueBytes
                                }
    
                                // put key into byte array
                                byte[] keyBytes = null;
                                int keySize = msg.message().keySize();
    
                                if (keySize >= 0) { // message().hasKey() is doing the same. We save one int deserialization
                                    ByteBuffer keyPayload = msg.message().key();
                                    keyBytes = new byte[keySize]; // copy the key into keyBytes
                                    keyPayload.get(keyBytes);
                                }
    
                                final T value = deserializer.deserialize(keyBytes, valueBytes, // deserialize the message into an object
                                        currentPartition.getTopic(), currentPartition.getPartition(), offset);
                                
                                if (deserializer.isEndOfStream(value)) {
                                    // remove partition from subscribed partitions.
                                    partitionsIterator.remove();
                                    continue partitionsLoop;
                                }
                                
                                owner.emitRecord(value, currentPartition, offset); // emit the record
                            }
                            else {
                                // no longer running
                                return;
                            }
                        }
                    }
                } // end of fetch loop
    
            }
            catch (Throwable t) {
               //......
            }
        }
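
    The loop above hands the raw key/value byte arrays to the KeyedDeserializationSchema and stops consuming a partition when isEndOfStream returns true. A minimal sketch of such a schema that decodes values as UTF-8 strings (the "EOS" end-of-stream sentinel is an assumption for illustration):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;

    public class SimpleStringKeyedSchema implements KeyedDeserializationSchema<String> {

        @Override
        public String deserialize(byte[] messageKey, byte[] message,
                                  String topic, int partition, long offset) throws IOException {
            // message may be null (a delete marker for the key); pass null downstream
            return message == null ? null : new String(message, StandardCharsets.UTF_8);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            // hypothetical sentinel: stop reading the partition once "EOS" arrives
            return "EOS".equals(nextElement);
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }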

     

    Finally, let's look at

    FlinkKafkaConsumerBase
    /**
     * Base class of all Flink Kafka Consumer data sources.
     * This implements the common behavior across all Kafka versions.
     * 
     * <p>The Kafka version specific behavior is defined mainly in the specific subclasses of the
     * {@link AbstractFetcher}.
     * 
     * @param <T> The type of records produced by this data source
     */
    public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements 
            CheckpointListener,
            CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>,
            ResultTypeQueryable<T>

    This is the abstraction shared by all Kafka versions:

    @Override
        public void run(SourceContext<T> sourceContext) throws Exception {
            
            // figure out which partitions this subtask should process
        final List<KafkaTopicPartition> thisSubtaskPartitions = assignPartitions(allSubscribedPartitions, // given the number of partitions and the number of consumers, decide which partitions this subtask handles; the logic is a simple modulo
                    getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getIndexOfThisSubtask());
            
            // we need only do work, if we actually have partitions assigned
            if (!thisSubtaskPartitions.isEmpty()) {
    
                // (1) create the fetcher that will communicate with the Kafka brokers
            final AbstractFetcher<T, ?> fetcher = createFetcher(  // create the fetcher
                        sourceContext, thisSubtaskPartitions, 
                        periodicWatermarkAssigner, punctuatedWatermarkAssigner,
                        (StreamingRuntimeContext) getRuntimeContext());
    
                // (2) set the fetcher to the restored checkpoint offsets
            if (restoreToOffset != null) {  // offsets were restored from a checkpoint
                fetcher.restoreOffsets(restoreToOffset); // push the restored offsets into the fetcher
                }
    
                // publish the reference, for snapshot-, commit-, and cancel calls
                // IMPORTANT: We can only do that now, because only now will calls to
                //            the fetchers 'snapshotCurrentState()' method return at least
                //            the restored offsets
                this.kafkaFetcher = fetcher;
                if (!running) {
                    return;
                }
                
                // (3) run the fetcher' main work method
            fetcher.runFetchLoop();  // start the fetcher's main loop
            }
        }
    
        @Override
        public HashMap<KafkaTopicPartition, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
            
            final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
    
        HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState(); // snapshot the current offsets
    
            // the map cannot be asynchronously updated, because only one checkpoint call can happen
            // on this function at a time: either snapshotState() or notifyCheckpointComplete()
        pendingCheckpoints.put(checkpointId, currentOffsets); // remember this checkpoint id until the checkpoint completes
            
            // truncate the map, to prevent infinite growth
        while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) { // drop old pending checkpoints to bound the map
                pendingCheckpoints.remove(0);
            }
    
            return currentOffsets;
        }
    
        @Override
        public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) {
            LOG.info("Setting restore state in the FlinkKafkaConsumer");
            restoreToOffset = restoredOffsets;
        }
    
        @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception { // when a checkpoint completes, the consumer is notified; only then are the offsets actually committed
    
            final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
    
            try {
                final int posInMap = pendingCheckpoints.indexOf(checkpointId);
    
                @SuppressWarnings("unchecked")
                HashMap<KafkaTopicPartition, Long> checkpointOffsets = 
                        (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);
    
                // remove older checkpoints in map
            for (int i = 0; i < posInMap; i++) { // pending checkpoints older than this one are now meaningless; drop them
                    pendingCheckpoints.remove(0);
                }
    
            fetcher.commitSpecificOffsetsToKafka(checkpointOffsets); // the actual offset commit, through the version-agnostic interface; for Kafka 0.8 the fetcher also commits periodically on its own, but checkpoints (typically seconds apart) tend to be more frequent
        }
        catch (Throwable t) {
           //......
        }
    }
        
        /**
         * Selects which of the given partitions should be handled by a specific consumer,
         * given a certain number of consumers.
         * 
         * @param allPartitions The partitions to select from
         * @param numConsumers The number of consumers
         * @param consumerIndex The index of the specific consumer
         * 
         * @return The sublist of partitions to be handled by that consumer.
         */
        protected static List<KafkaTopicPartition> assignPartitions(
                List<KafkaTopicPartition> allPartitions,
                int numConsumers, int consumerIndex)
        {
            final List<KafkaTopicPartition> thisSubtaskPartitions = new ArrayList<>(
                    allPartitions.size() / numConsumers + 1);
    
            for (int i = 0; i < allPartitions.size(); i++) {
                if (i % numConsumers == consumerIndex) {
                    thisSubtaskPartitions.add(allPartitions.get(i));
                }
            }
            
            return thisSubtaskPartitions;
        }
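
    To make the modulo assignment concrete (the topic name "events" is made up): with 6 partitions and 3 parallel subtasks, subtask 0 receives partitions 0 and 3, subtask 1 receives 1 and 4, and subtask 2 receives 2 and 5:

    List<KafkaTopicPartition> all = new ArrayList<>();
    for (int p = 0; p < 6; p++) {
        all.add(new KafkaTopicPartition("events", p));
    }

    List<KafkaTopicPartition> subtask0 = assignPartitions(all, 3, 0); // partitions 0 and 3
    List<KafkaTopicPartition> subtask1 = assignPartitions(all, 3, 1); // partitions 1 and 4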

     

    The consumer specific to Kafka 0.8:

    public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
    
        /**
         * Creates a new Kafka streaming source consumer for Kafka 0.8.x
         *
         * This constructor allows passing multiple topics and a key/value deserialization schema.
         * 
         * @param topics
         *           The Kafka topics to read from.
         * @param deserializer
         *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
         * @param props
         *           The properties that are used to configure both the fetcher and the offset handler.
         */
        public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
            super(deserializer);
    
            // validate the zookeeper properties
            validateZooKeeperConfig(props);
    
        this.invalidOffsetBehavior = getInvalidOffsetBehavior(props); // where to reset when an offset is invalid; earliest and latest are supported
        this.autoCommitInterval = PropertiesUtil.getLong(props, "auto.commit.interval.ms", 60000); // the offset commit interval, one minute by default
    
            // Connect to a broker to get the partitions for all topics
            List<KafkaTopicPartition> partitionInfos = 
                KafkaTopicPartition.dropLeaderData(getPartitionsForTopic(topics, props)); // this only fetches the partition info for the given topics
    
        setSubscribedPartitions(partitionInfos); // register these partitions as subscribed by this consumer; each parallel subtask later picks its own share in run()
        }
        
        /**
         * Send request to Kafka to get partitions for topic.
         * 
         * @param topics The name of the topics.
         * @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic. 
         */
        public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> topics, Properties properties) { // how the topic's partition info is fetched from Kafka: the configuration supplies a broker list rather than ZooKeeper, so each attempt picks a random broker to read the partition metadata from
            String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
            final int numRetries = getInt(properties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES);
    
            checkNotNull(seedBrokersConfString, "Configuration property %s not set", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
            String[] seedBrokers = seedBrokersConfString.split(",");
            List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
    
            final String clientId = "flink-kafka-consumer-partition-lookup";
            final int soTimeout = getInt(properties, "socket.timeout.ms", 30000);
            final int bufferSize = getInt(properties, "socket.receive.buffer.bytes", 65536);
    
            Random rnd = new Random();
            retryLoop: for (int retry = 0; retry < numRetries; retry++) {
                // we pick a seed broker randomly to avoid overloading the first broker with all the requests when the
                // parallel source instances start. Still, we try all available brokers.
                int index = rnd.nextInt(seedBrokers.length);
                brokersLoop: for (int arrIdx = 0; arrIdx < seedBrokers.length; arrIdx++) {
                    String seedBroker = seedBrokers[index];
                    LOG.info("Trying to get topic metadata from broker {} in try {}/{}", seedBroker, retry, numRetries);
                    if (++index == seedBrokers.length) {
                        index = 0;
                    }
    
                    URL brokerUrl = NetUtils.getCorrectHostnamePort(seedBroker);
                    SimpleConsumer consumer = null;
                    try {
                        consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, clientId);
    
                        TopicMetadataRequest req = new TopicMetadataRequest(topics);
                        kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
    
                        List<TopicMetadata> metaData = resp.topicsMetadata();
    
                        // clear in case we have an incomplete list from previous tries
                        partitions.clear();
                        for (TopicMetadata item : metaData) {
                            if (item.errorCode() != ErrorMapping.NoError()) {
                                // warn and try more brokers
                                LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions " +
                                        "for " + topics.toString() + ". Error: " + ErrorMapping.exceptionFor(item.errorCode()).getMessage());
                                continue brokersLoop;
                            }
                            if (!topics.contains(item.topic())) {
                                LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ...");
                                continue brokersLoop;
                            }
                            for (PartitionMetadata part : item.partitionsMetadata()) {
                                Node leader = brokerToNode(part.leader());
                                KafkaTopicPartition ktp = new KafkaTopicPartition(item.topic(), part.partitionId());
                                KafkaTopicPartitionLeader pInfo = new KafkaTopicPartitionLeader(ktp, leader);
                                partitions.add(pInfo);
                            }
                        }
                        break retryLoop; // leave the loop through the brokers
                    } catch (Exception e) {
                        LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topics.toString() + "." +
                                "" + e.getClass() + ". Message: " + e.getMessage());
                        LOG.debug("Detailed trace", e);
                        // we sleep a bit. Retrying immediately doesn't make sense in cases where Kafka is reorganizing the leader metadata
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e1) {
                            // sleep shorter.
                        }
                    } finally {
                        if (consumer != null) {
                            consumer.close();
                        }
                    }
                } // brokers loop
            } // retries loop
            return partitions;
        }
    
        private static long getInvalidOffsetBehavior(Properties config) {
            final String val = config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest");
            if (val.equals("none")) {
                throw new IllegalArgumentException("Cannot use '" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
                        + "' value 'none'. Possible values: 'latest', 'largest', or 'earliest'.");
            }
            else if (val.equals("largest") || val.equals("latest")) { // largest is kafka 0.8, latest is kafka 0.9
                return OffsetRequest.LatestTime();
            } else {
                return OffsetRequest.EarliestTime();
            }
        }
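
    Putting it all together, here is a sketch of how the consumer is typically wired into a job (the broker and ZooKeeper addresses, the topic name, and SimpleStringKeyedSchema from the earlier sketch are all placeholders):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092"); // seed brokers for partition metadata lookup
    props.setProperty("zookeeper.connect", "zk1:2181");                  // required: offsets are committed to ZooKeeper
    props.setProperty("group.id", "my-flink-group");
    props.setProperty("auto.offset.reset", "largest");                   // where to start when no offset is stored

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000); // offsets are committed when a checkpoint completes (notifyCheckpointComplete)

    DataStream<String> stream = env.addSource(
            new FlinkKafkaConsumer08<>(
                    Collections.singletonList("events"),  // topics to read
                    new SimpleStringKeyedSchema(),        // the hypothetical schema sketched earlier
                    props));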

     

  • Original post: https://www.cnblogs.com/fxjwind/p/5647990.html