  • Flink Source Code Reading (2): Checkpoint Source Code Analysis

    Preface

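      The article "Flink Principles: Fault Tolerance" gave a basic introduction to the checkpoint mechanism. This article analyzes the checkpoint process from the source code. It only covers how a checkpoint is scheduled and tries to make the overall logic clear without working through the implementation details, which is a regret; I will try to analyze those details later. If you find any mistakes, please point them out in the comments.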

      This article is based on Flink 1.9.

    1. Parameter Settings

      1.1 The common checkpoint-related parameters are:

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10000);   // interval in ms; checkpointing is disabled by default
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);  // default: EXACTLY_ONCE
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);  // default: 0, at most one year
    env.getCheckpointConfig().setCheckpointTimeout(150000);  // default: 10 min
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);  // default: 1

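       The default values of these parameters are defined in CheckpointConfig.java in flink-streaming-java*.jar, which also lists further settings. The configured values are passed to the runtime layer by jobGraph.setSnapshotSettings(settings) inside the private configureCheckpointing() method of StreamingJobGraphGenerator.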

      1.2 Parameter Analysis

      This section focuses on the relationship between baseInterval, set by enableCheckpointing(), and minPauseBetweenCheckpoints. First, here are their definitions in the source code:

    /** The base checkpoint interval. Actual trigger time may be affected by the
     * max concurrent checkpoints and minimum-pause values */
    // checkpoint trigger period; the actual trigger time is also affected by maxConcurrentCheckpointAttempts and minPauseBetweenCheckpointsNanos
    private final long baseInterval;

    /** The min time(in ns) to delay after a checkpoint could be triggered. Allows to
     * enforce minimum processing time between checkpoint attempts */
    // minimum pause between two checkpoints, enforced even when a checkpoint could otherwise be triggered
    private final long minPauseBetweenCheckpointsNanos;

       When baseInterval < minPauseBetweenCheckpoints, CheckpointCoordinator.java handles it as follows:

    // it does not make sense to schedule checkpoints more often then the desired
    // time between checkpoints
    long baseInterval = chkConfig.getCheckpointInterval();
    if (baseInterval < minPauseBetweenCheckpoints) {
        baseInterval = minPauseBetweenCheckpoints;
    }

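       So although checkpoints are configured to fire periodically, whether one actually starts also depends on minPauseBetweenCheckpoints and maxConcurrentCheckpointAttempts. If maxConcurrentCheckpointAttempts is 1, a new checkpoint must wait for the running one to finish even when its trigger time has arrived.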
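To make these three rules concrete, here is a simplified sketch, not Flink's actual code: the interval is raised to at least the min pause, a new checkpoint must wait minPause after the last completion, and at most maxConcurrent checkpoints may be in flight. The class TriggerGate and its fields are hypothetical; the real coordinator queues or reschedules a request instead of simply returning false.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified model of the periodic-trigger checks; not Flink's implementation. */
class TriggerGate {
    final long baseIntervalMs;
    final long minPauseMs;
    final int maxConcurrent;

    TriggerGate(long configuredIntervalMs, long minPauseMs, int maxConcurrent) {
        // Same rule as CheckpointCoordinator: never schedule more often than the min pause.
        this.baseIntervalMs = Math.max(configuredIntervalMs, minPauseMs);
        this.minPauseMs = minPauseMs;
        this.maxConcurrent = maxConcurrent;
    }

    /**
     * Returns true if a trigger firing at nowNanos may start a new checkpoint.
     * lastCompletionNanos: completion time of the latest checkpoint; pending: checkpoints in flight.
     */
    boolean mayTrigger(long nowNanos, long lastCompletionNanos, int pending) {
        if (pending >= maxConcurrent) {
            return false; // must wait for an in-flight checkpoint to finish
        }
        long sinceLastNanos = nowNanos - lastCompletionNanos;
        return sinceLastNanos >= TimeUnit.MILLISECONDS.toNanos(minPauseMs);
    }

    public static void main(String[] args) {
        TriggerGate gate = new TriggerGate(1_000, 5_000, 1);
        System.out.println(gate.baseIntervalMs); // 5000: interval raised to the min pause
        long completedAt = 0L;
        System.out.println(gate.mayTrigger(TimeUnit.SECONDS.toNanos(3), completedAt, 0)); // false: only 3 s passed
        System.out.println(gate.mayTrigger(TimeUnit.SECONDS.toNanos(6), completedAt, 0)); // true
        System.out.println(gate.mayTrigger(TimeUnit.SECONDS.toNanos(6), completedAt, 1)); // false: one pending
    }
}
```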

    2. The Checkpoint Call Path

      After the JobGraph is submitted to the Dispatcher, it calls createJobManagerRunner and startJobManagerRunner; see the createJobManagerRunner(...) method of the Dispatcher class.

      2.1 The createJobManagerRunner Phase

      This phase creates a JobManagerRunner instance. The part relevant to checkpointing is that a listener is registered to watch the job status.

    //#JobManagerRunner.java
    public JobManagerRunner(...) throws Exception {

        //..........

        // make sure we cleanly shut down out JobManager services if initialization fails
        try {
            //..........
            // load the JobGraph and libraries, set up leader election, etc.

            // now start the JobManager
            this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader);
        }
        catch (Throwable t) {
            //......
        }
    }

    // DefaultJobMasterServiceFactory.createJobMasterService() creates a new JobMaster object
    //#JobMaster.java
    public JobMaster(...) throws Exception {

        //........
        // the constructor mainly checks arguments and creates the slotPool, the slot scheduler, etc.

        // create the scheduler
        this.schedulerNG = createScheduler(jobManagerJobMetricGroup);
        //......
    }

       The core statements executed when the scheduler is created are:

    //#LegacyScheduler.java, in the LegacyScheduler(...) constructor
    // create the ExecutionGraph
    this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker));


    private ExecutionGraph createAndRestoreExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        ShuffleMaster<?> shuffleMaster,
        PartitionTracker partitionTracker) throws Exception {

        ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker);

        final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();

        if (checkpointCoordinator != null) {
            // check whether we find a valid checkpoint
            // if no state was restored from a checkpoint, check whether it can be restored from a savepoint
            //......
        }

        return newExecutionGraph;
    }

       The call chain then reaches buildGraph() in ExecutionGraphBuilder, the core class for creating the ExecutionGraph. This method mainly builds the ExecutionGraph and configures checkpointing; its core code is:

    //..............
    // core method for building the ExecutionGraph; to be analyzed in detail later
    executionGraph.attachJobGraph(sortedTopology);

    //.......................

    // enableCheckpointing sets up the CheckpointCoordinator
    executionGraph.enableCheckpointing(
        chkConfig,
        triggerVertices,
        ackVertices,
        confirmVertices,
        hooks,
        checkpointIdCounter,
        completedCheckpoints,
        rootBackend,
        checkpointStatsTracker);

       enableCheckpointing() mainly creates the manager that handles checkpoint failures and sets up CheckpointCoordinator, the core checkpoint class.

    //#ExecutionGraph.java
    public void enableCheckpointing(
            CheckpointCoordinatorConfiguration chkConfig,
            List<ExecutionJobVertex> verticesToTrigger,
            List<ExecutionJobVertex> verticesToWaitFor,
            List<ExecutionJobVertex> verticesToCommitTo,
            List<MasterTriggerRestoreHook<?>> masterHooks,
            CheckpointIDCounter checkpointIDCounter,
            CompletedCheckpointStore checkpointStore,
            StateBackend checkpointStateBackend,
            CheckpointStatsTracker statsTracker) {
        // the job must be in the CREATED state
        checkState(state == JobStatus.CREATED, "Job must be in CREATED state");
        checkState(checkpointCoordinator == null, "checkpointing already enabled");
        // collect the tasks that trigger, acknowledge, and receive commit notifications for checkpoints
        ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
        ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
        ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);

        checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
        // checkpoint failure manager: when a checkpoint fails, decides the next step based on the configuration
        CheckpointFailureManager failureManager = new CheckpointFailureManager(
            chkConfig.getTolerableCheckpointFailureNumber(),
            new CheckpointFailureManager.FailJobCallback() {
                @Override
                public void failJob(Throwable cause) {
                    getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause));
                }

                @Override
                public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {
                    getJobMasterMainThreadExecutor().execute(() -> failGlobalIfExecutionIsStillRunning(cause, failingTask));
                }
            }
        );

        // create the coordinator that triggers and commits checkpoints and holds the state
        // CheckpointCoordinator is the core checkpoint class
        checkpointCoordinator = new CheckpointCoordinator(
            jobInformation.getJobId(),
            chkConfig,
            tasksToTrigger,
            tasksToWaitFor,
            tasksToCommitTo,
            checkpointIDCounter,
            checkpointStore,
            checkpointStateBackend,
            ioExecutor,
            SharedStateRegistry.DEFAULT_FACTORY,
            failureManager);

        // register the master hooks on the checkpoint coordinator
        for (MasterTriggerRestoreHook<?> hook : masterHooks) {
            if (!checkpointCoordinator.addMasterHook(hook)) {
                LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", hook.getIdentifier());
            }
        }
        // checkpoint statistics
        checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);

        // interval of max long value indicates disable periodic checkpoint,
        // the CheckpointActivatorDeactivator should be created only if the interval is not max value
        if (chkConfig.getCheckpointInterval() != Long.MAX_VALUE) {
            // the periodic checkpoint scheduler is activated and deactivated as a result of
            // job status changes (running -> on, all other states -> off)
            // createActivatorDeactivator() creates a JobStatusListener;
            // registerJobStatusListener() adds it to the jobStatusListeners collection
            registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
        }
    }


    //#CheckpointCoordinator.java
    // ------------------------------------------------------------------------
    //  job status listener that schedules / cancels periodic checkpoints
    // ------------------------------------------------------------------------
    // creates the listener returned by checkpointCoordinator.createActivatorDeactivator()
    public JobStatusListener createActivatorDeactivator() {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }

            if (jobStatusListener == null) {
                jobStatusListener = new CheckpointCoordinatorDeActivator(this);
            }

            return jobStatusListener;
        }
    }
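The failure-tolerance idea behind CheckpointFailureManager can be illustrated with a small counter. This is a hypothetical sketch that assumes only what the code above shows (a tolerable failure number, a fail-job callback, and a success handler); FailureCounter and its methods are illustrative names, and the real class also distinguishes failure reasons, which is not modeled here.

```java
import java.util.function.Consumer;

/** Hypothetical sketch of a tolerable-failure counter; not Flink's CheckpointFailureManager. */
class FailureCounter {
    private final int tolerableFailures;
    private final Consumer<Throwable> failJob; // stands in for FailJobCallback.failJob
    private int continuousFailures = 0;

    FailureCounter(int tolerableFailures, Consumer<Throwable> failJob) {
        this.tolerableFailures = tolerableFailures;
        this.failJob = failJob;
    }

    /** Called when a checkpoint fails; fails the job once the tolerance is exceeded. */
    void handleFailure(Throwable cause) {
        continuousFailures++;
        if (continuousFailures > tolerableFailures) {
            failJob.accept(cause);
        }
    }

    /** A successful checkpoint resets the count of consecutive failures. */
    void handleSuccess() {
        continuousFailures = 0;
    }

    int continuousFailures() {
        return continuousFailures;
    }

    public static void main(String[] args) {
        FailureCounter counter = new FailureCounter(1,
            cause -> System.out.println("failing job: " + cause.getMessage()));
        counter.handleFailure(new Exception("cp 1 timed out")); // tolerated
        counter.handleSuccess();                                 // counter reset
        counter.handleFailure(new Exception("cp 3 timed out")); // tolerated again
        counter.handleFailure(new Exception("cp 4 timed out")); // exceeds tolerance, callback fires
    }
}
```

Resetting on success means only consecutive failures count toward the limit, which is why handleCheckpointSuccess(...) is called when a checkpoint completes.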

       This concludes the createJobManagerRunner phase; the checkpoint configuration of the ExecutionGraph is now in place.

      2.2 The startJobManagerRunner Phase

      In this phase, once leadership is granted, startJobExecution is invoked. Only the classes and methods on the call path are listed here:

    //#JobManagerRunner.java
    //grantLeadership(...) ==> verifyJobSchedulingStatusAndStartJobManager(...)
    //==> startJobMaster(...), whose core statement is
    startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));

    // which then calls #JobMaster.java: start() ==> startJobExecution(...)

       startJobExecution() is a private method of JobMaster. Its code is:

    //----------------------------------------------------------------------------------------------
    // Internal methods
    //----------------------------------------------------------------------------------------------

    //-- job starting and stopping  -----------------------------------------------------------------

    private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {

        validateRunsInMainThread();

        checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

        if (Objects.equals(getFencingToken(), newJobMasterId)) {
            log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);

            return Acknowledge.get();
        }

        setNewFencingToken(newJobMasterId);
        // start the slotPool and request resources; worth reading to see how resources are requested
        startJobMasterServices();

        log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);
        // entry point for executing the ExecutionGraph: checks that the job is in CREATED state, then calls executionGraph.scheduleForExecution()
        resetAndStartScheduler();

        return Acknowledge.get();
    }

       ExecutionGraph.scheduleForExecution(), which LegacyScheduler invokes, schedules the job as follows:

    //#ExecutionGraph.java
    public void scheduleForExecution() throws JobException {

        assertRunningInJobMasterMainThread();

        final long currentGlobalModVersion = globalModVersion;
        // before the tasks run, switch the job status from CREATED to RUNNING;
        // transitionState(...) calls notifyJobStatusChange(newState, error) to notify every listener in jobStatusListeners
        if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
            // the scheduling strategy depends on scheduleMode
            final CompletableFuture<Void> newSchedulingFuture = SchedulingUtils.schedule(
                scheduleMode,
                getAllExecutionVertices(),
                this);

            //..............
        }
        else {
            throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
        }
    }

    private void notifyJobStatusChange(JobStatus newState, Throwable error) {
        if (jobStatusListeners.size() > 0) {
            final long timestamp = System.currentTimeMillis();
            final Throwable serializedError = error == null ? null : new SerializedThrowable(error);

            for (JobStatusListener listener : jobStatusListeners) {
                try {
                    listener.jobStatusChanges(getJobID(), newState, timestamp, serializedError);
                } catch (Throwable t) {
                    LOG.warn("Error while notifying JobStatusListener", t);
                }
            }
        }
    }


    //#CheckpointCoordinatorDeActivator.java
    public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
        if (newJobStatus == JobStatus.RUNNING) {
            // start the checkpoint scheduler
            // entry point for periodic checkpoint triggering
            coordinator.startCheckpointScheduler();
        } else {
            // anything else should stop the trigger for now
            coordinator.stopCheckpointScheduler();
        }
    }
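The activator/deactivator is an ordinary observer: the graph notifies every registered listener on each status transition, and the checkpoint listener starts a periodic timer on RUNNING and stops it on any other status. A minimal sketch with a ScheduledExecutorService follows. JobStatus, StatusListener, PeriodicTrigger and SketchGraph are simplified, hypothetical stand-ins for Flink's classes; an AtomicInteger counter stands in for the real triggerCheckpoint(...) call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified stand-in for Flink's JobStatus. */
enum JobStatus { CREATED, RUNNING, FAILING, CANCELED }

/** Simplified stand-in for Flink's JobStatusListener. */
interface StatusListener {
    void jobStatusChanges(JobStatus newStatus);
}

/** Hypothetical analogue of CheckpointCoordinatorDeActivator plus start/stopCheckpointScheduler(). */
class PeriodicTrigger implements StatusListener {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "checkpoint-timer");
        t.setDaemon(true);
        return t;
    });
    private final long intervalMs;
    final AtomicInteger triggerCount = new AtomicInteger(); // stands in for triggerCheckpoint(...)
    private ScheduledFuture<?> currentTrigger;

    PeriodicTrigger(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    @Override
    public synchronized void jobStatusChanges(JobStatus newStatus) {
        if (newStatus == JobStatus.RUNNING) {
            if (currentTrigger == null) {
                // "startCheckpointScheduler": fire at a fixed rate
                currentTrigger = timer.scheduleAtFixedRate(
                        triggerCount::incrementAndGet, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
            }
        } else if (currentTrigger != null) {
            // "stopCheckpointScheduler": any other status cancels the periodic trigger
            currentTrigger.cancel(false);
            currentTrigger = null;
        }
    }

    synchronized boolean isScheduled() {
        return currentTrigger != null;
    }

    void shutdown() {
        timer.shutdownNow();
    }
}

/** Hypothetical stand-in for the listener bookkeeping in ExecutionGraph. */
class SketchGraph {
    private final List<StatusListener> jobStatusListeners = new ArrayList<>();
    private JobStatus state = JobStatus.CREATED;

    void registerJobStatusListener(StatusListener listener) {
        jobStatusListeners.add(listener);
    }

    /** Moves from 'expected' to 'next' and notifies every listener, like transitionState(...). */
    boolean transitionState(JobStatus expected, JobStatus next) {
        if (state != expected) {
            return false;
        }
        state = next;
        for (StatusListener listener : jobStatusListeners) {
            try {
                listener.jobStatusChanges(next);
            } catch (RuntimeException e) {
                // a failing listener must not break the state transition
                System.err.println("Error while notifying JobStatusListener: " + e);
            }
        }
        return true;
    }
}
```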

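       Next, let us look at startCheckpointScheduler() in detail. It schedules a periodic timer task (ScheduledTrigger) that calls CheckpointCoordinator.triggerCheckpoint(...) every baseInterval, and the actual triggering work happens in triggerCheckpoint(...).

      Together with its comments, triggerCheckpoint(...) is fairly easy to follow, but it is too long to paste in full. Here is an overview of what it does, followed by its core code: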

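      1) Check the conditions for triggering a checkpoint, e.g. whether the coordinator has been shut down, whether periodic checkpoints are disabled, whether the minimum pause between checkpoints has not yet elapsed (unless the checkpoint is forced), and whether the maximum number of concurrent checkpoints would be exceeded;

      2) Check that all tasks that must trigger the checkpoint and all tasks that must acknowledge (ACK) it are in the RUNNING state; otherwise throw an exception;

      3) If all checks pass, generate a new checkpoint ID with checkpointID = checkpointIdCounter.getAndIncrement(); and create a PendingCheckpoint. A PendingCheckpoint is a checkpoint that has been started but not yet confirmed; only after every task has acknowledged it is it converted into a CompletedCheckpoint;

      4) Schedule a timer that cleans up checkpoints which fail to complete;

      5) Define a timeout callback that cancels the checkpoint if it has been running too long without completing;

      6) Trigger the MasterHooks, through which users can define extra operations that extend checkpointing (e.g. preparing and cleaning up external resources);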
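Step 3 is the heart of the bookkeeping: a pending checkpoint waits for one acknowledgement per task and only then becomes a completed checkpoint. The stripped-down sketch below is loosely modeled on that idea; PendingCp, CompletedCp and the string task IDs and state handles are hypothetical simplifications, not Flink's PendingCheckpoint API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of a pending checkpoint; not Flink's PendingCheckpoint. */
class PendingCp {
    enum Ack { SUCCESS, DUPLICATE, UNKNOWN }

    final long checkpointId;
    private final Set<String> notYetAcknowledged;
    private final Map<String, String> taskStates = new HashMap<>();

    PendingCp(long checkpointId, Set<String> tasksToWaitFor) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(tasksToWaitFor);
    }

    /** Records one task's acknowledgement together with a handle to its state. */
    Ack acknowledgeTask(String taskId, String stateHandle) {
        if (taskStates.containsKey(taskId)) {
            return Ack.DUPLICATE; // this task already acknowledged
        }
        if (!notYetAcknowledged.remove(taskId)) {
            return Ack.UNKNOWN;   // not a task this checkpoint waits for
        }
        taskStates.put(taskId, stateHandle);
        return Ack.SUCCESS;
    }

    boolean isFullyAcknowledged() {
        return notYetAcknowledged.isEmpty();
    }

    /** Turns the pending checkpoint into an immutable completed one. */
    CompletedCp finalizeCheckpoint() {
        if (!isFullyAcknowledged()) {
            throw new IllegalStateException("Checkpoint " + checkpointId + " is not fully acknowledged");
        }
        return new CompletedCp(checkpointId, Collections.unmodifiableMap(new HashMap<>(taskStates)));
    }
}

/** Hypothetical immutable result of a finalized checkpoint. */
final class CompletedCp {
    final long checkpointId;
    final Map<String, String> taskStates;

    CompletedCp(long checkpointId, Map<String, String> taskStates) {
        this.checkpointId = checkpointId;
        this.taskStates = taskStates;
    }
}
```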

      The core code of triggerCheckpoint(...), which sends the trigger message to each task, is:

    // send the messages to the tasks that trigger their checkpoint
    // iterate over the executions; a synchronous savepoint uses a dedicated trigger call
    for (Execution execution: executions) {
        if (props.isSynchronous()) {
            execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
        } else {
            execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
        }
    }

       Whether the trigger is a synchronous savepoint or a regular checkpoint, both paths end in the private method triggerCheckpointHelper(...) of the Execution class:

    //Execution.java
    private void triggerCheckpointHelper(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {

        final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
        if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
            throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
        }

        final LogicalSlot slot = assignedResource;

        if (slot != null) {
            // TaskManagerGateway is the component used to communicate with the TaskManager
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
        }
    }

       At this point the CheckpointCoordinator has sent the checkpoint command to the TaskManager. The rest of this article focuses on how the TM executes the checkpoint.

      2.3 Checkpointing in the TaskManager

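      When the TaskManager receives the RPC that triggers a checkpoint, it starts creating a checkpoint barrier. The request goes through RpcTaskManagerGateway, whose triggerCheckpoint(...) forwards it via the RPC gateway to TaskExecutor.triggerCheckpoint(...):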

    //RpcTaskManagerGateway.java
    public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
        taskExecutorGateway.triggerCheckpoint(
            executionAttemptID,
            checkpointId,
            timestamp,
            checkpointOptions,
            advanceToEndOfEventTime);
    }

    //TaskExecutor.java
    @Override
    public CompletableFuture<Acknowledge> triggerCheckpoint(
            ExecutionAttemptID executionAttemptID,
            long checkpointId,
            long checkpointTimestamp,
            CheckpointOptions checkpointOptions,
            boolean advanceToEndOfEventTime) {
        log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

        //...........

        if (task != null) {
            // core method: triggers the checkpoint barrier
            task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions, advanceToEndOfEventTime);

            return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
            final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';

            //.........
        }
    }

       Task.triggerCheckpointBarrier(...) creates an anonymous Runnable that performs the checkpoint and then executes it asynchronously:

    public void triggerCheckpointBarrier(
            final long checkpointID,
            final long checkpointTimestamp,
            final CheckpointOptions checkpointOptions,
            final boolean advanceToEndOfEventTime) {

        final AbstractInvokable invokable = this.invokable;
        // create a CheckpointMetaData, which holds only the checkpoint ID and the checkpoint timestamp
        final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

        if (executionState == ExecutionState.RUNNING && invokable != null) {

            //..............

            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    // set safety net from the task's context for checkpointing thread
                    LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                    FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

                    try {
                        // dispatches to SourceStreamTask or StreamTask depending on the invokable
                        boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
                        if (!success) {
                            checkpointResponder.declineCheckpoint(
                                    getJobID(), getExecutionId(), checkpointID,
                                    new CheckpointException("Task Name" + taskName, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY));
                        }
                    }
                    catch (Throwable t) {
                        if (getExecutionState() == ExecutionState.RUNNING) {
                            failExternally(new Exception(
                                "Error while triggering checkpoint " + checkpointID + " for " +
                                    taskNameWithSubtask, t));
                        } else {
                            LOG.debug("Encountered error while triggering checkpoint {} for " +
                                "{} ({}) while being not in state running.", checkpointID,
                                taskNameWithSubtask, executionId, t);
                        }
                    } finally {
                        FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                    }
                }
            };
            // execute the Runnable asynchronously
            executeAsyncCallRunnable(
                    runnable,
                    String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
        }
        else {
            LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);

            // send back a message that we did not do the checkpoint
            checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                    new CheckpointException("Task name with subtask : " + taskNameWithSubtask, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY));
        }
    }
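The pattern in triggerCheckpointBarrier(...) is: check the execution state on the calling (RPC) thread, decline immediately if the task is not running, otherwise hand the work to another thread so the RPC returns at once. A minimal sketch with CompletableFuture.runAsync follows; TaskSketch, its single-thread dispatcher, and the LongPredicate/LongConsumer callbacks are hypothetical stand-ins for Task, executeAsyncCallRunnable, invokable.triggerCheckpoint and checkpointResponder.declineCheckpoint.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.LongConsumer;
import java.util.function.LongPredicate;

/** Hypothetical sketch of "check the task state, then trigger asynchronously"; not Flink's Task. */
class TaskSketch {
    // single daemon thread standing in for the task's async call dispatcher
    private final ExecutorService asyncCallDispatcher = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "checkpoint-trigger");
        t.setDaemon(true);
        return t;
    });
    private final boolean running;
    private final LongPredicate invokable;        // stands in for invokable.triggerCheckpoint(...)
    private final LongConsumer declineCheckpoint; // stands in for checkpointResponder.declineCheckpoint(...)

    TaskSketch(boolean running, LongPredicate invokable, LongConsumer declineCheckpoint) {
        this.running = running;
        this.invokable = invokable;
        this.declineCheckpoint = declineCheckpoint;
    }

    /** Returns as soon as the work is handed off; the calling thread is not blocked. */
    CompletableFuture<Void> triggerCheckpointBarrier(long checkpointId) {
        if (!running) {
            // not RUNNING: decline right away, as in the else-branch above
            declineCheckpoint.accept(checkpointId);
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.runAsync(() -> {
            try {
                boolean success = invokable.test(checkpointId);
                if (!success) {
                    declineCheckpoint.accept(checkpointId); // task not ready
                }
            } catch (RuntimeException e) {
                // the real Task fails itself here if still running; the sketch only logs
                System.err.println("Error while triggering checkpoint " + checkpointId + ": " + e);
            }
        }, asyncCallDispatcher);
    }

    void shutdown() {
        asyncCallDispatcher.shutdown();
    }
}
```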

       For both SourceStreamTask and StreamTask, triggerCheckpoint(...) ends up in StreamTask.triggerCheckpoint(...), whose core statement is:

    //#StreamTask.java
    return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, advanceToEndOfEventTime);

       performCheckpoint(...) does one of two things:

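      1. If the task is running, the checkpoint can be taken, in three steps:

        1) prepare for the checkpoint; usually this does nothing and the checkpoint is accepted directly;

        2) create the barrier and broadcast it downstream;

        3) snapshot the state of this task;

      2. If the task is not running, tell downstream to cancel this checkpoint by sending a CancelCheckpointMarker, a message similar to a barrier.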
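The ordering matters: the barrier is emitted before the (largely asynchronous) state snapshot so that downstream alignment is not delayed, and a non-running task emits a cancel marker instead. The toy model below records what happens on each output channel; StreamTaskSketch and the string events are hypothetical stand-ins for operatorChain, CheckpointBarrier and CancelCheckpointMarker.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy model of StreamTask.performCheckpoint(...); events are plain strings. */
class StreamTaskSketch {
    final List<List<String>> outputChannels = new ArrayList<>(); // one list per downstream channel
    final List<String> steps = new ArrayList<>();                // order in which things happened
    private final boolean isRunning;

    StreamTaskSketch(int numChannels, boolean isRunning) {
        for (int i = 0; i < numChannels; i++) {
            outputChannels.add(new ArrayList<>());
        }
        this.isRunning = isRunning;
    }

    /** Emits the same event on every output channel, like a broadcast. */
    private void broadcast(String event) {
        steps.add("broadcast " + event);
        for (List<String> channel : outputChannels) {
            channel.add(event);
        }
    }

    boolean performCheckpoint(long checkpointId) {
        if (isRunning) {
            steps.add("prepareSnapshotPreBarrier " + checkpointId); // step (1): usually a no-op
            broadcast("barrier-" + checkpointId);                   // step (2): barrier goes out first
            steps.add("checkpointState " + checkpointId);           // step (3): snapshot the state
            return true;
        }
        // not running: tell downstream not to wait for this checkpoint
        broadcast("cancel-" + checkpointId);
        return false;
    }
}
```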

       The code of performCheckpoint(...) is:

    //#StreamTask.java
    private boolean performCheckpoint(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetrics checkpointMetrics,
            boolean advanceToEndOfTime) throws Exception {
        //......

        synchronized (lock) {
            if (isRunning) {

                if (checkpointOptions.getCheckpointType().isSynchronous()) {
                    syncSavepointLatch.setCheckpointId(checkpointId);

                    if (advanceToEndOfTime) {
                        advanceToEndOfEventTime();
                    }
                }

                // All of the following steps happen as an atomic step from the perspective of barriers and
                // records/watermarks/timers/callbacks.
                // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
                // checkpoint alignments

                // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
                //           The pre-barrier work should be nothing or minimal in the common case.
                operatorChain.prepareSnapshotPreBarrier(checkpointId);

                // Step (2): Send the checkpoint barrier downstream
                operatorChain.broadcastCheckpointBarrier(
                        checkpointId,
                        checkpointMetaData.getTimestamp(),
                        checkpointOptions);

                // Step (3): Take the state snapshot. This should be largely asynchronous, to not
                //           impact progress of the streaming topology
                checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);

                return true;
            }
            else {
                //.......
            }
        }
    }

        Next, let us look at checkpointState(...).

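      checkpointState(...) eventually calls executeCheckpointing() in StreamTask (in its inner class CheckpointingOperation), which creates an AsyncCheckpointRunnable that runs asynchronously and reports that the checkpoint has completed. The key code is: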

    //#StreamTask.java, executeCheckpointing()
    public void executeCheckpointing() throws Exception {
        startSyncPartNano = System.nanoTime();

        try {
            // entry point that calls snapshotState on each StreamOperator; the behavior depends on the operator
            for (StreamOperator<?> op : allOperators) {
                checkpointStreamOperator(op);
            }
            //.........

            // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
            AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
                owner,
                operatorSnapshotsInProgress,
                checkpointMetaData,
                checkpointMetrics,
                startAsyncPartNano);

            owner.cancelables.registerCloseable(asyncCheckpointRunnable);
            owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);

            //.........
        } catch (Exception ex) {
            //.......
        }
    }
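executeCheckpointing() splits the work into a short synchronous part on the task thread and an asynchronous part on asyncOperationsThreadPool that finishes the snapshot and reports it. The hedged sketch below shows only that split: a full copy of an in-memory map stands in for whatever the state backend does during the synchronous part, and reportAck is a hypothetical callback standing in for the acknowledgement sent to the JobManager. AsyncSnapshotSketch is an illustrative name, not a Flink class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

/** Hypothetical sketch of a synchronous/asynchronous snapshot split; not Flink's implementation. */
class AsyncSnapshotSketch {
    // daemon threads standing in for StreamTask.asyncOperationsThreadPool
    private final ExecutorService asyncOperationsThreadPool = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "async-snapshot");
        t.setDaemon(true);
        return t;
    });
    final Map<String, Integer> operatorState = new HashMap<>();

    /**
     * Synchronous part: capture the state on the calling (task) thread.
     * Asynchronous part: "persist" the captured copy and acknowledge on another thread.
     */
    CompletableFuture<Void> executeCheckpointing(long checkpointId,
                                                 BiConsumer<Long, Map<String, Integer>> reportAck) {
        Map<String, Integer> snapshot = new HashMap<>(operatorState); // sync part: capture state
        return CompletableFuture.runAsync(() -> {
            // async part: a real backend would write to durable storage here
            reportAck.accept(checkpointId, snapshot);
        }, asyncOperationsThreadPool);
    }

    void shutdown() {
        asyncOperationsThreadPool.shutdown();
    }
}
```

Because the synchronous part captures the state before returning, the task thread can keep processing records while the asynchronous part runs, and later updates do not leak into the snapshot.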

       AsyncCheckpointRunnable's run() method calls StreamTask's reportCompletedSnapshotStates(...) (for a stateless job the reported state is null), which in turn calls TaskStateManagerImpl.reportTaskStateSnapshots(...) to report the TM's checkpoint to the JM. The key code is:

    //TaskStateManagerImpl.java
    checkpointResponder.acknowledgeCheckpoint(
            jobId,
            executionAttemptID,
            checkpointId,
            checkpointMetrics,
            acknowledgedState);

      It completes the report by invoking the corresponding JobManager method remotely via RPC.

      2.4 Checkpoint Handling in the JobManager

      The acknowledgement is sent by RpcCheckpointResponder.acknowledgeCheckpoint(...). On the JobManager side, the subsequent call path and the core method involved are:

    //#JobMaster.acknowledgeCheckpoint ==>
    //#LegacyScheduler.acknowledgeCheckpoint ==>
    //#CheckpointCoordinator.receiveAcknowledgeMessage(...) ==>
    //completePendingCheckpoint(checkpoint);

    //<p>Important: This method should only be called in the checkpoint lock scope
    private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
        final long checkpointId = pendingCheckpoint.getCheckpointId();
        final CompletedCheckpoint completedCheckpoint;

        // As a first step to complete the checkpoint, we register its state with the registry
        Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
        sharedStateRegistry.registerAll(operatorStates.values());

        try {
            try {
                // finalize the checkpoint
                completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
                failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
            }
            catch (Exception e1) {
                // abort the current pending checkpoint if we fails to finalize the pending checkpoint.
                if (!pendingCheckpoint.isDiscarded()) {
                    failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
                }

                throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.',
                    CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
            }

            // the pending checkpoint must be discarded after the finalization
            Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);

            try {
                // add the new checkpoint; if necessary (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain), remove old ones
                completedCheckpointStore.addCheckpoint(completedCheckpoint);
            } catch (Exception exception) {
                // we failed to store the completed checkpoint. Let's clean up
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            completedCheckpoint.discardOnFailedStoring();
                        } catch (Throwable t) {
                            LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
                        }
                    }
                });

                throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.',
                    CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
            }
        } finally {
            pendingCheckpoints.remove(checkpointId);

            triggerQueuedRequests();
        }

        rememberRecentCheckpointId(checkpointId);

        // drop those pending checkpoints that are at prior to the completed one
        // (unfinished checkpoints older than this one are subsumed by it)
        dropSubsumedCheckpoints(checkpointId);

        // record the time when this was completed, to calculate
        // the 'min delay between checkpoints'
        lastCheckpointCompletionNanos = System.nanoTime();

        LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
            completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());

        if (LOG.isDebugEnabled()) {
            StringBuilder builder = new StringBuilder();
            builder.append("Checkpoint state: ");
            for (OperatorState state : completedCheckpoint.getOperatorStates().values()) {
                builder.append(state);
                builder.append(", ");
            }
            // Remove last two chars ", "
            builder.setLength(builder.length() - 2);

            LOG.debug(builder.toString());
        }

        // send the "notify complete" call to all vertices
        final long timestamp = completedCheckpoint.getTimestamp();

        // notify all tasks (the operators in the TMs) that this checkpoint has completed
        for (ExecutionVertex ev : tasksToCommitTo) {
            Execution ee = ev.getCurrentExecutionAttempt();
            if (ee != null) {
                ee.notifyCheckpointComplete(checkpointId, timestamp);
            }
        }
    }
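The tail of completePendingCheckpoint() does two pieces of housekeeping: the store keeps only the newest maxNumberOfCheckpointsToRetain completed checkpoints, and pending checkpoints older than the one just completed are dropped as subsumed. A minimal sketch using checkpoint IDs only; CheckpointBookkeeping is a hypothetical name, and the real dropSubsumedCheckpoints(...) applies extra conditions that this sketch ignores.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical sketch of retention and subsumption; not Flink's CompletedCheckpointStore. */
class CheckpointBookkeeping {
    private final int maxRetained;
    final Deque<Long> completed = new ArrayDeque<>();      // oldest first
    final TreeMap<Long, String> pending = new TreeMap<>(); // checkpointId -> description
    final List<Long> discarded = new ArrayList<>();        // completed checkpoints evicted by retention

    CheckpointBookkeeping(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    void complete(long checkpointId) {
        pending.remove(checkpointId);
        // like addCheckpoint(...): append, then evict the oldest beyond the retention limit
        completed.addLast(checkpointId);
        while (completed.size() > maxRetained) {
            discarded.add(completed.removeFirst());
        }
        // like dropSubsumedCheckpoints(...): pending checkpoints with a smaller ID are no longer needed
        pending.headMap(checkpointId, false).clear();
    }
}
```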

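       This completes the analysis of the overall checkpoint flow. It is best read together with the underlying principles; the three references below are all very well written and worth reading when you have time.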

    Ref:

    [1] https://www.jianshu.com/p/a40a1b92f6a2

    [2] https://www.cnblogs.com/bethunebtj/p/9168274.html

    [3] https://blog.csdn.net/qq475781638/article/details/92698301

  • Original article: https://www.cnblogs.com/love-yh/p/11695839.html