zoukankan      html  css  js  c++  java
  • flink checkpoint 源码分析 (一)

    转发请注明原创地址http://www.cnblogs.com/dongxiao-yang/p/8029356.html 

    checkpoint是Flink Fault Tolerance机制的重要构成部分,flink checkpoint的核心类名为org.apache.flink.runtime.checkpoint.CheckpointCoordinator。

    定期产生的checkpoint事件

    flink的checkpoint是由CheckpointCoordinator内部的一个timer线程池定时产生的,具体代码由ScheduledTrigger这个Runnable类启动。

        private final class ScheduledTrigger implements Runnable {
    
            @Override
            public void run() {
                try {
                    triggerCheckpoint(System.currentTimeMillis(), true);
                }
                catch (Exception e) {
                    LOG.error("Exception while triggering checkpoint.", e);
                }
            }
        }

    整个triggerCheckpoint方法大致分为三个部分:

    1 环境前置检查

        // Sanity check
            if (props.externalizeCheckpoint() && targetDirectory == null) {
                throw new IllegalStateException("No target directory specified to persist checkpoint to.");
            }
    
            // make some eager pre-checks
            synchronized (lock) {
                // abort if the coordinator has been shutdown in the meantime
                if (shutdown) {
                    return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
                }
    
                // Don't allow periodic checkpoint if scheduling has been disabled
                if (isPeriodic && !periodicScheduling) {
                    return new CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN);
                }
    
                // validate whether the checkpoint can be triggered, with respect to the limit of
                // concurrent checkpoints, and the minimum time between checkpoints.
                // these checks are not relevant for savepoints
                if (!props.forceCheckpoint()) {
                    // sanity check: there should never be more than one trigger request queued
                    if (triggerRequestQueued) {
                        LOG.warn("Trying to trigger another checkpoint while one was queued already");
                        return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
                    }
    
                    // if too many checkpoints are currently in progress, we need to mark that a request is queued
                    if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
                        triggerRequestQueued = true;
                        if (currentPeriodicTrigger != null) {
                            currentPeriodicTrigger.cancel(false);
                            currentPeriodicTrigger = null;
                        }
                        return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
                    }
    
                    // make sure the minimum interval between checkpoints has passed
                    final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
                    final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
    
                    if (durationTillNextMillis > 0) {
                        if (currentPeriodicTrigger != null) {
                            currentPeriodicTrigger.cancel(false);
                            currentPeriodicTrigger = null;
                        }
                        // Reassign the new trigger to the currentPeriodicTrigger
                        currentPeriodicTrigger = timer.scheduleAtFixedRate(
                                new ScheduledTrigger(),
                                durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
    
                        return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                    }
                }
            }
    
            // check if all tasks that we need to trigger are running.
            // if not, abort the checkpoint
            Execution[] executions = new Execution[tasksToTrigger.length];
            for (int i = 0; i < tasksToTrigger.length; i++) {
                Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
                if (ee != null && ee.getState() == ExecutionState.RUNNING) {
                    executions[i] = ee;
                } else {
                    LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
                            tasksToTrigger[i].getTaskNameWithSubtaskIndex());
                    return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
                }
            }
    
            // next, check if all tasks that need to acknowledge the checkpoint are running.
            // if not, abort the checkpoint
            Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);
    
            for (ExecutionVertex ev : tasksToWaitFor) {
                Execution ee = ev.getCurrentExecutionAttempt();
                if (ee != null) {
                    ackTasks.put(ee.getAttemptId(), ev);
                } else {
                    LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
                            ev.getTaskNameWithSubtaskIndex());
                    return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
                }
            }

    上面的代码主要在生成一个chepoint之前进行了一些pre-checks,包括checkpoint的targetDirectory、正在进行中的pendingCheckpoint数量上限、前后两次checkpoint间隔是否过小、以及下游与checkpoint相关tasks是否存活等检测,任意一个条件不满足的都不会执行真正的checkpoint动作。

    2 生成pendingcheckpoint

            final long checkpointID;
                try {
                    // this must happen outside the coordinator-wide lock, because it communicates
                    // with external services (in HA mode) and may block for a while.
                    checkpointID = checkpointIdCounter.getAndIncrement();
                }
                catch (Throwable t) {
                    int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
                    LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
                    return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
                }
    
                final PendingCheckpoint checkpoint = new PendingCheckpoint(
                    job,
                    checkpointID,
                    timestamp,
                    ackTasks,
                    props,
                    targetDirectory,
                    executor);
    
                if (statsTracker != null) {
                    PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
                        checkpointID,
                        timestamp,
                        props);
    
                    checkpoint.setStatsCallback(callback);
                }
    
                // schedule the timer that will clean up the expired checkpoints
                final Runnable canceller = new Runnable() {
                    @Override
                    public void run() {
                        synchronized (lock) {
                            // only do the work if the checkpoint is not discarded anyways
                            // note that checkpoint completion discards the pending checkpoint object
                            if (!checkpoint.isDiscarded()) {
                                LOG.info("Checkpoint " + checkpointID + " expired before completing.");
    
                                checkpoint.abortExpired();
                                pendingCheckpoints.remove(checkpointID);
                                rememberRecentCheckpointId(checkpointID);
    
                                triggerQueuedRequests();
                            }
                        }
                    }
                };

    pendingcheckpoint表示一个待处理的检查点,每个pendingcheckpoint标有一个全局唯一的递增checkpointID,并声明了一个canceller用于后续超时情况下的checkpoint清理用于释放资源。

        // re-acquire the coordinator-wide lock
                    synchronized (lock) {
                        // since we released the lock in the meantime, we need to re-check
                        // that the conditions still hold.
                        if (shutdown) {
                            return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
                        }
                        else if (!props.forceCheckpoint()) {
                            if (triggerRequestQueued) {
                                LOG.warn("Trying to trigger another checkpoint while one was queued already");
                                return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
                            }
    
                            if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
                                triggerRequestQueued = true;
                                if (currentPeriodicTrigger != null) {
                                    currentPeriodicTrigger.cancel(false);
                                    currentPeriodicTrigger = null;
                                }
                                return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
                            }
    
                            // make sure the minimum interval between checkpoints has passed
                            final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
                            final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
    
                            if (durationTillNextMillis > 0) {
                                if (currentPeriodicTrigger != null) {
                                    currentPeriodicTrigger.cancel(false);
                                    currentPeriodicTrigger = null;
                                }
    
                                // Reassign the new trigger to the currentPeriodicTrigger
                                currentPeriodicTrigger = timer.scheduleAtFixedRate(
                                        new ScheduledTrigger(),
                                        durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
    
                                return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                            }
                        }
    
                        LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
    
                        pendingCheckpoints.put(checkpointID, checkpoint);
    
                        ScheduledFuture<?> cancellerHandle = timer.schedule(
                                canceller,
                                checkpointTimeout, TimeUnit.MILLISECONDS);
    
                        if (!checkpoint.setCancellerHandle(cancellerHandle)) {
                            // checkpoint is already disposed!
                            cancellerHandle.cancel(false);
                        }

    pendingcheckpoint在正式执行前还会再执行一遍前置检查,主要等待完成的检查点数量是否过多以及前后两次完成的检查点间隔是否过短等问题,这些检查都通过后,会把之前定义好的cancller注册到timer线程池,如果等待时间过长会主动回收checkpoint的资源。

    3 启动checkpoint执行

    发送这个checkpoint的checkpointID和timestamp到各个对应的executor,也就是给各个TaskManger发一个TriggerCheckpoint类型的消息。

                    CheckpointOptions checkpointOptions;
                    if (!props.isSavepoint()) {
                        checkpointOptions = CheckpointOptions.forCheckpoint();
                    } else {
                        checkpointOptions = CheckpointOptions.forSavepoint(targetDirectory);
                    }
    
                    // send the messages to the tasks that trigger their checkpoint
                    for (Execution execution: executions) {
                        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
                    }
    
                    numUnsuccessfulCheckpointsTriggers.set(0);
                    return new CheckpointTriggerResult(checkpoint);
        public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
            final SimpleSlot slot = assignedResource;
    
            if (slot != null) {
                final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
    
                taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
            } else {
                LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
                    "no longer running.");
            }
        }
        @Override
        public void triggerCheckpoint(
                ExecutionAttemptID executionAttemptID,
                JobID jobId,
                long checkpointId,
                long timestamp,
                CheckpointOptions checkpointOptions) {
    
            Preconditions.checkNotNull(executionAttemptID);
            Preconditions.checkNotNull(jobId);
    
            actorGateway.tell(new TriggerCheckpoint(jobId, executionAttemptID, checkpointId, timestamp, checkpointOptions));
        }

    其中,for (Execution execution: executions) 这里面的executions里面是所有的输入节点,也就是flink source节点,所以checkpoint这些barrier 时间首先从jobmanager发送给了所有的source task

    JobCheckpointingSettings settings = new JobCheckpointingSettings(
                triggerVertices,
                ackVertices,
                commitVertices,
                new CheckpointCoordinatorConfiguration(
                    interval,
                    cfg.getCheckpointTimeout(),
                    cfg.getMinPauseBetweenCheckpoints(),
                    cfg.getMaxConcurrentCheckpoints(),
                    retentionAfterTermination,
                    isExactlyOnce),
                serializedStateBackend,
                serializedHooks);
    
            jobGraph
    
    for (JobVertex vertex : jobVertices.values()) {
                if (vertex.isInputVertex()) {
                    triggerVertices.add(vertex.getID());
                }
                commitVertices.add(vertex.getID());
                ackVertices.add(vertex.getID());
            }
  • 相关阅读:
    Java课堂测试——一维数组
    05构建之法阅读笔记之二
    第八周个人总结
    团队项目第一篇——NABCD
    第七周学习进度报告
    地铁合作的第二周
    04构建之法阅读笔记之一
    第六周学习进度报告
    03人月神话阅读笔记之三
    地铁合作的第一周
  • 原文地址:https://www.cnblogs.com/dongxiao-yang/p/8029356.html
Copyright © 2011-2022 走看看