  Flink 1.7 checkpoint source code analysis

    Initializing the state classes
    //org.apache.flink.streaming.runtime.tasks.StreamTask#initializeState
    initializeState();

    private void initializeState() throws Exception {
        StreamOperator<?>[] allOperators = operatorChain.getAllOperators();

        for (StreamOperator<?> operator : allOperators) {
            if (null != operator) {
                operator.initializeState();
            }
        }
    }
    operator.initializeState() resolves to org.apache.flink.streaming.api.operators.AbstractStreamOperator#initializeState(). The stream operator classes all extend this class, and because the method is declared final, none of them overrides it.
    public final void initializeState() throws Exception {
        // This calls into the state backend; the important work happens inside
        final StreamOperatorStateContext context =
            streamTaskStateManager.streamOperatorStateContext(
                getOperatorID(),
                getClass().getSimpleName(),
                this,
                keySerializer,
                streamTaskCloseableRegistry,
                metrics);
    ...
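Why can no subclass override initializeState()? It is declared final in AbstractStreamOperator. Subclasses customize initialization through a separate overridable hook instead (as far as I can tell, an initializeState(StateInitializationContext) overload in Flink 1.7). Below is a minimal, Flink-independent sketch of that template-method arrangement, including the null-checking loop from StreamTask#initializeState. SketchBaseOperator, SketchUdfOperator and the String "context" are hypothetical stand-ins, not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-independent model of the pattern; none of these classes exist in Flink.
abstract class SketchBaseOperator {
    final List<String> log = new ArrayList<>();

    // Plays the role of AbstractStreamOperator#initializeState(): final, so it cannot be overridden.
    public final void initializeState() {
        String context = createStateContext(); // stands in for streamOperatorStateContext(...)
        log.add("backends created");
        initializeState(context);              // the overridable hook
    }

    private String createStateContext() {
        return "state-context";
    }

    // Hook with a different signature; the base version does nothing.
    protected void initializeState(String context) {
    }

    // Mirrors the loop in StreamTask#initializeState, including the null check.
    static void initializeAll(SketchBaseOperator[] chain) {
        for (SketchBaseOperator op : chain) {
            if (op != null) {
                op.initializeState();
            }
        }
    }
}

class SketchPlainOperator extends SketchBaseOperator {
}

class SketchUdfOperator extends SketchBaseOperator {
    @Override
    protected void initializeState(String context) {
        // An operator wrapping a user function would forward to the user's initializeState here.
        log.add("user function initialized with " + context);
    }
}
```

Subclasses can only plug into the hook, so the backend creation in the final method always runs first.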
    streamTaskStateManager.streamOperatorStateContext(......) resolves to org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl#streamOperatorStateContext
    ......
    // -------------- Keyed State Backend (the key part for checkpointing) --------------
    keyedStatedBackend = keyedStatedBackend(
        keySerializer,
        operatorIdentifierText,
        prioritizedOperatorSubtaskStates,
        streamTaskCloseableRegistry,
        metricGroup);

    // -------------- Operator State Backend (the key part for checkpointing) --------------
    operatorStateBackend = operatorStateBackend(
        operatorIdentifierText,
        prioritizedOperatorSubtaskStates,
        streamTaskCloseableRegistry);
    ......
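    Deep down, keyedStatedBackend() calls org.apache.flink.streaming.api.operators.BackendRestorerProcedure#attemptCreateAndRestore. operatorStateBackend() goes through the same procedure, and the operator-state path is the one followed below.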
    private T attemptCreateAndRestore(Collection<S> restoreState) throws Exception {
        ......
        // create a new, empty backend.
        final T backendInstance = instanceSupplier.get();

        // attempt to restore from snapshot (or null if no state was checkpointed).
        backendInstance.restore(restoreState);
        ......
    }
    For operator state, backendInstance.restore(restoreState) resolves to org.apache.flink.runtime.state.DefaultOperatorStateBackend#restore
    // registeredOperatorStates is the core object here
    ...
    PartitionableListState<?> listState = registeredOperatorStates.get(restoredSnapshot.getName());

    if (null == listState) {
        listState = new PartitionableListState<>(restoredMetaInfo);
        // Key point: the restored state object is stored here
        //********************************************************************
        registeredOperatorStates.put(listState.getStateMetaInfo().getName(), listState);
        //********************************************************************
    } else {
        // TODO with eager state registration in place, check here for serializer migration strategies
    }
    ...
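The restore path above boils down to two steps. First, create an empty backend. Then register one state object per restored state name in registeredOperatorStates. Here is a simplified sketch of that, assuming a "snapshot" is just a map from state name to a list of longs. SketchOperatorBackend and SketchRestorer are hypothetical names; real Flink restores from state handles through serializers, not from plain maps.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for DefaultOperatorStateBackend: a "snapshot" is a map from state name to values.
class SketchOperatorBackend {
    // Plays the role of registeredOperatorStates.
    final Map<String, List<Long>> registeredOperatorStates = new HashMap<>();

    void restore(Collection<Map<String, List<Long>>> restoreState) {
        if (restoreState == null) {
            return; // nothing was checkpointed: keep the backend empty
        }
        for (Map<String, List<Long>> snapshot : restoreState) {
            for (Map.Entry<String, List<Long>> entry : snapshot.entrySet()) {
                List<Long> listState = registeredOperatorStates.get(entry.getKey());
                if (listState == null) {
                    // first time this name is seen: register a new state holder
                    listState = new ArrayList<>();
                    registeredOperatorStates.put(entry.getKey(), listState);
                }
                listState.addAll(entry.getValue()); // then append the restored elements
            }
        }
    }
}

// Hypothetical stand-in for BackendRestorerProcedure#attemptCreateAndRestore.
class SketchRestorer {
    static SketchOperatorBackend attemptCreateAndRestore(
            Supplier<SketchOperatorBackend> instanceSupplier,
            Collection<Map<String, List<Long>>> restoreState) {
        SketchOperatorBackend backendInstance = instanceSupplier.get(); // create a new, empty backend
        backendInstance.restore(restoreState);                         // attempt to restore from snapshot
        return backendInstance;
    }
}
```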
    Everything above is initialization logic. triggerCheckpoint, by contrast, fires periodically to take checkpoints.

    Periodic snapshots of the state classes
    org.apache.flink.runtime.checkpoint.CheckpointCoordinator#triggerCheckpoint(long, boolean) 
    ......
    // send the messages to the tasks that trigger their checkpoint
    // (I suspect this is where the trigger is sent remotely to the tasks, and where the generation of the state data files starts)
    for (Execution execution: executions) {
        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
    }
    ......
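CheckpointCoordinator#triggerCheckpoint allocates a checkpoint ID and then loops over the executions to trigger (in Flink, the source tasks). The periodic behavior comes from a scheduled timer. The sketch below shows that shape with a ScheduledExecutorService. SketchCoordinator and SketchExecution are hypothetical, and the real coordinator also handles pending-checkpoint bookkeeping, timeouts and concurrency limits, all omitted here.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for an Execution that can be asked to checkpoint.
interface SketchExecution {
    void triggerCheckpoint(long checkpointId, long timestamp);
}

// Hypothetical, heavily simplified model of CheckpointCoordinator#triggerCheckpoint.
class SketchCoordinator {
    private final AtomicLong checkpointIdCounter = new AtomicLong(1);
    private final List<SketchExecution> executions; // in Flink: the tasks to trigger (sources)

    SketchCoordinator(List<SketchExecution> executions) {
        this.executions = executions;
    }

    long triggerCheckpoint(long timestamp) {
        long checkpointId = checkpointIdCounter.getAndIncrement();
        // send the messages to the tasks that trigger their checkpoint
        for (SketchExecution execution : executions) {
            execution.triggerCheckpoint(checkpointId, timestamp);
        }
        return checkpointId;
    }

    // Periodic triggering; the caller owns the returned timer and must shut it down.
    ScheduledExecutorService startPeriodic(long intervalMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(
                () -> triggerCheckpoint(System.currentTimeMillis()),
                intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        return timer;
    }
}
```

startPeriodic returns the timer so that the caller can shut it down when the job stops.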
    execution.triggerCheckpoint resolves to org.apache.flink.runtime.executiongraph.Execution#triggerCheckpoint
    /**
     * Trigger a new checkpoint on the task of this execution.
     * @param checkpointId of the checkpoint to trigger
     * @param timestamp of the checkpoint to trigger
     * @param checkpointOptions of the checkpoint to trigger
     */
    public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
        ......
        final LogicalSlot slot = assignedResource;
        if (slot != null) {
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
        }
        ......
    }
    taskManagerGateway.triggerCheckpoint(......) ultimately reaches, over RPC, org.apache.flink.runtime.taskexecutor.TaskExecutor#triggerCheckpoint
    @Override
    public CompletableFuture<Acknowledge> triggerCheckpoint(
            ExecutionAttemptID executionAttemptID,
            long checkpointId,
            long checkpointTimestamp,
            CheckpointOptions checkpointOptions) {
        ......
        final Task task = taskSlotTable.getTask(executionAttemptID);
        if (task != null) {
            task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);

            return CompletableFuture.completedFuture(Acknowledge.get());
        }
        ......
    }
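On the TaskManager side, the RPC handler does three things. It looks the task up by execution attempt ID, fires the barrier trigger, and acknowledges immediately; it does not wait for the checkpoint itself. A sketch of that with CompletableFuture follows. SketchTaskExecutor and SketchTask are hypothetical. The excerpt elides the not-found branch, so failing the future with an IllegalStateException there is this sketch's assumption, not Flink's behavior.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for org.apache.flink.runtime.taskmanager.Task.
class SketchTask {
    long lastBarrier = -1;

    void triggerCheckpointBarrier(long checkpointId) {
        lastBarrier = checkpointId; // the real task hands this off to another thread
    }
}

// Hypothetical, simplified model of TaskExecutor#triggerCheckpoint.
class SketchTaskExecutor {
    // Plays the role of taskSlotTable: execution attempt id -> running task.
    final Map<String, SketchTask> taskSlotTable = new HashMap<>();

    CompletableFuture<String> triggerCheckpoint(String executionAttemptId, long checkpointId) {
        SketchTask task = taskSlotTable.get(executionAttemptId);
        if (task != null) {
            task.triggerCheckpointBarrier(checkpointId);
            return CompletableFuture.completedFuture("ACK"); // acknowledge right away
        }
        // Assumed behavior for the elided branch: report that the task is not here.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException(
                "Task " + executionAttemptId + " not found; cannot trigger checkpoint " + checkpointId));
        return failed;
    }
}
```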
    task.triggerCheckpointBarrier(......) resolves to org.apache.flink.runtime.taskmanager.Task#triggerCheckpointBarrier
    /**
     * Calls the invokable to trigger a checkpoint.
     * (Author's note) This is where checkpoint execution really starts, so it can be treated as the entry point;
     * it ends up calling org.apache.flink.streaming.runtime.tasks.StreamTask#triggerCheckpoint.
     * The AsyncCheckpointRunnable task is executed from within it.
     * @param checkpointID The ID identifying the checkpoint.
     * @param checkpointTimestamp The timestamp associated with the checkpoint.
     * @param checkpointOptions Options for performing this checkpoint.
     */
    public void triggerCheckpointBarrier(
            final long checkpointID,
            long checkpointTimestamp,
            final CheckpointOptions checkpointOptions) {

        final AbstractInvokable invokable = this.invokable;
        final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

        if (executionState == ExecutionState.RUNNING && invokable != null) {

            // build a local closure
            final String taskName = taskNameWithSubtask;
            final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
                FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();

            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    // set safety net from the task's context for checkpointing thread
                    LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                    FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

                    try {
                        boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                        ......
                    }
                    ......
                }
            };
            // Lazily creates a single-thread executor and submits the runnable to it
            executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
        }
    }
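Task#triggerCheckpointBarrier only proceeds when the task is RUNNING, and it hands the actual work to a separate single-thread executor so the RPC thread is not blocked. The sketch below models this under one assumption: the invokable can be treated as a LongPredicate that reports whether the checkpoint succeeded. SketchTaskRunner is hypothetical. Returning null for a non-running task is this sketch's own convention; Flink instead declines the checkpoint towards the JobManager.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.LongPredicate;

// Hypothetical, simplified model of Task#triggerCheckpointBarrier; not Flink's classes.
class SketchTaskRunner {
    enum State { RUNNING, CANCELED }

    volatile State executionState = State.RUNNING;
    private final LongPredicate invokable;       // stands in for invokable.triggerCheckpoint(...)
    private ExecutorService asyncCallDispatcher; // created lazily, single thread

    SketchTaskRunner(LongPredicate invokable) {
        this.invokable = invokable;
    }

    // Returns null if the task is not running.
    synchronized CompletableFuture<Boolean> triggerCheckpointBarrier(long checkpointId) {
        if (executionState != State.RUNNING || invokable == null) {
            return null;
        }
        if (asyncCallDispatcher == null) {
            asyncCallDispatcher = Executors.newSingleThreadExecutor();
        }
        // the checkpoint runs on the dispatcher thread, not on the caller's (RPC) thread
        return CompletableFuture.supplyAsync(() -> invokable.test(checkpointId), asyncCallDispatcher);
    }

    synchronized void shutdown() {
        if (asyncCallDispatcher != null) {
            asyncCallDispatcher.shutdown();
        }
    }
}
```

A single thread also means checkpoint triggers for the same task run one after another, in submission order.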
    invokable.triggerCheckpoint(.....) ultimately goes through this call chain:
    org.apache.flink.streaming.runtime.tasks.StreamTask#triggerCheckpoint
    org.apache.flink.streaming.runtime.tasks.StreamTask#performCheckpoint
    // we can do a checkpoint

    // All of the following steps happen as an atomic step from the perspective of barriers and
    // records/watermarks/timers/callbacks.
    // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
    // checkpoint alignments

    // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
    //           The pre-barrier work should be nothing or minimal in the common case.
    operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());

    // Step (2): Send the checkpoint barrier downstream.
    // (Author's note: I expected this step to produce the state data, held in checkpointOptions,
    // but debugging showed no data is generated here; the barrier only carries checkpointOptions downstream.)
    operatorChain.broadcastCheckpointBarrier(
        checkpointMetaData.getCheckpointId(),
        checkpointMetaData.getTimestamp(),
        checkpointOptions);

    // Step (3): Take the state snapshot. This should be largely asynchronous, to not
    //           impact progress of the streaming topology
    checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
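In performCheckpoint, the three steps run under the task's checkpoint lock. That lock is what makes them atomic with respect to records, watermarks and timers. The barrier goes out before the snapshot so that downstream alignment is not delayed. The small sketch below only records the order of the steps. SketchStreamTask and its event strings are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the three steps in StreamTask#performCheckpoint.
class SketchStreamTask {
    final List<String> events = new ArrayList<>();
    private final Object checkpointLock = new Object(); // record processing would take this lock too

    boolean performCheckpoint(long checkpointId, boolean running) {
        synchronized (checkpointLock) {
            if (!running) {
                // Flink instead tells downstream operators not to wait for this checkpoint
                return false;
            }
            // Step (1): pre-barrier work, usually nothing
            events.add("prepareSnapshotPreBarrier " + checkpointId);
            // Step (2): emit the barrier downstream as early as possible
            events.add("broadcastCheckpointBarrier " + checkpointId);
            // Step (3): synchronous part of the snapshot; heavy I/O is handed off asynchronously
            events.add("checkpointState " + checkpointId);
            return true;
        }
    }
}
```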
    checkpointState(......) ultimately calls org.apache.flink.streaming.runtime.tasks.StreamTask.CheckpointingOperation#executeCheckpointing()
    Pay close attention from here on .....................................................
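    ......
    // Invoke the snapshot methods (including the user's); each operator is a different subclass
    for (StreamOperator<?> op : allOperators) {
        checkpointStreamOperator(op);
    }
    // The state data is generated afterwards, but where exactly? That still has to be found.

    // Author's first guess: this runnable only produces metadata. Note, though, that it receives
    // operatorSnapshotsInProgress, the pending snapshot futures collected in the loop above.
    // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
    AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
        owner,
        operatorSnapshotsInProgress,
        checkpointMetaData,
        checkpointMetrics,
        startAsyncPartNano);

    owner.cancelables.registerCloseable(asyncCheckpointRunnable);
    owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
    ......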
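executeCheckpointing splits the work in two. The synchronous loop only asks each operator for its snapshot futures, which are collected in operatorSnapshotsInProgress. AsyncCheckpointRunnable, submitted to asyncOperationsThreadPool, then completes those futures later (as far as I can tell from the 1.7 sources). The sketch below models that split, assuming each operator's state is a StringBuilder and a "state handle" is a string. All names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

// Hypothetical model of CheckpointingOperation#executeCheckpointing plus AsyncCheckpointRunnable.
class SketchCheckpointing {
    static CompletableFuture<Map<String, String>> executeCheckpointing(
            Map<String, StringBuilder> liveStates, ExecutorService asyncOperationsThreadPool) {
        // Synchronous part: for each operator, capture its state now and prepare (not run) the write.
        Map<String, Supplier<String>> operatorSnapshotsInProgress = new LinkedHashMap<>();
        for (Map.Entry<String, StringBuilder> op : liveStates.entrySet()) {
            String captured = op.getValue().toString();
            operatorSnapshotsInProgress.put(op.getKey(), () -> "handle(" + captured + ")");
        }
        // Asynchronous part: complete every pending snapshot off the task thread and collect the handles.
        return CompletableFuture.supplyAsync(() -> {
            Map<String, String> handles = new LinkedHashMap<>();
            operatorSnapshotsInProgress.forEach((operatorId, write) -> handles.put(operatorId, write.get()));
            return handles;
        }, asyncOperationsThreadPool);
    }
}
```

Records processed after the synchronous part returns cannot change what the pending writes produce.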
    First, checkpointStreamOperator(op):

    private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
        if (null != op) {
            // This call is the core
            OperatorSnapshotFutures snapshotInProgress = op.snapshotState(
                checkpointMetaData.getCheckpointId(),
                checkpointMetaData.getTimestamp(),
                checkpointOptions,
                storageLocation);
            operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
        }
    }
    op.snapshotState() is the core; it calls org.apache.flink.streaming.api.operators.AbstractStreamOperator#snapshotState(long, long, org.apache.flink.runtime.checkpoint.CheckpointOptions, org.apache.flink.runtime.state.CheckpointStreamFactory)
    Note that op is an instance of a concrete subclass: some operators extend AbstractStreamOperator directly, others extend AbstractUdfStreamOperator. So when snapshotState(snapshotContext) is called below, the call is dispatched according to the subclass, either to org.apache.flink.streaming.api.operators.AbstractStreamOperator#snapshotState(org.apache.flink.runtime.state.StateSnapshotContext)
    or to org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator#snapshotState.
    AbstractStreamOperator has 94 implementing classes.
    AbstractUdfStreamOperator has 42 implementing classes.
    AbstractUdfStreamOperator itself extends AbstractStreamOperator.
    @Override
    public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
            CheckpointStreamFactory factory) throws Exception {
        ......
        OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();

        try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
                checkpointId,
                timestamp,
                factory,
                keyGroupRange,
                getContainingTask().getCancelables())) {

            // Operators extending AbstractUdfStreamOperator call the user's snapshot method here; operators that
            // only extend AbstractStreamOperator run the base version, which does not do much.
            snapshotState(snapshotContext);
            // The user's snapshot method has now run, so the current contents of the state objects are settled.
            // What follows is how those state objects are reached and their contents written to disk.
            snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
            snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
            // This produces the operator state data files
            if (null != operatorStateBackend) {
                // Author's debug print, not part of Flink
                System.out.println(Thread.currentThread().getName() + "::writing state data to file here");
                snapshotInProgress.setOperatorStateManagedFuture(
                    operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
            // This produces the keyed state data files
            if (null != keyedStateBackend) {
                snapshotInProgress.setKeyedStateManagedFuture(
                    keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
        }
        return snapshotInProgress;
    }
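The dispatch described above can be modeled in two parts. A final entry point first calls an overridable snapshotState(context) hook, then asks the backends to snapshot. In the sketch below, the base hook is empty. The UDF operator forwards to the user function only if that function implements a checkpointing interface, which is roughly what AbstractUdfStreamOperator does for CheckpointedFunction. All Sketch* names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a user function that wants to be told about checkpoints.
interface SketchCheckpointedFunction {
    void snapshotState(long checkpointId);
}

// Hypothetical model of AbstractStreamOperator's snapshot flow.
class SketchStreamOperator {
    // Final entry point, like AbstractStreamOperator#snapshotState(long, long, ...).
    final List<String> snapshotState(long checkpointId) {
        List<String> trace = new ArrayList<>();
        snapshotState(checkpointId, trace);          // (1) hook, dispatched by the runtime subclass
        trace.add("operatorStateBackend.snapshot");  // (2) backends then write what is in the state objects
        return trace;
    }

    // Base hook: nothing to do in this sketch.
    void snapshotState(long checkpointId, List<String> trace) {
    }
}

// Hypothetical model of AbstractUdfStreamOperator.
class SketchUdfStreamOperator extends SketchStreamOperator {
    private final Object userFunction;

    SketchUdfStreamOperator(Object userFunction) {
        this.userFunction = userFunction;
    }

    @Override
    void snapshotState(long checkpointId, List<String> trace) {
        super.snapshotState(checkpointId, trace);
        if (userFunction instanceof SketchCheckpointedFunction) {
            ((SketchCheckpointedFunction) userFunction).snapshotState(checkpointId);
            trace.add("user snapshotState " + checkpointId);
        }
    }
}
```

The user hook always runs before the backend snapshot, which is why whatever the user writes into state during snapshotState ends up in the checkpoint.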
    operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions) resolves to org.apache.flink.runtime.state.DefaultOperatorStateBackend#snapshot
    The answer is right below:
    public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions) throws Exception {
        long syncStartTime = System.currentTimeMillis();

        // This is the crucial spot: if you want to know how the state objects in user functions are reached, look here.
        RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotRunner =
            snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);

        snapshotStrategy.logSyncCompleted(streamFactory, syncStartTime);
        return snapshotRunner;
    }
    Which implementation snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions) calls depends on the state backend the user configured; by default the call path is org.apache.flink.runtime.state.DefaultOperatorStateBackend.DefaultOperatorStateBackendSnapshotStrategy#snapshot
    DefaultOperatorStateBackendSnapshotStrategy is an inner class of DefaultOperatorStateBackend.
    public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(......) throws IOException {
        // The data appears to live in the registeredOperatorStates object. The later steps just write the state data
        // to a file and need no close study; what matters is how registeredOperatorStates gets populated.
        //************ Key point: the registeredOperatorStates object
        final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies =
            new HashMap<>(registeredOperatorStates.size());
        final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies =
            new HashMap<>(registeredBroadcastStates.size());

        ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            // eagerly create deep copies of the list and the broadcast states (if any)
            // in the synchronous phase, so that we can use them in the async writing.
            // entry.getValue() is the state object; a deep copy of it goes into the new map
            if (!registeredOperatorStates.isEmpty()) {
                for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStates.entrySet()) {
                    PartitionableListState<?> listState = entry.getValue();
                    if (null != listState) {
                        listState = listState.deepCopy();
                    }
                    registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
                }
            }
            // broadcast state
            if (!registeredBroadcastStates.isEmpty()) {
                for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry : registeredBroadcastStates.entrySet()) {
                    BackendWritableBroadcastState<?, ?> broadcastState = entry.getValue();
                    if (null != broadcastState) {
                        broadcastState = broadcastState.deepCopy();
                    }
                    registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
                }
            }
        } finally {
            // restore the context class loader saved above
            Thread.currentThread().setContextClassLoader(snapshotClassLoader);
        }

        // The state data files are produced inside this callable
        AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>> snapshotCallable =
            new AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>>() {

                @Override
                protected SnapshotResult<OperatorStateHandle> callInternal() throws Exception {
                    ......
                    // get the registered operator state infos ...
                    List<StateMetaInfoSnapshot> operatorMetaInfoSnapshots =
                        new ArrayList<>(registeredOperatorStatesDeepCopies.size());

                    for (Map.Entry<String, PartitionableListState<?>> entry :
                            registeredOperatorStatesDeepCopies.entrySet()) {
                        operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                    }

                    // ... get the registered broadcast operator state infos ...
                    List<StateMetaInfoSnapshot> broadcastMetaInfoSnapshots =
                        new ArrayList<>(registeredBroadcastStatesDeepCopies.size());

                    for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                            registeredBroadcastStatesDeepCopies.entrySet()) {
                        broadcastMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                    }

                    // ... write them all in the checkpoint stream ...
                    DataOutputView dov = new DataOutputViewStreamWrapper(localOut);

                    OperatorBackendSerializationProxy backendSerializationProxy =
                        new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);

                    backendSerializationProxy.write(dov);

                    // ... and then go for the states ...

                    ......
                }
            };

        final FutureTask<SnapshotResult<OperatorStateHandle>> task =
            snapshotCallable.toAsyncSnapshotFutureTask(closeStreamOnCancelRegistry);

        if (!asynchronousSnapshots) {
            task.run();
        }

        return task;
    }
    }
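The key idea in DefaultOperatorStateBackendSnapshotStrategy#snapshot is copy-then-write. The synchronous phase deep-copies every registered state. The FutureTask then serializes the copies, either immediately (synchronous snapshots) or later on another thread. The sketch below uses plain Java collections and DataOutputStream. SketchSnapshotStrategy and its byte layout (state names first, then values) are made up for illustration and are not Flink's on-disk format.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical model of DefaultOperatorStateBackendSnapshotStrategy#snapshot; not Flink's API.
class SketchSnapshotStrategy {
    // Plays the role of registeredOperatorStates.
    final Map<String, List<Long>> registeredOperatorStates = new HashMap<>();
    private final boolean asynchronousSnapshots;

    SketchSnapshotStrategy(boolean asynchronousSnapshots) {
        this.asynchronousSnapshots = asynchronousSnapshots;
    }

    FutureTask<byte[]> snapshot() {
        // Synchronous phase: deep-copy every list so later updates do not leak into this checkpoint.
        Map<String, List<Long>> deepCopies = new HashMap<>(registeredOperatorStates.size());
        for (Map.Entry<String, List<Long>> entry : registeredOperatorStates.entrySet()) {
            deepCopies.put(entry.getKey(), new ArrayList<>(entry.getValue()));
        }
        // Asynchronous phase: write the metadata (state names) first, then the values.
        FutureTask<byte[]> task = new FutureTask<>(() -> {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeInt(deepCopies.size());
                for (String name : deepCopies.keySet()) {
                    out.writeUTF(name);
                }
                for (List<Long> values : deepCopies.values()) {
                    out.writeInt(values.size());
                    for (long value : values) {
                        out.writeLong(value);
                    }
                }
            }
            return bytes.toByteArray();
        });
        if (!asynchronousSnapshots) {
            task.run(); // synchronous snapshots: write immediately on the calling thread
        }
        return task;
    }

    // Small helper so callers need not handle checked exceptions.
    static byte[] getUnchecked(FutureTask<byte[]> task) {
        try {
            return task.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Copying in the synchronous phase costs memory, but the task thread can keep updating the live state while the copies are being written.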
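    From the code above, the state objects being written all sit in the registeredOperatorStatesDeepCopies map, which is filled with deep copies of the entries in registeredOperatorStates.
    User updates to state end up there because user code obtains its state objects like this: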
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ......
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);
        ......
    }
    This calls org.apache.flink.runtime.state.DefaultOperatorStateBackend#getListState(org.apache.flink.api.common.state.ListStateDescriptor), which delegates to the private overload below:
    /**
     * @Description: when the state object is returned, it is also put into the map so it can be written to file later
     * @Param:
     * @return:
     * @Author: intsmaze
     * @Date: 2019/1/18
     */
    private <S> ListState<S> getListState(
            ListStateDescriptor<S> stateDescriptor,
            OperatorStateHandle.Mode mode) throws StateMigrationException {
        ......
        String name = Preconditions.checkNotNull(stateDescriptor.getName());

        @SuppressWarnings("unchecked")
        PartitionableListState<S> previous = (PartitionableListState<S>) accessedStatesByName.get(name);
        if (previous != null) {
            checkStateNameAndMode(
                previous.getStateMetaInfo().getName(),
                name,
                previous.getStateMetaInfo().getAssignmentMode(),
                mode);
            return previous;
        }
        ......
        PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredOperatorStates.get(name);

        if (null == partitionableListState) {
            // no restored state for the state name; simply create new state holder
            partitionableListState = new PartitionableListState<>(
                new RegisteredOperatorStateBackendMetaInfo<>(
                    name,
                    partitionStateSerializer,
                    mode));
            // The state object is also stored here: registeredOperatorStates is the very map that
            // the snapshot method of DefaultOperatorStateBackendSnapshotStrategy reads from
            //************************************************************
            registeredOperatorStates.put(name, partitionableListState);
        }
        ......
        accessedStatesByName.put(name, partitionableListState);
        return partitionableListState;
    }
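The whole chain hinges on one detail. getListState hands the user the very object it stores in registeredOperatorStates, so anything the user adds is visible to the next snapshot. The minimal sketch below uses a List<Long> to stand in for PartitionableListState. SketchListStateBackend and snapshotView() are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of DefaultOperatorStateBackend#getListState; not Flink's API.
class SketchListStateBackend {
    // Shared with the snapshot path, like registeredOperatorStates.
    final Map<String, List<Long>> registeredOperatorStates = new HashMap<>();
    // Cache of states already handed out, like accessedStatesByName.
    private final Map<String, List<Long>> accessedStatesByName = new HashMap<>();

    List<Long> getListState(String name) {
        List<Long> previous = accessedStatesByName.get(name);
        if (previous != null) {
            return previous; // repeated calls return the same instance
        }
        List<Long> state = registeredOperatorStates.get(name); // may already exist after a restore
        if (state == null) {
            // no restored state for the state name; simply create new state holder
            state = new ArrayList<>();
            registeredOperatorStates.put(name, state);
        }
        accessedStatesByName.put(name, state);
        return state;
    }

    // What a snapshot would see: copies of the shared map's current contents.
    Map<String, List<Long>> snapshotView() {
        Map<String, List<Long>> copy = new HashMap<>();
        registeredOperatorStates.forEach((k, v) -> copy.put(k, new ArrayList<>(v)));
        return copy;
    }
}
```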

  Original article: https://www.cnblogs.com/intsmaze/p/10773095.html