zoukankan      html  css  js  c++  java
  • Flink 中的kafka何时commit?

    https://ci.apache.org/projects/flink/flink-docs-release-1.6/internals/stream_checkpointing.html

    @Override
    publicfinalvoidnotifyCheckpointComplete(longcheckpointId)throwsException{
    if(!running){
    LOG.debug("notifyCheckpointComplete()calledonclosedsource");
    return;
    }

    finalAbstractFetcher<?,?>fetcher=this.kafkaFetcher;
    if(fetcher==null){
    LOG.debug("notifyCheckpointComplete()calledonuninitializedsource");
    return;
    }

    if(offsetCommitMode==OffsetCommitMode.ON_CHECKPOINTS){
    //onlyonecommitoperationmustbeinprogress
    if(LOG.isDebugEnabled()){
    LOG.debug("CommittingoffsetstoKafka/ZooKeeperforcheckpoint"+checkpointId);
    }

    try{
    finalintposInMap=pendingOffsetsToCommit.indexOf(checkpointId);
    if(posInMap==-1){
    LOG.warn("Receivedconfirmationforunknowncheckpointid{}",checkpointId);
    return;
    }

    @SuppressWarnings("unchecked")
    Map<KafkaTopicPartition,Long>offsets=
    (Map<KafkaTopicPartition,Long>)pendingOffsetsToCommit.remove(posInMap);

    //removeoldercheckpointsinmap
    for(inti=0;i<posInMap;i++){
    pendingOffsetsToCommit.remove(0);
    }

    if(offsets==null||offsets.size()==0){
    LOG.debug("Checkpointstatewasempty.");
    return;
    }

    fetcher.commitInternalOffsetsToKafka(offsets,offsetCommitCallback);
    }catch(Exceptione){
    if(running){
    throwe;
    }
    //elseignoreexceptionifwearenolongerrunning
    }
    }
    }

    /**
    *Theoffsetcommitmoderepresentsthebehaviourofhowoffsetsareexternallycommitted
    *backtoKafkabrokers/Zookeeper.
    *
    *<p>Theexactvalueofthisisdeterminedatruntimeintheconsumersubtasks.
    */
    @Internal
    publicenumOffsetCommitMode{

    /**Completelydisableoffsetcommitting.*/
    DISABLED,

    /**CommitoffsetsbacktoKafkaonlywhencheckpointsarecompleted.*/
    ON_CHECKPOINTS,

    /**CommitoffsetsperiodicallybacktoKafka,usingtheautocommitfunctionalityofinternalKafkaclients.*/
    KAFKA_PERIODIC;
    }

    /**
    *CommitsthegivenpartitionoffsetstotheKafkabrokers(ortoZooKeeperfor
    *olderKafkaversions).Thismethodisonlyevercalledwhentheoffsetcommitmodeof
    *theconsumeris{@linkOffsetCommitMode#ON_CHECKPOINTS}.
    *
    *<p>Thegivenoffsetsaretheinternalcheckpointedoffsets,representing
    *thelastprocessedrecordofeachpartition.Version-specificimplementationsofthismethod
    *needtoholdthecontractthatthegivenoffsetsmustbeincrementedby1before
    *committingthem,sothatcommittedoffsetstoKafkarepresent"thenextrecordtoprocess".
    *
    *@paramoffsetsTheoffsetstocommittoKafka(implementationsmustincrementoffsetsby1beforecommitting).
    *@paramcommitCallbackThecallbackthattheusershouldtriggerwhenacommitrequestcompletesorfails.
    *@throwsExceptionThismethodforwardsexceptions.
    */
    publicfinalvoidcommitInternalOffsetsToKafka(
    Map<KafkaTopicPartition,Long>offsets,
    @NonnullKafkaCommitCallbackcommitCallback)throwsException{
    //Ignoresentinels.Theymightappearhereifsnapshothasstartedbeforeactualoffsetsvalues
    //replacedsentinels
    doCommitInternalOffsetsToKafka(filterOutSentinels(offsets),commitCallback);
    }

    /**
    * Invoking this method makes all buffered records immediately available to send (even if <code>linger.ms</code> is
    * greater than 0) and blocks on the completion of the requests associated with these records. The post-condition
    * of <code>flush()</code> is that any previously sent record will have completed (e.g. <code>Future.isDone() == true</code>).
    * A request is considered completed when it is successfully acknowledged
    * according to the <code>acks</code> configuration you have specified or else it results in an error.
    * <p>
    * Other threads can continue sending records while one thread is blocked waiting for a flush call to complete,
    * however no guarantee is made about the completion of records sent after the flush call begins.
    * <p>
    * This method can be useful when consuming from some input system and producing into Kafka. The <code>flush()</code> call
    * gives a convenient way to ensure all previously sent messages have actually completed.
    * <p>
    * This example shows how to consume from one Kafka topic and produce to another Kafka topic:
    * <pre>
    * {@code
    * for(ConsumerRecord<String, String> record: consumer.poll(100))
    *     producer.send(new ProducerRecord("my-topic", record.key(), record.value()));
    * producer.flush();
    * consumer.commit();
    * }
    * </pre>
    *
    * <p>
    * Note that the above example may drop records if the produce request fails. If we want to ensure that this does not occur
    * we need to set <code>retries=&lt;large_number&gt;</code> in our config.
    * </p>
    * <p>
    * Applications don't need to call this method for transactional producers, since the {@link #commitTransaction()} will
    * flush all buffered records before performing the commit. This ensures that all the {@link #send(ProducerRecord)}
    * calls made since the previous {@link #beginTransaction()} are completed before the commit.
    * </p>
    *
    * @throws InterruptException If the thread is interrupted while blocked
    */
    @Override
    public void flush() {
    log.trace("Flushing accumulated records in producer.");
    // Mark the flush as started on the accumulator first, then wake up the sender
    // (presumably so it starts draining the buffered records right away; confirm against the sender).
    this.accumulator.beginFlush();
    this.sender.wakeup();
    try {
    // Block the calling thread until the accumulator reports the flush as complete.
    this.accumulator.awaitFlushCompletion();
    } catch (InterruptedException e) {
    // Surface the interruption as the InterruptException advertised in the @throws clause,
    // keeping the original exception as the cause.
    throw new InterruptException("Flush interrupted.", e);
    }
    }

  • 相关阅读:
    OSCP Learning Notes Buffer Overflows(3)
    OSCP Learning Notes Buffer Overflows(5)
    OSCP Learning Notes Exploit(3)
    OSCP Learning Notes Exploit(4)
    OSCP Learning Notes Exploit(1)
    OSCP Learning Notes Netcat
    OSCP Learning Notes Buffer Overflows(4)
    OSCP Learning Notes Buffer Overflows(1)
    OSCP Learning Notes Exploit(2)
    C++格式化输出 Learner
  • 原文地址:https://www.cnblogs.com/WCFGROUP/p/9713582.html
Copyright © 2011-2022 走看看