  • Flink checkpoint source code analysis (Part 2)

    When reposting, please cite the original address: http://www.cnblogs.com/dongxiao-yang/p/8260370.html

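    The article "Flink checkpoint source code analysis (Part 1)" covered the JobManager-side code that periodically generates TriggerCheckpoint messages. This article continues from there and looks at how the TaskManager handles a received TriggerCheckpoint message and performs the corresponding snapshot.

    On the TaskManager, a TriggerCheckpoint message travels along the path handleMessage -> handleCheckpointingMessage -> Task.triggerCheckpointBarrier: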

        public void triggerCheckpointBarrier(
                final long checkpointID,
                long checkpointTimestamp,
                final CheckpointOptions checkpointOptions) {

            final AbstractInvokable invokable = this.invokable;
            final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

            if (executionState == ExecutionState.RUNNING && invokable != null) {
                if (invokable instanceof StatefulTask) {
                    // build a local closure
                    final StatefulTask statefulTask = (StatefulTask) invokable;
                    final String taskName = taskNameWithSubtask;
                    final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
                        FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
                    Runnable runnable = new Runnable() {
                        @Override
                        public void run() {
                            // set safety net from the task's context for checkpointing thread
                            LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                            FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

                            try {
                                boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                                if (!success) {
                                    checkpointResponder.declineCheckpoint(
                                            getJobID(), getExecutionId(), checkpointID,
                                            new CheckpointDeclineTaskNotReadyException(taskName));
                                }
                            }
                            catch (Throwable t) {
                                if (getExecutionState() == ExecutionState.RUNNING) {
                                    failExternally(new Exception(
                                        "Error while triggering checkpoint " + checkpointID + " for " +
                                            taskNameWithSubtask, t));
                                } else {
                                    LOG.debug("Encountered error while triggering checkpoint {} for " +
                                        "{} ({}) while being not in state running.", checkpointID,
                                        taskNameWithSubtask, executionId, t);
                                }
                            } finally {
                                FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                            }
                        }
                    };
                    executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
                }
                else {
                    checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                            new CheckpointDeclineTaskNotCheckpointingException(taskNameWithSubtask));

                    LOG.error("Task received a checkpoint request, but is not a checkpointing task - {} ({}).",
                            taskNameWithSubtask, executionId);
                }
            }
            else {
                LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);

                // send back a message that we did not do the checkpoint
                checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                        new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
            }
        }
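
    The branching in triggerCheckpointBarrier boils down to three outcomes: decline because the task is not running, decline because the task cannot checkpoint at all, or hand the trigger to an asynchronous call thread. The following is only a minimal sketch of that decision logic in plain Java, not Flink code. The names TriggerDecisionSketch, Checkpointable, State and Outcome are hypothetical stand-ins, and a CompletableFuture replaces the messages that the real code sends through checkpointResponder.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical, simplified model of the branching in Task.triggerCheckpointBarrier. */
public class TriggerDecisionSketch {

    enum State { RUNNING, FINISHED }

    enum Outcome { TRIGGERED, DECLINED_NOT_READY, DECLINED_NOT_CHECKPOINTING, FAILED }

    /** Stand-in for StatefulTask: returns false when the task cannot checkpoint yet. */
    interface Checkpointable {
        boolean triggerCheckpoint(long checkpointId) throws Exception;
    }

    static CompletableFuture<Outcome> trigger(
            State state, Object invokable, long checkpointId, ExecutorService asyncCalls) {

        // Branch 1: the task is not running, so the request is declined right away.
        if (state != State.RUNNING || invokable == null) {
            return CompletableFuture.completedFuture(Outcome.DECLINED_NOT_READY);
        }
        // Branch 2: the task is running but has no checkpointing support.
        if (!(invokable instanceof Checkpointable)) {
            return CompletableFuture.completedFuture(Outcome.DECLINED_NOT_CHECKPOINTING);
        }
        // Branch 3: run the trigger on a separate thread so the caller is not blocked.
        Checkpointable task = (Checkpointable) invokable;
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.triggerCheckpoint(checkpointId)
                        ? Outcome.TRIGGERED
                        : Outcome.DECLINED_NOT_READY;
            } catch (Throwable t) {
                // the real code fails the task here if it is still RUNNING
                return Outcome.FAILED;
            }
        }, asyncCalls);
    }

    public static void main(String[] args) {
        ExecutorService asyncCalls = Executors.newSingleThreadExecutor();
        try {
            Checkpointable ready = id -> true;
            System.out.println(trigger(State.RUNNING, ready, 1L, asyncCalls).join());
            System.out.println(trigger(State.RUNNING, new Object(), 2L, asyncCalls).join());
            System.out.println(trigger(State.FINISHED, ready, 3L, asyncCalls).join());
        } finally {
            asyncCalls.shutdown();
        }
    }
}
```

    The point the sketch tries to show is that only the third branch leaves the calling thread. Both decline branches answer immediately.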

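    Under normal circumstances, triggerCheckpointBarrier calls the triggerCheckpoint() method implemented in StreamTask, which reaches executeCheckpointing through the call chain

    triggerCheckpoint -> performCheckpoint -> checkpointState -> CheckpointingOperation.executeCheckpointing
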
        public void executeCheckpointing() throws Exception {
                startSyncPartNano = System.nanoTime();
    
                boolean failed = true;
                try {
                    for (StreamOperator<?> op : allOperators) {
                        checkpointStreamOperator(op);
                    }
    
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
                                checkpointMetaData.getCheckpointId(), owner.getName());
                    }
    
                    startAsyncPartNano = System.nanoTime();
    
                    checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
    
                    // at this point we are transferring ownership over snapshotInProgressList for cleanup to the thread
                    runAsyncCheckpointingAndAcknowledge();
                    failed = false;
    
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("{} - finished synchronous part of checkpoint {}." +
                                "Alignment duration: {} ms, snapshot duration {} ms",
                            owner.getName(), checkpointMetaData.getCheckpointId(),
                            checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                            checkpointMetrics.getSyncDurationMillis());
                    }
                    // ... (the rest of the method is omitted in this excerpt)

    executeCheckpointing does two things. First, it calls checkpointStreamOperator(op) on every StreamOperator that belongs to the task.

    The code of checkpointStreamOperator:

        private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
                if (null != op) {
                    // first call the legacy checkpoint code paths
                    nonPartitionedStates.add(op.snapshotLegacyOperatorState(
                            checkpointMetaData.getCheckpointId(),
                            checkpointMetaData.getTimestamp(),
                            checkpointOptions));
    
                    OperatorSnapshotResult snapshotInProgress = op.snapshotState(
                            checkpointMetaData.getCheckpointId(),
                            checkpointMetaData.getTimestamp(),
                            checkpointOptions);
    
                    snapshotInProgressList.add(snapshotInProgress);
                } else {
                    nonPartitionedStates.add(null);
                    OperatorSnapshotResult emptySnapshotInProgress = new OperatorSnapshotResult();
                    snapshotInProgressList.add(emptySnapshotInProgress);
                }
            }
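
    One detail worth noting in checkpointStreamOperator is that a null operator still gets an entry in both lists, so position i in the result always corresponds to position i in the operator chain. A minimal sketch of that idea follows. SyncPhaseSketch and its Operator interface are hypothetical simplifications, and a CompletableFuture<String> stands in for OperatorSnapshotResult.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of the synchronous phase: one in-progress snapshot per chain position. */
public class SyncPhaseSketch {

    /** Stand-in for StreamOperator: starts a snapshot and returns a handle to its result. */
    interface Operator {
        CompletableFuture<String> snapshotState(long checkpointId);
    }

    static List<CompletableFuture<String>> runSyncPhase(List<Operator> chain, long checkpointId) {
        List<CompletableFuture<String>> inProgress = new ArrayList<>(chain.size());
        for (Operator op : chain) {
            if (op != null) {
                inProgress.add(op.snapshotState(checkpointId));
            } else {
                // empty placeholder keeps the result list aligned with the chain
                inProgress.add(CompletableFuture.<String>completedFuture(null));
            }
        }
        return inProgress;
    }

    public static void main(String[] args) {
        Operator map = id -> CompletableFuture.completedFuture("map-state@" + id);
        Operator sink = id -> CompletableFuture.completedFuture("sink-state@" + id);
        List<CompletableFuture<String>> result = runSyncPhase(Arrays.asList(map, null, sink), 7L);
        for (CompletableFuture<String> f : result) {
            System.out.println(f.join());
        }
    }
}
```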

    The snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) method of StreamOperator is ultimately given a final implementation by its implementing class AbstractStreamOperator:

        @Override
        public final OperatorSnapshotResult snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
    
            KeyGroupRange keyGroupRange = null != keyedStateBackend ?
                    keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
    
            OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult();
    
            CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions);
    
            try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
                    checkpointId,
                    timestamp,
                    factory,
                    keyGroupRange,
                    getContainingTask().getCancelables())) {
    
                snapshotState(snapshotContext);
    
                snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
                snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
    
                if (null != operatorStateBackend) {
                    snapshotInProgress.setOperatorStateManagedFuture(
                        operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
                }
    
                if (null != keyedStateBackend) {
                    snapshotInProgress.setKeyedStateManagedFuture(
                        keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
                }
            } catch (Exception snapshotException) {
                try {
                    snapshotInProgress.cancel();
                } catch (Exception e) {
                    snapshotException.addSuppressed(e);
                }
    
                throw new Exception("Could not complete snapshot " + checkpointId + " for operator " +
                    getOperatorName() + '.', snapshotException);
            }
    
            return snapshotInProgress;
        }

    The snapshotState(snapshotContext) method called in the code above has its own concrete implementation in each of the final operators.
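
    This is the classic template-method arrangement: the final method fixes the skeleton of a snapshot, while the context-taking method is the hook that concrete operators override. Below is a minimal sketch of that pattern in plain Java. BaseOperator, Context and CountingOperator are hypothetical names invented for illustration and are not Flink classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of a final snapshot skeleton with an overridable hook. */
public class SnapshotHookSketch {

    /** Stand-in for the snapshot context handed to the hook. */
    static final class Context {
        final long checkpointId;
        final Map<String, Object> rawState = new LinkedHashMap<>();

        Context(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    abstract static class BaseOperator {
        /** Fixed skeleton: subclasses cannot change the order of the steps. */
        final Map<String, Object> snapshot(long checkpointId) {
            Context ctx = new Context(checkpointId);
            snapshotState(ctx); // operator-specific part
            return ctx.rawState;
        }

        /** Hook with an empty default body; concrete operators override it. */
        void snapshotState(Context ctx) {
        }
    }

    static final class CountingOperator extends BaseOperator {
        private long count;

        void process(String record) {
            count++;
        }

        @Override
        void snapshotState(Context ctx) {
            ctx.rawState.put("count", count);
        }
    }

    public static void main(String[] args) {
        CountingOperator op = new CountingOperator();
        op.process("a");
        op.process("b");
        System.out.println(op.snapshot(1L));
    }
}
```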

    The second thing executeCheckpointing does is call runAsyncCheckpointingAndAcknowledge, which carries out all the work of persisting state to files and then sends acknowledgeCheckpoint back to the JobManager.

        private static final class AsyncCheckpointRunnable implements Runnable, Closeable {
    .....
    .....
    
                    if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
                            CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
    
                        owner.getEnvironment().acknowledgeCheckpoint(
                            checkpointMetaData.getCheckpointId(),
                            checkpointMetrics,
                            subtaskState);
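
    The compareAndSet from RUNNING to COMPLETED is what guarantees that the acknowledgement is sent at most once, and only if nobody discarded the checkpoint in the meantime. The sketch below shows that guard in isolation. AsyncAckSketch is a hypothetical class, and the DISCARDED state and the close() method are my assumptions about how a cancellation path would compete for the same state transition; they are not taken from the excerpt above.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical model of the "acknowledge only if still RUNNING" guard. */
public class AsyncAckSketch {

    enum AsyncState { RUNNING, DISCARDED, COMPLETED }

    static final class AsyncCheckpoint implements Runnable, AutoCloseable {
        private final AtomicReference<AsyncState> state =
                new AtomicReference<>(AsyncState.RUNNING);
        private final Runnable materialize;
        private final Runnable acknowledge;

        AsyncCheckpoint(Runnable materialize, Runnable acknowledge) {
            this.materialize = materialize;
            this.acknowledge = acknowledge;
        }

        @Override
        public void run() {
            materialize.run(); // e.g. finish writing the state files
            // Only the thread that wins this transition may acknowledge.
            if (state.compareAndSet(AsyncState.RUNNING, AsyncState.COMPLETED)) {
                acknowledge.run();
            }
        }

        /** Assumed cancellation path: it loses the race if run() already completed. */
        @Override
        public void close() {
            state.compareAndSet(AsyncState.RUNNING, AsyncState.DISCARDED);
        }

        AsyncState state() {
            return state.get();
        }
    }

    public static void main(String[] args) {
        AtomicInteger acks = new AtomicInteger();

        AsyncCheckpoint normal = new AsyncCheckpoint(() -> { }, acks::incrementAndGet);
        normal.run();
        System.out.println(normal.state() + ", acks = " + acks.get());

        AsyncCheckpoint cancelled = new AsyncCheckpoint(() -> { }, acks::incrementAndGet);
        cancelled.close();
        cancelled.run();
        System.out.println(cancelled.state() + ", acks = " + acks.get());
    }
}
```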

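    One addition: inside the performCheckpoint method mentioned above, before checkpointState is called, Flink first sends the CheckpointBarrier to the downstream tasks so that the downstream operators can start their own checkpoints as early as possible. This is also where the barriers of Flink's barrier mechanism are created.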

        synchronized (lock) {
                if (isRunning) {
                    // we can do a checkpoint
    
                    // Since both state checkpointing and downstream barrier emission occurs in this
                    // lock scope, they are an atomic operation regardless of the order in which they occur.
                    // Given this, we immediately emit the checkpoint barriers, so the downstream operators
                    // can start their checkpoint work as soon as possible
                    operatorChain.broadcastCheckpointBarrier(
                            checkpointMetaData.getCheckpointId(),
                            checkpointMetaData.getTimestamp(),
                            checkpointOptions);
    
                    checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
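                    return true;
                // ... (the rest of performCheckpoint is omitted in this excerpt)

    OperatorChain.broadcastCheckpointBarrier, which is called above, is implemented as follows:

        public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {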
            try {
                CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
                for (RecordWriterOutput<?> streamOutput : streamOutputs) {
                    streamOutput.broadcastEvent(barrier);
                }
            }
            catch (InterruptedException e) {
                throw new IOException("Interrupted while broadcasting checkpoint barrier");
            }
        }
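
    Because the barrier emission and the state snapshot happen inside the same lock that record processing uses, every output sees the barrier at exactly the point in the stream that the snapshot reflects. The following sketch tries to make that visible with plain lists as outputs. BarrierFirstSketch is hypothetical, and it simplifies record emission by copying every record to every output.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: barrier broadcast and snapshot share one lock with record processing. */
public class BarrierFirstSketch {

    private final Object lock = new Object();
    private final List<List<String>> outputs = new ArrayList<>();
    private final List<String> snapshots = new ArrayList<>();
    private long processedRecords;
    private boolean running = true;

    BarrierFirstSketch(int numOutputs) {
        for (int i = 0; i < numOutputs; i++) {
            outputs.add(new ArrayList<>());
        }
    }

    void process(String record) {
        synchronized (lock) {
            processedRecords++;
            for (List<String> out : outputs) {
                out.add(record);
            }
        }
    }

    boolean performCheckpoint(long checkpointId) {
        synchronized (lock) {
            if (!running) {
                return false;
            }
            // 1) emit the barrier first so downstream work can start early
            for (List<String> out : outputs) {
                out.add("barrier-" + checkpointId);
            }
            // 2) snapshot the state; no record can slip in between 1) and 2)
            snapshots.add(checkpointId + ":" + processedRecords);
            return true;
        }
    }

    void stop() {
        synchronized (lock) {
            running = false;
        }
    }

    List<String> output(int index) {
        synchronized (lock) {
            return new ArrayList<>(outputs.get(index));
        }
    }

    List<String> snapshots() {
        synchronized (lock) {
            return new ArrayList<>(snapshots);
        }
    }

    public static void main(String[] args) {
        BarrierFirstSketch task = new BarrierFirstSketch(2);
        task.process("a");
        task.performCheckpoint(1L);
        task.process("b");
        System.out.println(task.output(0));
        System.out.println(task.snapshots());
    }
}
```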

    The trigger path described above applies to source tasks. For the remaining, non-source operators, the call chain is

    StreamInputProcessor/StreamTwoInputProcessor.processInput() -> barrierHandler.getNextNonBlocked() -> processBarrier -> notifyCheckpoint -> triggerCheckpointOnBarrier
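
    On this path the checkpoint is driven by the barriers that arrive on the input channels rather than by a message from the JobManager. As far as I understand, in exactly-once mode the barrier handler blocks each channel once its barrier has arrived and notifies the task only after all channels have delivered the barrier for the same checkpoint. The sketch below models just that counting logic. BarrierAlignmentSketch is a hypothetical class, it does not buffer any records, and it assumes a newer checkpoint simply aborts an unfinished older alignment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of barrier alignment across the input channels of one task. */
public class BarrierAlignmentSketch {

    private final boolean[] blocked;
    private int numBarriersReceived;
    private long currentCheckpointId = -1L;
    private long lastTriggeredId = -1L;
    private final List<Long> triggered = new ArrayList<>();

    BarrierAlignmentSketch(int numChannels) {
        this.blocked = new boolean[numChannels];
    }

    /** Returns true if this barrier completed the alignment and the checkpoint was triggered. */
    boolean onBarrier(int channel, long checkpointId) {
        if (checkpointId <= lastTriggeredId || checkpointId < currentCheckpointId) {
            return false; // stale barrier
        }
        if (checkpointId > currentCheckpointId) {
            // a newer checkpoint starts: drop any unfinished alignment (assumption)
            currentCheckpointId = checkpointId;
            Arrays.fill(blocked, false);
            numBarriersReceived = 0;
        }
        if (blocked[channel]) {
            return false; // duplicate barrier on an already blocked channel
        }
        blocked[channel] = true;
        numBarriersReceived++;
        if (numBarriersReceived < blocked.length) {
            return false; // still waiting for other channels
        }
        // all channels aligned: trigger the checkpoint and unblock everything
        lastTriggeredId = checkpointId;
        triggered.add(checkpointId);
        Arrays.fill(blocked, false);
        numBarriersReceived = 0;
        return true;
    }

    boolean isBlocked(int channel) {
        return blocked[channel];
    }

    List<Long> triggered() {
        return new ArrayList<>(triggered);
    }

    public static void main(String[] args) {
        BarrierAlignmentSketch aligner = new BarrierAlignmentSketch(2);
        System.out.println(aligner.onBarrier(0, 1L)); // first channel only
        System.out.println(aligner.isBlocked(0));
        System.out.println(aligner.onBarrier(1, 1L)); // second channel completes the alignment
        System.out.println(aligner.triggered());
    }
}
```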

