  Flink Source Code Reading (1): The Checkpoint Triggering Mechanism

    The Checkpoint Triggering Mechanism

      Flink's checkpoints are triggered periodically by a timer. The most important class for triggering checkpoints is CheckpointCoordinator, the checkpoint coordinator.

    org.apache.flink.runtime.checkpoint.CheckpointCoordinator

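      The main job of CheckpointCoordinator is to coordinate distributed snapshots of operators and state. It sends trigger messages to the relevant tasks and collects acknowledgement (Ack) messages from them to complete a checkpoint. It also collects and maintains the state handles (state references) reported by each task.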

      Main fields of CheckpointCoordinator:

    /** Coordinator-wide lock to safeguard the checkpoint updates */
    private final Object lock = new Object();  // coordinator-wide lock

    /** Lock specially to make sure that trigger requests do not overtake each other.
     * This is not done with the coordinator-wide lock, because as part of triggering,
     * blocking operations may happen (distributed atomic counters).
     * Using a dedicated lock, we avoid blocking the processing of 'acknowledge/decline'
     * messages during that phase. */
    private final Object triggerLock = new Object(); // dedicated lock for trigger requests, so that obtaining the checkpoint ID does not block message processing

    /** Tasks who need to be sent a message when a checkpoint is started */
    private final ExecutionVertex[] tasksToTrigger;

    /** Tasks who need to acknowledge a checkpoint before it succeeds */
    private final ExecutionVertex[] tasksToWaitFor;

    /** Tasks who need to be sent a message when a checkpoint is confirmed */
    private final ExecutionVertex[] tasksToCommitTo;

    /** Map from checkpoint ID to the pending checkpoint */
    private final Map<Long, PendingCheckpoint> pendingCheckpoints; // checkpoints that are in progress

    /** Actor that receives status updates from the execution graph this coordinator works for */
    private JobStatusListener jobStatusListener; // listens for job status changes and starts/stops the periodic task accordingly

    /** Flag whether a trigger request could not be handled immediately. Non-volatile, because only
     * accessed in synchronized scope */
    private boolean triggerRequestQueued; // marks whether a trigger request could not be handled immediately

    /** Flag marking the coordinator as shut down (not accepting any messages any more) */
    private volatile boolean shutdown; // the coordinator's shutdown flag
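      The comments on lock and triggerLock describe a two-lock design. Trigger requests are serialized by triggerLock. The possibly blocking checkpoint-ID allocation runs without the coordinator-wide lock, so acknowledge/decline handling can still proceed meanwhile. The following is a minimal, hypothetical sketch of that pattern, not Flink code. The class TwoLockCoordinator, the in-memory AtomicLong standing in for checkpointIdCounter, and the String placeholder for PendingCheckpoint are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, heavily simplified model of the two locks above (not Flink code).
class TwoLockCoordinator {
    private final Object lock = new Object();        // guards all coordinator state
    private final Object triggerLock = new Object(); // only orders trigger requests
    private final Map<Long, String> pendingCheckpoints = new HashMap<>();
    private final AtomicLong checkpointIdCounter = new AtomicLong(1); // stand-in for a possibly blocking counter
    private final int maxConcurrent;

    TwoLockCoordinator(int maxConcurrent) {
        this.maxConcurrent = maxConcurrent;
    }

    /** Returns the new checkpoint ID, or -1 if the request was declined. */
    long trigger() {
        synchronized (lock) { // eager pre-check
            if (pendingCheckpoints.size() >= maxConcurrent) {
                return -1;
            }
        }
        synchronized (triggerLock) {
            // In Flink this step may block (e.g. in HA mode), so 'lock' is NOT held here.
            long id = checkpointIdCounter.getAndIncrement();
            synchronized (lock) { // re-check: another thread may have changed the state meanwhile
                if (pendingCheckpoints.size() >= maxConcurrent) {
                    return -1;
                }
                pendingCheckpoints.put(id, "pending");
                return id;
            }
        }
    }

    /** Ack handling needs only 'lock', so a slow ID counter never delays it. */
    boolean acknowledge(long id) {
        synchronized (lock) {
            return pendingCheckpoints.remove(id) != null;
        }
    }

    public static void main(String[] args) {
        TwoLockCoordinator c = new TwoLockCoordinator(1);
        System.out.println(c.trigger());      // 1
        System.out.println(c.trigger());      // -1: limit reached
        System.out.println(c.acknowledge(1)); // true
        System.out.println(c.trigger());      // 2
    }
}
```

      A request declined at the eager pre-check consumes no ID. A request declined at the re-check (not exercised here) would already have consumed one, just as in the source.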

      ScheduledTrigger

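      ScheduledTrigger is the timer task class for checkpoints. Its main job is to call the triggerCheckpoint method, logging any exception instead of letting it escape.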

    private final class ScheduledTrigger implements Runnable {
        @Override
        public void run() {
            try {
                triggerCheckpoint(System.currentTimeMillis(), true);
            }
            catch (Exception e) {
                LOG.error("Exception while triggering checkpoint.", e);
            }
        }
    }
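      ScheduledTrigger catches every exception for a concrete reason. According to the ScheduledExecutorService.scheduleAtFixedRate Javadoc, if any execution of the task throws, subsequent executions are suppressed. A single failed trigger would therefore silently stop all future periodic checkpoints. The hypothetical demo below assumes that `timer` behaves like a standard ScheduledExecutorService; ScheduledTriggerDemo and countRuns are illustrative names, not Flink code. It schedules a trigger that always fails, once with the try/catch wrapper and once without it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo (not Flink code): why a periodic Runnable should catch its own exceptions.
class ScheduledTriggerDemo {

    /** Schedules a trigger that always fails and returns how often it actually ran. */
    static int countRuns(boolean catchExceptions, int wantedRuns) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(wantedRuns);

        Runnable failingTrigger = () -> {
            runs.incrementAndGet();
            latch.countDown();
            throw new IllegalStateException("simulated failure in triggerCheckpoint");
        };
        // Mirrors ScheduledTrigger: catch and log instead of letting the exception escape.
        Runnable task = catchExceptions
                ? () -> {
                    try {
                        failingTrigger.run();
                    } catch (Exception e) {
                        System.err.println("Exception while triggering checkpoint: " + e.getMessage());
                    }
                }
                : failingTrigger;

        timer.scheduleAtFixedRate(task, 0, 10, TimeUnit.MILLISECONDS);
        try {
            latch.await(1000, TimeUnit.MILLISECONDS); // times out if later runs were suppressed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        timer.shutdownNow();
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("with catch:    " + countRuns(true, 3));  // at least 3
        System.out.println("without catch: " + countRuns(false, 3)); // 1: later runs are suppressed
    }
}
```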

      Next, let's look at how the triggerCheckpoint method is implemented.

    // Triggers a new standard checkpoint. timestamp is the time at which the checkpoint is triggered;
    // isPeriodic marks whether this is a periodic trigger.
    public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
        return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, isPeriodic).isSuccess();
    }

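      The core logic of triggering a checkpoint:

        First, run the pre-checks that decide whether a checkpoint may be triggered at all. The coordinator must not be shut down, and the limits on concurrent checkpoints and on the minimum pause between checkpoints must be respected unless the checkpoint is forced. In addition, every task to be triggered must be RUNNING, and every task that must acknowledge must have a current execution attempt;

        Then obtain a checkpoint ID and create a PendingCheckpoint instance;

        Next, re-check the trigger conditions, to guard against race conditions;

        Finally, add the PendingCheckpoint instance to pendingCheckpoints, schedule its timeout canceller, and send messages to the tasks to trigger their checkpoints.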
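      The eager pre-checks of the first step can be condensed into a pure decision function. The sketch below is hypothetical: TriggerPreCheck and PreCheckResult are illustrative names, and the enum constants mirror the CheckpointDeclineReason values in the source that follows. It reproduces only the order of the checks and the minimum-pause arithmetic. It leaves out the side effects (setting triggerRequestQueued, cancelling or rescheduling the periodic timer) and the task-state checks.

```java
// Hypothetical condensation of the eager pre-checks in triggerCheckpoint (not Flink code).
// Parameter names mirror the Flink fields; side effects are deliberately left out.
enum PreCheckResult {
    OK, COORDINATOR_SHUTDOWN, PERIODIC_SCHEDULER_SHUTDOWN, ALREADY_QUEUED,
    TOO_MANY_CONCURRENT_CHECKPOINTS, MINIMUM_TIME_BETWEEN_CHECKPOINTS
}

final class TriggerPreCheck {
    static PreCheckResult check(boolean shutdown, boolean isPeriodic, boolean periodicScheduling,
                                boolean forceCheckpoint, boolean triggerRequestQueued,
                                int pendingCheckpoints, int maxConcurrentCheckpointAttempts,
                                long lastCheckpointCompletionNanos, long minPauseBetweenCheckpointsNanos,
                                long nowNanos) {
        if (shutdown) {
            return PreCheckResult.COORDINATOR_SHUTDOWN;
        }
        if (isPeriodic && !periodicScheduling) {
            return PreCheckResult.PERIODIC_SCHEDULER_SHUTDOWN;
        }
        if (forceCheckpoint) {
            return PreCheckResult.OK; // forced checkpoints skip the limits below
        }
        if (triggerRequestQueued) {
            return PreCheckResult.ALREADY_QUEUED;
        }
        if (pendingCheckpoints >= maxConcurrentCheckpointAttempts) {
            return PreCheckResult.TOO_MANY_CONCURRENT_CHECKPOINTS;
        }
        long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
        // Integer division: a remaining pause shorter than 1 ms rounds down to 0 and is ignored.
        long durationTillNextMillis = (earliestNext - nowNanos) / 1_000_000;
        return durationTillNextMillis > 0
                ? PreCheckResult.MINIMUM_TIME_BETWEEN_CHECKPOINTS
                : PreCheckResult.OK;
    }

    public static void main(String[] args) {
        // last checkpoint completed at t=1.0 s, min pause 0.5 s, now t=1.2 s -> 300 ms still to wait
        System.out.println(check(false, true, true, false, false, 0, 1,
                1_000_000_000L, 500_000_000L, 1_200_000_000L)); // MINIMUM_TIME_BETWEEN_CHECKPOINTS
    }
}
```

      Note the integer division in durationTillNextMillis. A remaining pause of less than one millisecond rounds down to 0, so it does not block the checkpoint.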

    CheckpointTriggerResult triggerCheckpoint(
            long timestamp,
            CheckpointProperties props,
            String targetDirectory,
            boolean isPeriodic) {

        // Sanity check: an externalized checkpoint requires a targetDirectory
        if (props.externalizeCheckpoint() && targetDirectory == null) {
            throw new IllegalStateException("No target directory specified to persist checkpoint to.");
        }

        // make some eager pre-checks before triggering
        synchronized (lock) {
            // abort if the coordinator has been shutdown in the meantime
            if (shutdown) {
                return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
            }

            // Don't allow periodic checkpoint if scheduling has been disabled
            if (isPeriodic && !periodicScheduling) {
                return new CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN);
            }

            // validate whether the checkpoint can be triggered, with respect to the limit of
            // concurrent checkpoints, and the minimum time between checkpoints.
            // these checks are not relevant for savepoints
            // Forced checkpoints are exempt from both the maximum number of concurrent
            // checkpoints and the minimum pause between checkpoints.
            if (!props.forceCheckpoint()) {
                // sanity check: there should never be more than one trigger request queued
                if (triggerRequestQueued) {
                    // a request is already queued: decline this one immediately
                    LOG.warn("Trying to trigger another checkpoint while one was queued already");
                    return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
                }

                // if too many checkpoints are currently in progress, we need to mark that a request is queued
                if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
                    // the number of unfinished checkpoints has reached the configured maximum,
                    // so mark this trigger request as one that cannot run right now
                    triggerRequestQueued = true;
                    // cancel the periodic timer task if it is running
                    if (currentPeriodicTrigger != null) {
                        currentPeriodicTrigger.cancel(false);
                        currentPeriodicTrigger = null;
                    }
                    return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
                }

                // make sure the minimum interval between checkpoints has passed
                final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
                final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;

                if (durationTillNextMillis > 0) {
                    if (currentPeriodicTrigger != null) {
                        currentPeriodicTrigger.cancel(false);
                        currentPeriodicTrigger = null;
                    }
                    // Reassign the new trigger to the currentPeriodicTrigger
                    // (the initial delay is now durationTillNextMillis)
                    currentPeriodicTrigger = timer.scheduleAtFixedRate(
                            new ScheduledTrigger(),
                            durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);

                    return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                }
            }
        }

        // check if all tasks that we need to trigger are running.
        // if not, abort the checkpoint
        Execution[] executions = new Execution[tasksToTrigger.length];
        for (int i = 0; i < tasksToTrigger.length; i++) {
            Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
            if (ee != null && ee.getState() == ExecutionState.RUNNING) {
                executions[i] = ee;
            } else {
                LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
                        tasksToTrigger[i].getTaskNameWithSubtaskIndex());
                return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
            }
        }

        // next, check if all tasks that need to acknowledge the checkpoint are running.
        // if not, abort the checkpoint
        // (note: this loop only requires a current execution attempt; it does not check for RUNNING)
        Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);

        for (ExecutionVertex ev : tasksToWaitFor) {
            Execution ee = ev.getCurrentExecutionAttempt();
            if (ee != null) {
                ackTasks.put(ee.getAttemptId(), ev);
            } else {
                LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
                        ev.getTaskNameWithSubtaskIndex());
                return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
            }
        }

        // we will actually trigger this checkpoint!

        // we lock with a special lock to make sure that trigger requests do not overtake each other.
        // this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'
        // may issue blocking operations. Using a different lock than the coordinator-wide lock,
        // we avoid blocking the processing of 'acknowledge/decline' messages during that time.
        synchronized (triggerLock) {
            final long checkpointID;
            // first, obtain the checkpoint ID
            try {
                // this must happen outside the coordinator-wide lock, because it communicates
                // with external services (in HA mode) and may block for a while.
                checkpointID = checkpointIdCounter.getAndIncrement();
            }
            catch (Throwable t) {
                int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
                LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
                return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
            }

            // create the PendingCheckpoint instance that represents the checkpoint in progress
            final PendingCheckpoint checkpoint = new PendingCheckpoint(
                job,
                checkpointID,
                timestamp,
                ackTasks,
                props,
                targetDirectory,
                executor);

            if (statsTracker != null) {
                PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
                    checkpointID,
                    timestamp,
                    props);

                checkpoint.setStatsCallback(callback);
            }

            // schedule the timer that will clean up the expired checkpoints
            // (this canceller releases the checkpoint's resources if it times out)
            final Runnable canceller = new Runnable() {
                @Override
                public void run() {
                    synchronized (lock) {
                        // only do the work if the checkpoint is not discarded anyways
                        // note that checkpoint completion discards the pending checkpoint object
                        if (!checkpoint.isDiscarded()) {
                            LOG.info("Checkpoint " + checkpointID + " expired before completing.");

                            checkpoint.abortExpired();
                            pendingCheckpoints.remove(checkpointID);
                            rememberRecentCheckpointId(checkpointID);

                            triggerQueuedRequests();
                        }
                    }
                }
            };

            try {
                // re-acquire the coordinator-wide lock
                synchronized (lock) {
                    // since we released the lock in the meantime, we need to re-check
                    // that the conditions still hold.
                    // (the code that obtained the checkpoint ID above ran outside this synchronized
                    // block, so the state may have changed; re-checking prevents race conditions)
                    if (shutdown) {
                        return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
                    }
                    else if (!props.forceCheckpoint()) {
                        if (triggerRequestQueued) {
                            LOG.warn("Trying to trigger another checkpoint while one was queued already");
                            return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
                        }

                        if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
                            triggerRequestQueued = true;
                            if (currentPeriodicTrigger != null) {
                                currentPeriodicTrigger.cancel(false);
                                currentPeriodicTrigger = null;
                            }
                            return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
                        }

                        // make sure the minimum interval between checkpoints has passed
                        final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
                        final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;

                        if (durationTillNextMillis > 0) {
                            if (currentPeriodicTrigger != null) {
                                currentPeriodicTrigger.cancel(false);
                                currentPeriodicTrigger = null;
                            }

                            // Reassign the new trigger to the currentPeriodicTrigger
                            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                                    new ScheduledTrigger(),
                                    durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);

                            return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                        }
                    }

                    LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);

                    // add the checkpoint to pendingCheckpoints
                    pendingCheckpoints.put(checkpointID, checkpoint);

                    // schedule the timeout canceller to run after checkpointTimeout milliseconds
                    ScheduledFuture<?> cancellerHandle = timer.schedule(
                            canceller,
                            checkpointTimeout, TimeUnit.MILLISECONDS);

                    if (!checkpoint.setCancellerHandle(cancellerHandle)) {
                        // checkpoint is already disposed!
                        cancellerHandle.cancel(false);
                    }

                    // trigger the master hooks for the checkpoint
                    final List<MasterState> masterStates = MasterHooks.triggerMasterHooks(masterHooks.values(),
                            checkpointID, timestamp, executor, Time.milliseconds(checkpointTimeout));
                    for (MasterState s : masterStates) {
                        checkpoint.addMasterState(s);
                    }
                }
                // end of lock scope

                CheckpointOptions checkpointOptions;
                if (!props.isSavepoint()) {
                    checkpointOptions = CheckpointOptions.forFullCheckpoint();
                } else {
                    checkpointOptions = CheckpointOptions.forSavepoint(targetDirectory);
                }

                // send the messages to the tasks that trigger their checkpoint
                for (Execution execution: executions) {
                    execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
                }

                numUnsuccessfulCheckpointsTriggers.set(0);
                return new CheckpointTriggerResult(checkpoint);
            }
            catch (Throwable t) {
                // guard the map against concurrent modifications
                synchronized (lock) {
                    pendingCheckpoints.remove(checkpointID);
                }

                int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
                LOG.warn("Failed to trigger checkpoint {}. ({} consecutive failed attempts so far)",
                        checkpointID, numUnsuccessful, t);

                if (!checkpoint.isDiscarded()) {
                    checkpoint.abortError(new Exception("Failed to trigger checkpoint", t));
                }
                return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
            }

        } // end trigger lock
    }
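      The canceller above is a one-shot timer per pending checkpoint. It is scheduled with timer.schedule(canceller, checkpointTimeout, ...). When it fires, it takes the lock and expires the checkpoint only if it has not already been discarded. Below is a minimal sketch of the same idea. It assumes a plain ScheduledExecutorService and uses hypothetical names (ExpiringPendingCheckpoints, register, complete, awaitExpired). Completion is modeled as removing the entry and cancelling its handle, which is a simplification of Flink's PendingCheckpoint.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical sketch of the timeout "canceller" pattern (not Flink code).
class ExpiringPendingCheckpoints {
    private final Object lock = new Object();
    private final Map<Long, ScheduledFuture<?>> pending = new HashMap<>();
    private final Set<Long> expired = new HashSet<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    void register(long checkpointId, long timeoutMillis) {
        synchronized (lock) {
            // The canceller also takes 'lock', so it cannot observe a half-registered entry.
            ScheduledFuture<?> cancellerHandle = timer.schedule(() -> {
                synchronized (lock) {
                    // Only expire if still pending: completion removes the entry first.
                    if (pending.remove(checkpointId) != null) {
                        expired.add(checkpointId);
                    }
                }
            }, timeoutMillis, TimeUnit.MILLISECONDS);
            pending.put(checkpointId, cancellerHandle);
        }
    }

    /** Completes a pending checkpoint and cancels its canceller; false if it already expired. */
    boolean complete(long checkpointId) {
        synchronized (lock) {
            ScheduledFuture<?> cancellerHandle = pending.remove(checkpointId);
            if (cancellerHandle == null) {
                return false;
            }
            cancellerHandle.cancel(false);
            return true;
        }
    }

    boolean hasExpired(long checkpointId) {
        synchronized (lock) {
            return expired.contains(checkpointId);
        }
    }

    /** Polls until the checkpoint has expired or maxWaitMillis has passed. */
    boolean awaitExpired(long checkpointId, long maxWaitMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
        while (!hasExpired(checkpointId) && System.nanoTime() < deadline) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(5));
        }
        return hasExpired(checkpointId);
    }

    void shutdown() {
        timer.shutdownNow();
    }

    public static void main(String[] args) {
        ExpiringPendingCheckpoints p = new ExpiringPendingCheckpoints();
        p.register(1L, 20);     // times out quickly
        p.register(2L, 60_000); // completes before its timeout
        System.out.println(p.awaitExpired(1L, 2_000)); // true
        System.out.println(p.complete(2L));            // true
        System.out.println(p.complete(1L));            // false: already expired
        p.shutdown();
    }
}
```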

      Starting the timer task: startCheckpointScheduler

    public void startCheckpointScheduler() {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }
            // make sure all previous timers are cancelled
            stopCheckpointScheduler();

            periodicScheduling = true;
            // scheduleAtFixedRate runs the task periodically: first after the initial delay, then once per period
            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                    new ScheduledTrigger(),
                    baseInterval, baseInterval, TimeUnit.MILLISECONDS);
        }
    }

      Stopping the timer task: stopCheckpointScheduler

    // Resets the flags and releases resources
    public void stopCheckpointScheduler() {
        synchronized (lock) {
            triggerRequestQueued = false;
            periodicScheduling = false;

            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel(false); // cancel the current periodic trigger task
                currentPeriodicTrigger = null;
            }

            // pendingCheckpoints holds checkpoints that were triggered but have not completed yet
            for (PendingCheckpoint p : pendingCheckpoints.values()) {
                p.abortError(new Exception("Checkpoint Coordinator is suspending."));
            }
            pendingCheckpoints.clear(); // empty pendingCheckpoints
            numUnsuccessfulCheckpointsTriggers.set(0);
        }
    }
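      startCheckpointScheduler and stopCheckpointScheduler keep at most one periodic ScheduledFuture alive. Starting always stops first. Stopping cancels the future with cancel(false), so a trigger that is already running is allowed to finish. The sketch below covers only that part and is hypothetical: PeriodicCheckpointScheduler and its methods are illustrative names, and an AtomicInteger counter stands in for triggerCheckpoint. Aborting the pending checkpoints is omitted.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// Hypothetical sketch of start/stop around a single ScheduledFuture (not Flink code).
class PeriodicCheckpointScheduler {
    private final Object lock = new Object();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger triggers = new AtomicInteger(); // stands in for triggerCheckpoint(...)
    private ScheduledFuture<?> currentPeriodicTrigger;
    private boolean periodicScheduling;

    void start(long baseIntervalMillis) {
        synchronized (lock) {
            stop(); // cancel any previous timer first, so at most one is ever active
            periodicScheduling = true;
            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                    triggers::incrementAndGet, baseIntervalMillis, baseIntervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    void stop() {
        synchronized (lock) { // monitors are re-entrant, so start() may call this while holding 'lock'
            periodicScheduling = false;
            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel(false); // do not interrupt a trigger that is running
                currentPeriodicTrigger = null;
            }
        }
    }

    boolean isScheduling() {
        synchronized (lock) {
            return periodicScheduling;
        }
    }

    int triggerCount() {
        return triggers.get();
    }

    void shutdown() {
        timer.shutdownNow();
    }

    static void sleepMillis(long millis) {
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(millis));
    }

    public static void main(String[] args) {
        PeriodicCheckpointScheduler s = new PeriodicCheckpointScheduler();
        s.start(10);
        sleepMillis(100);
        s.stop();
        System.out.println("triggers while running: " + s.triggerCount());
        s.shutdown();
    }
}
```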

     

    Actor-based, message-driven coordination

      How are these timer tasks started and stopped? Flink uses a message-driven mechanism based on Akka's Actor model.

      The CheckpointCoordinatorDeActivator class

    org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator

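      CheckpointCoordinatorDeActivator plays the role of the actor here. It implements JobStatusListener, listens for JobStatus changes, and starts or stops the periodic checkpoint scheduling task.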

    // Actor-style listener: watches JobStatus changes and activates/deactivates the periodic checkpoint scheduler.
    public class CheckpointCoordinatorDeActivator implements JobStatusListener {

        private final CheckpointCoordinator coordinator;

        public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
            this.coordinator = checkNotNull(coordinator);
        }

        @Override
        public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
            if (newJobStatus == JobStatus.RUNNING) {
                // start the checkpoint scheduler
                // as soon as the JobStatus changes to RUNNING, the periodic task is started
                coordinator.startCheckpointScheduler();
            } else {
                // anything else should stop the trigger for now
                coordinator.stopCheckpointScheduler();
            }
        }
    }

      Instances of CheckpointCoordinatorDeActivator are created inside CheckpointCoordinator by the createActivatorDeactivator method, which lazily creates a single instance:

    public JobStatusListener createActivatorDeactivator() {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }

            if (jobStatusListener == null) {
                jobStatusListener = new CheckpointCoordinatorDeActivator(this);
            }

            return jobStatusListener;
        }
    }
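      The activator/deactivator is a plain observer. The coordinator lazily hands out one listener, and every job status change either starts the periodic scheduler (RUNNING) or stops it (any other status). The simplified sketch below uses hypothetical types: SimpleJobStatus, SimpleJobStatusListener, SimpleCoordinator and SimpleDeActivator. Flink's real JobStatusListener callback also receives a JobID, a timestamp and a Throwable, and Flink's JobStatus has more states.

```java
import java.util.Objects;

// Hypothetical, simplified types (not Flink's API).
enum SimpleJobStatus { CREATED, RUNNING, FAILING, CANCELED, FINISHED }

interface SimpleJobStatusListener {
    void jobStatusChanges(SimpleJobStatus newJobStatus);
}

class SimpleCoordinator {
    private final Object lock = new Object();
    private boolean periodicScheduling;
    private SimpleJobStatusListener jobStatusListener;

    void startCheckpointScheduler() { synchronized (lock) { periodicScheduling = true; } }
    void stopCheckpointScheduler()  { synchronized (lock) { periodicScheduling = false; } }
    boolean isPeriodicScheduling()  { synchronized (lock) { return periodicScheduling; } }

    /** Lazily creates exactly one listener, like createActivatorDeactivator. */
    SimpleJobStatusListener createActivatorDeactivator() {
        synchronized (lock) {
            if (jobStatusListener == null) {
                jobStatusListener = new SimpleDeActivator(this);
            }
            return jobStatusListener;
        }
    }
}

class SimpleDeActivator implements SimpleJobStatusListener {
    private final SimpleCoordinator coordinator;

    SimpleDeActivator(SimpleCoordinator coordinator) {
        this.coordinator = Objects.requireNonNull(coordinator);
    }

    @Override
    public void jobStatusChanges(SimpleJobStatus newJobStatus) {
        if (newJobStatus == SimpleJobStatus.RUNNING) {
            coordinator.startCheckpointScheduler();
        } else {
            coordinator.stopCheckpointScheduler(); // any other status stops periodic checkpoints
        }
    }

    public static void main(String[] args) {
        SimpleCoordinator c = new SimpleCoordinator();
        SimpleJobStatusListener l = c.createActivatorDeactivator();
        SimpleJobStatus[] statuses = {SimpleJobStatus.RUNNING, SimpleJobStatus.FAILING, SimpleJobStatus.RUNNING};
        for (SimpleJobStatus s : statuses) {
            l.jobStatusChanges(s);
            System.out.println(s + " -> scheduling=" + c.isPeriodicScheduling());
        }
    }
}
```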

      Checkpoint-related Akka messages

      AbstractCheckpointMessage: the abstract base class of all checkpoint messages

    org.apache.flink.runtime.messages.checkpoint.AbstractCheckpointMessage
    

      Main fields of AbstractCheckpointMessage:

    /** The job to which this message belongs */
    private final JobID job;
    /** The task execution that is source/target of the checkpoint message */
    private final ExecutionAttemptID taskExecutionId;
    /** The ID of the checkpoint that this message coordinates */
    private final long checkpointId;

      It has the following subclasses:

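        TriggerCheckpoint: sent by the JobManager to a TaskManager to trigger a checkpoint;

        AcknowledgeCheckpoint: sent by a TaskManager to the JobManager to confirm that an individual task has completed its checkpoint;

        DeclineCheckpoint: sent by a TaskManager to the JobManager to report that a task declined (could not perform) the checkpoint;

        NotifyCheckpointComplete: sent by the JobManager to a TaskManager to announce that the checkpoint has completed.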
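      The four message types share the job, taskExecutionId and checkpointId fields of the base class and differ mainly in direction. The hypothetical model below encodes that direction explicitly. The class names and the Route enum are illustrative, not Flink's, and the String reason on the decline message is an assumption for illustration.

```java
// Hypothetical model of the checkpoint message hierarchy (not Flink's classes).
enum Route { JOB_MANAGER_TO_TASK_MANAGER, TASK_MANAGER_TO_JOB_MANAGER }

abstract class CheckpointMessage {
    final String job;             // the job this message belongs to
    final String taskExecutionId; // source/target task execution
    final long checkpointId;      // the checkpoint this message coordinates

    CheckpointMessage(String job, String taskExecutionId, long checkpointId) {
        this.job = job;
        this.taskExecutionId = taskExecutionId;
        this.checkpointId = checkpointId;
    }

    abstract Route route();
}

final class TriggerCheckpointMessage extends CheckpointMessage {
    final long timestamp;

    TriggerCheckpointMessage(String job, String task, long id, long timestamp) {
        super(job, task, id);
        this.timestamp = timestamp;
    }

    @Override Route route() { return Route.JOB_MANAGER_TO_TASK_MANAGER; }
}

final class AcknowledgeCheckpointMessage extends CheckpointMessage {
    AcknowledgeCheckpointMessage(String job, String task, long id) { super(job, task, id); }

    @Override Route route() { return Route.TASK_MANAGER_TO_JOB_MANAGER; }
}

final class DeclineCheckpointMessage extends CheckpointMessage {
    final String reason; // why the task could not take this checkpoint

    DeclineCheckpointMessage(String job, String task, long id, String reason) {
        super(job, task, id);
        this.reason = reason;
    }

    @Override Route route() { return Route.TASK_MANAGER_TO_JOB_MANAGER; }
}

final class NotifyCheckpointCompleteMessage extends CheckpointMessage {
    NotifyCheckpointCompleteMessage(String job, String task, long id) { super(job, task, id); }

    @Override Route route() { return Route.JOB_MANAGER_TO_TASK_MANAGER; }
}

class CheckpointMessageDemo {
    public static void main(String[] args) {
        CheckpointMessage[] messages = {
            new TriggerCheckpointMessage("job-1", "task-a", 7, 1000L),
            new AcknowledgeCheckpointMessage("job-1", "task-a", 7),
            new DeclineCheckpointMessage("job-1", "task-b", 7, "task not ready"),
            new NotifyCheckpointCompleteMessage("job-1", "task-a", 7),
        };
        for (CheckpointMessage m : messages) {
            System.out.println(m.getClass().getSimpleName() + " #" + m.checkpointId + ": " + m.route());
        }
    }
}
```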

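      The TriggerCheckpoint message

        Sent from the JobManager to a TaskManager to tell the specified task to trigger a checkpoint.

        Sending the message

          The sending logic lives in CheckpointCoordinator and was shown above: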

    // send the messages to the tasks that trigger their checkpoint
    for (Execution execution: executions) {
        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
    }

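          Here executions is an Execution[] array holding the tasks that must be sent a message when the checkpoint is triggered (i.e. the contents of the CheckpointCoordinator field tasksToTrigger). The triggerCheckpoint() method is called for each of these tasks.

          Next, let's look at the triggerCheckpoint method of Execution.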

    // Triggers a new checkpoint on the task of this execution
    public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
        // get the assigned resource (the slot)
        final SimpleSlot slot = assignedResource;

        if (slot != null) {
            // TaskManagerGateway is the abstraction used to communicate with the TaskManager
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            // send the message to the TaskManager through the taskManagerGateway
            taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
                "no longer running.");
        }
    }

          Next, step into the triggerCheckpoint() method of ActorTaskManagerGateway, the actor-based implementation of TaskManagerGateway:

    public void triggerCheckpoint(
            ExecutionAttemptID executionAttemptID,
            JobID jobId,
            long checkpointId,
            long timestamp,
            CheckpointOptions checkpointOptions) {

        Preconditions.checkNotNull(executionAttemptID);
        Preconditions.checkNotNull(jobId);
        // Create a TriggerCheckpoint message and send it with actorGateway.tell (asynchronous, no reply expected).
        // ActorGateway is the interface for actor-based communication.
        actorGateway.tell(new TriggerCheckpoint(jobId, executionAttemptID, checkpointId, timestamp, checkpointOptions));
    }

          AkkaActorGateway is one implementation of the ActorGateway interface; it uses Akka to communicate with remote actors. Here is its tell method:

    @Override
    public void tell(Object message) {
        Object newMessage = decorator.decorate(message);
        // Send the message via the ActorRef instance 'actor' (ActorRef is an Akka class;
        // Akka's internals are left for a later post).
        actor.tell(newMessage, ActorRef.noSender());
    }
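      The sending path is therefore Execution.triggerCheckpoint, then TaskManagerGateway.triggerCheckpoint, then ActorGateway.tell. tell is fire-and-forget: it hands the message to the remote actor's mailbox and returns without waiting for a reply. Below is a hypothetical sketch of that path, with an in-memory queue standing in for the remote mailbox. TriggerMsg, SimpleActorGateway, MailboxActorGateway and SimpleExecution are illustrative names, and a null gateway models "no slot assigned".

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-ins for Execution -> TaskManagerGateway -> ActorGateway.tell (not Flink's API).
final class TriggerMsg {
    final String jobId;
    final String attemptId;
    final long checkpointId;
    final long timestamp;

    TriggerMsg(String jobId, String attemptId, long checkpointId, long timestamp) {
        this.jobId = jobId;
        this.attemptId = attemptId;
        this.checkpointId = checkpointId;
        this.timestamp = timestamp;
    }
}

interface SimpleActorGateway {
    void tell(Object message); // fire-and-forget: no reply, no waiting
}

/** Models the remote actor's mailbox with a queue; tell() just enqueues and returns. */
final class MailboxActorGateway implements SimpleActorGateway {
    final BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>();

    @Override
    public void tell(Object message) {
        mailbox.offer(message);
    }
}

final class SimpleExecution {
    private final String jobId;
    private final String attemptId;
    private final SimpleActorGateway gateway; // null models "no slot assigned"

    SimpleExecution(String jobId, String attemptId, SimpleActorGateway gateway) {
        this.jobId = jobId;
        this.attemptId = attemptId;
        this.gateway = gateway;
    }

    /** Returns whether a message was sent. */
    boolean triggerCheckpoint(long checkpointId, long timestamp) {
        if (gateway == null) {
            System.out.println("The execution has no slot assigned; it is no longer running.");
            return false;
        }
        gateway.tell(new TriggerMsg(jobId, attemptId, checkpointId, timestamp));
        return true;
    }

    public static void main(String[] args) {
        MailboxActorGateway taskManager = new MailboxActorGateway();
        new SimpleExecution("job-1", "attempt-1", taskManager).triggerCheckpoint(42, System.currentTimeMillis());
        TriggerMsg received = (TriggerMsg) taskManager.mailbox.poll();
        System.out.println("TaskManager mailbox got checkpoint " + received.checkpointId);
    }
}
```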

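          This completes the sending of the TriggerCheckpoint message. Next, let's look at how the TaskManager receives it.

        Receiving the message

          The message-receiving part of the TaskManager is implemented in Scala.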

    org.apache.flink.runtime.taskmanager.TaskManager
    

          The handleMessage method of the TaskManager class is the central message handler.

    // The TaskManager's central message handler: receives messages and dispatches them by type.
    override def handleMessage: Receive = {
      case message: TaskMessage => handleTaskMessage(message)

      // this case handles checkpoint-related messages
      case message: AbstractCheckpointMessage => handleCheckpointingMessage(message)

      case JobManagerLeaderAddress(address, newLeaderSessionID) =>
        handleJobManagerLeaderAddress(address, newLeaderSessionID)

      case message: RegistrationMessage => handleRegistrationMessage(message)

      ...
    }

          Next, look at the handleCheckpointingMessage() method. For a TriggerCheckpoint message, its main job is to trigger the checkpoint barrier.

    // Handles checkpoint-related messages
    private def handleCheckpointingMessage(actorMessage: AbstractCheckpointMessage): Unit = {

      actorMessage match {
        // trigger-checkpoint message
        case message: TriggerCheckpoint =>
          val taskExecutionId = message.getTaskExecutionId
          val checkpointId = message.getCheckpointId
          val timestamp = message.getTimestamp
          val checkpointOptions = message.getCheckpointOptions

          log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for $taskExecutionId.")

          val task = runningTasks.get(taskExecutionId)
          if (task != null) {
            // call the Task's triggerCheckpointBarrier method to trigger the checkpoint barrier
            // (the barrier mechanism itself is discussed in a later post)
            task.triggerCheckpointBarrier(checkpointId, timestamp, checkpointOptions)
          } else {
            log.debug(s"TaskManager received a checkpoint request for unknown task $taskExecutionId.")
          }

        // checkpoint-complete notification
        case message: NotifyCheckpointComplete =>
          val taskExecutionId = message.getTaskExecutionId
          val checkpointId = message.getCheckpointId
          val timestamp = message.getTimestamp

          log.debug(s"Receiver ConfirmCheckpoint $checkpointId@$timestamp for $taskExecutionId.")

          val task = runningTasks.get(taskExecutionId)
          if (task != null) {
            // call the Task's notifyCheckpointComplete method for the follow-up processing
            task.notifyCheckpointComplete(checkpointId)
          } else {
            log.debug(
              s"TaskManager received a checkpoint confirmation for unknown task $taskExecutionId.")
          }

        // unknown checkpoint message
        case _ => unhandled(actorMessage)
      }
    }
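      In Java terms, handleCheckpointingMessage is a type dispatch followed by a lookup in runningTasks. Messages for unknown tasks are only logged and dropped. Below is a hypothetical rendering: TmCheckpointMessage, TmTrigger, TmNotifyComplete, SimpleTask and SimpleTaskManager are illustrative names, and SimpleTask merely records which method was called instead of emitting barriers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical Java rendering of the TaskManager-side dispatch (not Flink code).
abstract class TmCheckpointMessage {
    final String taskExecutionId;
    final long checkpointId;

    TmCheckpointMessage(String taskExecutionId, long checkpointId) {
        this.taskExecutionId = taskExecutionId;
        this.checkpointId = checkpointId;
    }
}

final class TmTrigger extends TmCheckpointMessage {
    TmTrigger(String taskExecutionId, long checkpointId) { super(taskExecutionId, checkpointId); }
}

final class TmNotifyComplete extends TmCheckpointMessage {
    TmNotifyComplete(String taskExecutionId, long checkpointId) { super(taskExecutionId, checkpointId); }
}

final class SimpleTask {
    final List<String> events = new ArrayList<>(); // records calls instead of doing real work

    void triggerCheckpointBarrier(long checkpointId) { events.add("barrier " + checkpointId); }
    void notifyCheckpointComplete(long checkpointId) { events.add("complete " + checkpointId); }
}

final class SimpleTaskManager {
    final Map<String, SimpleTask> runningTasks = new HashMap<>();

    /** Returns true if the message reached a running task. */
    boolean handleCheckpointingMessage(TmCheckpointMessage message) {
        if (!(message instanceof TmTrigger) && !(message instanceof TmNotifyComplete)) {
            throw new IllegalArgumentException("unhandled checkpoint message: " + message); // cf. unhandled(...)
        }
        SimpleTask task = runningTasks.get(message.taskExecutionId);
        if (task == null) {
            System.out.println("Checkpoint message for unknown task " + message.taskExecutionId);
            return false; // the Scala code only logs at debug level and drops the message
        }
        if (message instanceof TmTrigger) {
            task.triggerCheckpointBarrier(message.checkpointId);
        } else {
            task.notifyCheckpointComplete(message.checkpointId);
        }
        return true;
    }

    public static void main(String[] args) {
        SimpleTaskManager tm = new SimpleTaskManager();
        SimpleTask task = new SimpleTask();
        tm.runningTasks.put("attempt-1", task);
        tm.handleCheckpointingMessage(new TmTrigger("attempt-1", 5));
        tm.handleCheckpointingMessage(new TmNotifyComplete("attempt-1", 5));
        tm.handleCheckpointingMessage(new TmTrigger("attempt-x", 6));
        System.out.println(task.events);
    }
}
```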

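      The NotifyCheckpointComplete message

        Sent from the JobManager to a TaskManager to notify a task that its checkpoint has been confirmed as complete, so the task can commit the checkpoint to third-party systems.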
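      Whether NotifyCheckpointComplete is sent at all depends on acknowledgement bookkeeping. The pending checkpoint tracks which tasks have not acknowledged yet, classifies each ack as SUCCESS, DUPLICATE, UNKNOWN or DISCARDED, and is complete only when no task is left. Below is a minimal sketch of that bookkeeping, assuming string task IDs and hypothetical names (AckTracker, AckOutcome). The rule used here to tell DUPLICATE from UNKNOWN is an assumption. Flink's PendingCheckpoint additionally stores the reported subtask state and metrics.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of per-task acknowledgement tracking (not Flink's PendingCheckpoint).
enum AckOutcome { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

final class AckTracker {
    private final Set<String> notYetAcknowledgedTasks;
    private final Set<String> acknowledgedTasks = new HashSet<>();
    private boolean discarded;

    AckTracker(Collection<String> tasksToWaitFor) {
        this.notYetAcknowledgedTasks = new HashSet<>(tasksToWaitFor);
    }

    AckOutcome acknowledgeTask(String executionAttemptId) {
        if (discarded) {
            return AckOutcome.DISCARDED;
        }
        if (notYetAcknowledgedTasks.remove(executionAttemptId)) {
            acknowledgedTasks.add(executionAttemptId);
            return AckOutcome.SUCCESS;
        }
        // Not waiting for this task: either it already acked, or it was never part of the checkpoint.
        return acknowledgedTasks.contains(executionAttemptId) ? AckOutcome.DUPLICATE : AckOutcome.UNKNOWN;
    }

    boolean isFullyAcknowledged() {
        return !discarded && notYetAcknowledgedTasks.isEmpty();
    }

    void discard() {
        discarded = true;
    }

    public static void main(String[] args) {
        AckTracker cp = new AckTracker(Arrays.asList("a", "b"));
        System.out.println(cp.acknowledgeTask("a") + " " + cp.isFullyAcknowledged()); // SUCCESS false
        System.out.println(cp.acknowledgeTask("a"));                                 // DUPLICATE
        System.out.println(cp.acknowledgeTask("x"));                                 // UNKNOWN
        System.out.println(cp.acknowledgeTask("b") + " " + cp.isFullyAcknowledged()); // SUCCESS true
    }
}
```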

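        Sending the message

          The NotifyCheckpointComplete messages are sent on the path that starts in CheckpointCoordinator's receiveAcknowledgeMessage method. Once all tasks have acknowledged, completePendingCheckpoint completes the checkpoint and sends the notifications.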

     1         //该方法接收一个AcknowledgeCheckpoint消息,返回该Message是否与一个pending checkpoint相关联
     2         public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
     3             if (shutdown || message == null) {
     4                 return false;
     5             }
     6             if (!job.equals(message.getJob())) {
     7                 LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", job, message);
     8                 return false;
     9             }
    10 
    11             final long checkpointId = message.getCheckpointId();
    12             
    13             synchronized (lock) {
    14                 // we need to check inside the lock for being shutdown as well, otherwise we
    15                 // get races and invalid error log messages
    16                 if (shutdown) {
    17                     return false;
    18                 }
    19 
            final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);

            // The checkpoint is still pending and has not been discarded
            if (checkpoint != null && !checkpoint.isDiscarded()) {

                // Acknowledge the task using its TaskExecutionId, SubtaskState and CheckpointMetrics
                switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
                    // Acknowledgement accepted
                    case SUCCESS:
                        LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
                            checkpointId, message.getTaskExecutionId(), message.getJob());
                        // All tasks have acknowledged (notYetAcknowledgedTasks is empty)
                        if (checkpoint.isFullyAcknowledged()) {
                            // Try to complete the pending checkpoint: remove it from pendingCheckpoints,
                            // update the related bookkeeping and finally send the NotifyCheckpointComplete messages
                            completePendingCheckpoint(checkpoint);
                        }
                        break;
                    // Duplicate acknowledgement
                    case DUPLICATE:
                        LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}.",
                            message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
                        break;
                    // Unknown task execution attempt
                    case UNKNOWN:
                        LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
                                "because the task's execution attempt id was unknown. Discarding " +
                                "the state handle to avoid lingering state.", message.getCheckpointId(),
                            message.getTaskExecutionId(), message.getJob());

                        discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
                        break;
                    // The pending checkpoint has already been discarded
                    case DISCARDED:
                        LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
                                "because the pending checkpoint had been discarded. Discarding the " +
                                "state handle to avoid lingering state.",
                            message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
                        discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
                }

                return true;
            }
            else if (checkpoint != null) {
                // this should not happen
                throw new IllegalStateException(
                        "Received message for discarded but non-removed checkpoint " + checkpointId);
            }
            else {
                boolean wasPendingCheckpoint;

                // message is for an unknown checkpoint, or comes too late (checkpoint disposed)
                if (recentPendingCheckpoints.contains(checkpointId)) {
                    wasPendingCheckpoint = true;
                    LOG.warn("Received late message for now expired checkpoint attempt {} from " +
                        "{} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob());
                }
                else {
                    LOG.debug("Received message for an unknown checkpoint {} from {} of job {}.",
                        checkpointId, message.getTaskExecutionId(), message.getJob());
                    wasPendingCheckpoint = false;
                }

                // try to discard the state so that we don't have lingering state lying around
                discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

                return wasPendingCheckpoint;
            }
        }
    }
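To make the four outcomes handled by the switch above concrete, here is a minimal, self-contained sketch of the bookkeeping a pending checkpoint does. It is not Flink's PendingCheckpoint: the names SimplePendingCheckpoint, AckResult and AckDemo are hypothetical, task IDs are plain strings instead of ExecutionAttemptID, and state handles and metrics are left out. The order of the checks (discarded first, then the set of tasks still expected, then the set already acknowledged) is an assumption chosen so that the sketch produces exactly the four results the coordinator distinguishes.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical result type mirroring the four cases handled in receiveAcknowledgeMessage.
enum AckResult { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

// Hypothetical, heavily simplified stand-in for PendingCheckpoint (IDs are Strings).
class SimplePendingCheckpoint {
    private final Set<String> notYetAcknowledged = new HashSet<>();
    private final Set<String> acknowledged = new HashSet<>();
    private boolean discarded;

    SimplePendingCheckpoint(String... tasksToWaitFor) {
        notYetAcknowledged.addAll(Arrays.asList(tasksToWaitFor));
    }

    AckResult acknowledgeTask(String taskId) {
        if (discarded) {
            return AckResult.DISCARDED;           // checkpoint already aborted
        }
        if (notYetAcknowledged.remove(taskId)) {
            acknowledged.add(taskId);
            return AckResult.SUCCESS;             // first ack from an expected task
        }
        // Not expected any more: either it already acked, or we never knew it.
        return acknowledged.contains(taskId) ? AckResult.DUPLICATE : AckResult.UNKNOWN;
    }

    boolean isFullyAcknowledged() {
        return !discarded && notYetAcknowledged.isEmpty();
    }

    void abort() {
        discarded = true;
    }
}

class AckDemo {
    public static void main(String[] args) {
        SimplePendingCheckpoint cp = new SimplePendingCheckpoint("source-0", "map-0");
        System.out.println(cp.acknowledgeTask("source-0")); // SUCCESS
        System.out.println(cp.acknowledgeTask("source-0")); // DUPLICATE
        System.out.println(cp.acknowledgeTask("sink-7"));   // UNKNOWN
        System.out.println(cp.acknowledgeTask("map-0"));    // SUCCESS
        System.out.println(cp.isFullyAcknowledged());       // true
    }
}
```

Only the SUCCESS path can make the checkpoint fully acknowledged, which is why the coordinator checks isFullyAcknowledged() inside that case alone.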

      The code in completePendingCheckpoint that sends the NotifyCheckpointComplete messages:

    // Notify every task in tasksToCommitTo that has a current execution attempt
    for (ExecutionVertex ev : tasksToCommitTo) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            ee.notifyCheckpointComplete(checkpointId, timestamp);
        }
    }
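The fan-out loop can be exercised in isolation. The sketch below is hypothetical: SimpleVertex and SimpleExecution stand in for ExecutionVertex and Execution, and notifyCheckpointComplete only records the checkpoint ID instead of sending a message to a TaskManager.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Execution: records notifications instead of sending messages.
class SimpleExecution {
    final List<Long> completedCheckpoints = new ArrayList<>();

    void notifyCheckpointComplete(long checkpointId, long timestamp) {
        completedCheckpoints.add(checkpointId);
    }
}

// Hypothetical stand-in for ExecutionVertex; the current attempt may be null.
class SimpleVertex {
    private final SimpleExecution currentAttempt;

    SimpleVertex(SimpleExecution currentAttempt) {
        this.currentAttempt = currentAttempt;
    }

    SimpleExecution getCurrentExecutionAttempt() {
        return currentAttempt;
    }
}

class CommitFanOut {
    // Same shape as the loop above: notify each current attempt, skip nulls.
    // Returns how many attempts were notified.
    static int notifyCommitTargets(List<SimpleVertex> tasksToCommitTo, long checkpointId, long timestamp) {
        int notified = 0;
        for (SimpleVertex ev : tasksToCommitTo) {
            SimpleExecution ee = ev.getCurrentExecutionAttempt();
            if (ee != null) {
                ee.notifyCheckpointComplete(checkpointId, timestamp);
                notified++;
            }
        }
        return notified;
    }

    public static void main(String[] args) {
        SimpleExecution running = new SimpleExecution();
        List<SimpleVertex> targets = Arrays.asList(new SimpleVertex(running), new SimpleVertex(null));
        System.out.println(notifyCommitTargets(targets, 5L, System.currentTimeMillis())); // 1
        System.out.println(running.completedCheckpoints);                                   // [5]
    }
}
```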

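    Receiving the message

      The receiving code shown earlier for the TriggerCheckpoint message also handles this message. Its main step is a call to notifyCheckpointComplete: task.notifyCheckpointComplete(checkpointId).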
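A rough sketch of that TaskManager-side dispatch follows. Two parts of it are assumptions not shown in the code above: the running task is looked up by its execution attempt ID, and messages for tasks that are no longer running are ignored. All class names (CheckpointMessageHandler, RecordingTask, the *Msg classes) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical checkpoint messages, keyed by the task's execution attempt ID.
abstract class CheckpointMessage {
    final String taskExecutionId;
    final long checkpointId;

    CheckpointMessage(String taskExecutionId, long checkpointId) {
        this.taskExecutionId = taskExecutionId;
        this.checkpointId = checkpointId;
    }
}

class TriggerCheckpointMsg extends CheckpointMessage {
    TriggerCheckpointMsg(String id, long cp) { super(id, cp); }
}

class NotifyCheckpointCompleteMsg extends CheckpointMessage {
    NotifyCheckpointCompleteMsg(String id, long cp) { super(id, cp); }
}

// Hypothetical task that just remembers what it was told.
class RecordingTask {
    long lastTriggered = -1;
    long lastCompleted = -1;

    void triggerCheckpointBarrier(long checkpointId) { lastTriggered = checkpointId; }

    void notifyCheckpointComplete(long checkpointId) { lastCompleted = checkpointId; }
}

class CheckpointMessageHandler {
    private final Map<String, RecordingTask> runningTasks = new HashMap<>();

    void register(String executionId, RecordingTask task) {
        runningTasks.put(executionId, task);
    }

    // Returns false when the message could not be delivered to a running task.
    boolean handle(CheckpointMessage msg) {
        RecordingTask task = runningTasks.get(msg.taskExecutionId);
        if (task == null) {
            return false; // task no longer running: nothing to notify
        }
        if (msg instanceof TriggerCheckpointMsg) {
            task.triggerCheckpointBarrier(msg.checkpointId);
        } else if (msg instanceof NotifyCheckpointCompleteMsg) {
            task.notifyCheckpointComplete(msg.checkpointId);
        } else {
            return false;
        }
        return true;
    }
}
```

Both message types pass through the same handler, so the NotifyCheckpointComplete branch sits right next to the TriggerCheckpoint branch.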

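  The AcknowledgeCheckpoint message

    Sent by the TaskManager to the JobManager to report that the checkpoint of a given task has completed. The message may carry the task's state and its checkpointMetrics.

    The two fields of the AcknowledgeCheckpoint message class: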

    private final SubtaskState subtaskState; // the task's state
    private final CheckpointMetrics checkpointMetrics; // metrics collected for this checkpoint
    

    Sending the message

      The message is sent from the acknowledgeCheckpoint method of the RuntimeEnvironment class:

    public void acknowledgeCheckpoint(
            long checkpointId,
            CheckpointMetrics checkpointMetrics,
            SubtaskState checkpointStateHandles) {
        // Send the ack message through checkpointResponder, an instance of the CheckpointResponder interface
        checkpointResponder.acknowledgeCheckpoint(
                jobId, executionId, checkpointId, checkpointMetrics,
                checkpointStateHandles);
    }

      CheckpointResponder is the interface for responding to a checkpoint with acknowledge and decline messages. ActorGatewayCheckpointResponder is the implementation of CheckpointResponder that uses an ActorGateway; it provides two methods, acknowledgeCheckpoint and declineCheckpoint.

    @Override
    public void acknowledgeCheckpoint(
            JobID jobID,
            ExecutionAttemptID executionAttemptID,
            long checkpointId,
            CheckpointMetrics checkpointMetrics,
            SubtaskState checkpointStateHandles) {
        // Build an AcknowledgeCheckpoint message
        AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(
                jobID, executionAttemptID, checkpointId, checkpointMetrics,
                checkpointStateHandles);
        // Send it out through the actorGateway
        actorGateway.tell(message);
    }
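The sending path is a two-step delegation. RuntimeEnvironment fills in the job and execution IDs it already knows, and the CheckpointResponder implementation wraps everything into a message and hands it to the gateway. Below is a minimal sketch of that layering under these assumptions: IDs are Strings, state handles and metrics are omitted, and messages are rendered as Strings. RecordingGateway is a hypothetical stand-in for ActorGateway that stores messages instead of sending them. None of these names are Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical counterpart of the CheckpointResponder interface.
interface SimpleCheckpointResponder {
    void acknowledgeCheckpoint(String jobId, String executionId, long checkpointId);

    void declineCheckpoint(String jobId, String executionId, long checkpointId, Throwable reason);
}

// Hypothetical stand-in for ActorGateway#tell: collects messages in order.
class RecordingGateway {
    final List<String> sent = new ArrayList<>();

    void tell(String message) {
        sent.add(message);
    }
}

// Plays the role of ActorGatewayCheckpointResponder: build a message, then tell() it.
class GatewayCheckpointResponder implements SimpleCheckpointResponder {
    private final RecordingGateway gateway;

    GatewayCheckpointResponder(RecordingGateway gateway) {
        this.gateway = gateway;
    }

    @Override
    public void acknowledgeCheckpoint(String jobId, String executionId, long checkpointId) {
        gateway.tell("AcknowledgeCheckpoint(" + jobId + "," + executionId + "," + checkpointId + ")");
    }

    @Override
    public void declineCheckpoint(String jobId, String executionId, long checkpointId, Throwable reason) {
        String why = reason == null ? "" : reason.getMessage();
        gateway.tell("DeclineCheckpoint(" + jobId + "," + executionId + "," + checkpointId + "," + why + ")");
    }
}

// Plays the role of RuntimeEnvironment: the task only passes the checkpoint ID.
class SimpleRuntimeEnvironment {
    private final String jobId;
    private final String executionId;
    private final SimpleCheckpointResponder checkpointResponder;

    SimpleRuntimeEnvironment(String jobId, String executionId, SimpleCheckpointResponder responder) {
        this.jobId = jobId;
        this.executionId = executionId;
        this.checkpointResponder = responder;
    }

    void acknowledgeCheckpoint(long checkpointId) {
        checkpointResponder.acknowledgeCheckpoint(jobId, executionId, checkpointId);
    }
}
```

For example, calling acknowledgeCheckpoint(7) on new SimpleRuntimeEnvironment("job-1", "exec-1", new GatewayCheckpointResponder(gateway)) leaves the entry AcknowledgeCheckpoint(job-1,exec-1,7) in gateway.sent. Because the transport sits behind the interface, the environment code never touches the gateway directly.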

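    Receiving the message

      The message is received by the receiveAcknowledgeMessage method (the same method through which the NotifyCheckpointComplete messages are sent).

  The DeclineCheckpoint message

    Sent by the TaskManager to the JobManager to tell the CheckpointCoordinator that a checkpoint request could not be processed yet. This typically happens when a task is already in the RUNNING state but is not yet internally ready to perform a checkpoint.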

    Sending the message

      The message is sent from the triggerCheckpointBarrier method of the Task class:

    org.apache.flink.runtime.taskmanager.Task
    try {
        boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
        if (!success) {
            // Decline through the CheckpointResponder, the same way an AcknowledgeCheckpoint message is sent
            checkpointResponder.declineCheckpoint(
                    getJobID(), getExecutionId(), checkpointID,
                    new CheckpointDeclineTaskNotReadyException(taskName));
        }
    }
    // (catch clauses omitted in this excerpt)
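The decline path depends on the boolean returned by triggerCheckpoint, and the sketch below models only that branch. CheckpointableInvokable is a hypothetical functional interface standing in for the stateful task. The decline reason is a plain Exception with a made-up message rather than CheckpointDeclineTaskNotReadyException, and declines are recorded in a list instead of going through a CheckpointResponder. The exception handling done in the omitted catch clauses is left out entirely.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the stateful task: returns false when it is not ready yet.
interface CheckpointableInvokable {
    boolean triggerCheckpoint(long checkpointId);
}

class SimpleTaskWrapper {
    private final String taskName;
    private final CheckpointableInvokable invokable;
    // Stand-in for checkpointResponder.declineCheckpoint(...): "id:reason" entries.
    final List<String> declines = new ArrayList<>();

    SimpleTaskWrapper(String taskName, CheckpointableInvokable invokable) {
        this.taskName = taskName;
        this.invokable = invokable;
    }

    void triggerCheckpointBarrier(long checkpointId) {
        boolean success = invokable.triggerCheckpoint(checkpointId);
        if (!success) {
            // Hypothetical reason; the excerpt above uses CheckpointDeclineTaskNotReadyException.
            Exception reason = new Exception("task not ready: " + taskName);
            declines.add(checkpointId + ":" + reason.getMessage());
        }
    }

    public static void main(String[] args) {
        // A task that is only "ready" for even checkpoint IDs, purely for illustration.
        SimpleTaskWrapper task = new SimpleTaskWrapper("map (1/2)", id -> id % 2 == 0);
        task.triggerCheckpointBarrier(1L);
        task.triggerCheckpointBarrier(2L);
        System.out.println(task.declines); // [1:task not ready: map (1/2)]
    }
}
```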

    Receiving the message

      The message is handled by the receiveDeclineMessage method of CheckpointCoordinator:

    public void receiveDeclineMessage(DeclineCheckpoint message) {
        if (shutdown || message == null) {
            return;
        }
        if (!job.equals(message.getJob())) {
            throw new IllegalArgumentException("Received DeclineCheckpoint message for job " +
                message.getJob() + " while this coordinator handles job " + job);
        }

        final long checkpointId = message.getCheckpointId();
        final String reason = (message.getReason() != null ? message.getReason().getMessage() : "");

        PendingCheckpoint checkpoint;

        synchronized (lock) {
            // we need to check inside the lock for being shutdown as well, otherwise we
            // get races and invalid error log messages
            if (shutdown) {
                return;
            }

            checkpoint = pendingCheckpoints.get(checkpointId);

            if (checkpoint != null && !checkpoint.isDiscarded()) {
                // The checkpoint is still pending and has not been discarded
                LOG.info("Discarding checkpoint {} because of checkpoint decline from task {} : {}",
                        checkpointId, message.getTaskExecutionId(), reason);

                pendingCheckpoints.remove(checkpointId); // remove checkpointId from pendingCheckpoints
                checkpoint.abortDeclined();
                rememberRecentCheckpointId(checkpointId);

                // we don't have to schedule another "dissolving" checkpoint any more because the
                // cancellation barriers take care of breaking downstream alignments
                // we only need to make sure that suspended queued requests are resumed

                // Check whether a newer pending checkpoint is still alive
                boolean haveMoreRecentPending = false;
                for (PendingCheckpoint p : pendingCheckpoints.values()) {
                    if (!p.isDiscarded() && p.getCheckpointId() >= checkpoint.getCheckpointId()) {
                        haveMoreRecentPending = true;
                        break;
                    }
                }
                // No newer pending checkpoint: resume the queued trigger requests
                if (!haveMoreRecentPending) {
                    triggerQueuedRequests();
                }
            }
            else if (checkpoint != null) {
                // this should not happen
                throw new IllegalStateException(
                        "Received message for discarded but non-removed checkpoint " + checkpointId);
            }
            else if (LOG.isDebugEnabled()) {
                if (recentPendingCheckpoints.contains(checkpointId)) {
                    // message is for an unknown checkpoint, or comes too late (checkpoint disposed)
                    LOG.debug("Received another decline message for now expired checkpoint attempt {} : {}",
                            checkpointId, reason);
                } else {
                    // message is for an unknown checkpoint. might be so old that we don't even remember it any more
                    LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} : {}",
                            checkpointId, reason);
                }
            }
        }
    }
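The bookkeeping in receiveDeclineMessage can be modelled with a few collections. It drops the pending checkpoint, remembers its ID so that later messages can be recognised as late, and resumes queued trigger requests only when no newer checkpoint is still pending. This is a hypothetical simplification: pending checkpoints are reduced to their IDs, and the size limit of the remembered-ID history (16) is an assumption. Instead of calling triggerQueuedRequests, the sketch just counts how often it would have done so.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the decline handling in CheckpointCoordinator.
class SimpleDeclineCoordinator {
    private static final int MAX_REMEMBERED_IDS = 16; // assumed history size
    private final Object lock = new Object();
    private final Set<Long> pending = new HashSet<>();
    private final Deque<Long> recentIds = new ArrayDeque<>();
    private int queuedTriggerResumes; // times triggerQueuedRequests() would run

    void addPending(long checkpointId) {
        synchronized (lock) {
            pending.add(checkpointId);
        }
    }

    int getQueuedTriggerResumes() {
        synchronized (lock) {
            return queuedTriggerResumes;
        }
    }

    // Returns "aborted", "late" or "unknown". The "discarded but not removed"
    // case from the real method cannot arise in this model.
    String receiveDecline(long checkpointId) {
        synchronized (lock) {
            if (pending.remove(checkpointId)) {
                rememberRecent(checkpointId);
                boolean haveMoreRecentPending = false;
                for (long other : pending) {
                    if (other >= checkpointId) {
                        haveMoreRecentPending = true;
                        break;
                    }
                }
                if (!haveMoreRecentPending) {
                    queuedTriggerResumes++;
                }
                return "aborted";
            }
            return recentIds.contains(checkpointId) ? "late" : "unknown";
        }
    }

    private void rememberRecent(long checkpointId) {
        if (recentIds.size() >= MAX_REMEMBERED_IDS) {
            recentIds.removeFirst(); // forget the oldest ID
        }
        recentIds.addLast(checkpointId);
    }
}
```

With checkpoints 1 and 2 pending, declining 1 does not resume queued requests because 2 is newer and still pending. Declining 2 afterwards does resume them, and a second decline for 1 is reported as late.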

  • Original article: https://www.cnblogs.com/zaizhoumo/p/9236491.html