  • Implementation of the Flink checkpoint mechanism

    Enabling the checkpoint mechanism

    Call StreamExecutionEnvironment.enableCheckpointing. The interval is given in milliseconds and must be at least 10 ms.

    	public StreamExecutionEnvironment enableCheckpointing(long interval) {
    		checkpointCfg.setCheckpointInterval(interval);
    		return this;
    	}
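
    The lower bound on the interval can be illustrated with a small stand-alone sketch. It is not Flink's configuration class: CheckpointConfigSketch and its members are hypothetical names, and the only fact taken from the text above is the 10 ms minimum.

```java
// Simplified stand-in for a checkpoint configuration object; NOT the real Flink class.
final class CheckpointConfigSketch {
    /** Smallest accepted interval in milliseconds (10 ms, as stated in the text). */
    static final long MINIMAL_CHECKPOINT_TIME = 10;

    private long checkpointInterval = -1; // -1 means checkpointing is disabled

    /** Rejects intervals below the minimum, otherwise stores the value. */
    void setCheckpointInterval(long interval) {
        if (interval < MINIMAL_CHECKPOINT_TIME) {
            throw new IllegalArgumentException(
                "Checkpoint interval must be at least " + MINIMAL_CHECKPOINT_TIME + " ms");
        }
        this.checkpointInterval = interval;
    }

    long getCheckpointInterval() {
        return checkpointInterval;
    }

    boolean isCheckpointingEnabled() {
        return checkpointInterval > 0;
    }

    public static void main(String[] args) {
        CheckpointConfigSketch cfg = new CheckpointConfigSketch();
        cfg.setCheckpointInterval(1000); // accepted: one checkpoint per second
        System.out.println(cfg.isCheckpointingEnabled());
        try {
            cfg.setCheckpointInterval(5); // rejected: below the 10 ms minimum
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```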
    

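    Description of a job's checkpoint flow

    While building the JobGraph, the job graph generator (StreamingJobGraphGenerator for streaming jobs) produces three vertex lists of type List<JobVertexID>: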

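    • triggerVertices: all source vertices. Their parallel instances periodically receive the triggerCheckpoint request sent by the CheckpointCoordinator.
    • ackVertices: all vertices. The CheckpointCoordinator expects an acknowledge message from each parallel instance once that instance has completed its checkpoint.
    • commitVertices: all vertices. Once every instance has acknowledged the checkpoint, the CheckpointCoordinator calls notifyCheckpointComplete to tell these instances that the checkpoint is fully complete.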
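
    How the three lists relate to the job topology can be sketched as follows. This is a minimal illustration under assumptions: Vertex and CheckpointVertexListsSketch are hypothetical types, vertex ids are plain strings instead of JobVertexID, and a "source" is modelled as a vertex flagged as having no inputs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: derives the three checkpoint vertex lists from a list of vertices.
final class CheckpointVertexListsSketch {
    /** Stand-in for a job vertex: an id plus a flag saying it has no inputs (a source). */
    static final class Vertex {
        final String id;
        final boolean isInputVertex;

        Vertex(String id, boolean isInputVertex) {
            this.id = id;
            this.isInputVertex = isInputVertex;
        }
    }

    final List<String> triggerVertices = new ArrayList<>();
    final List<String> ackVertices = new ArrayList<>();
    final List<String> commitVertices = new ArrayList<>();

    CheckpointVertexListsSketch(List<Vertex> vertices) {
        for (Vertex v : vertices) {
            if (v.isInputVertex) {
                triggerVertices.add(v.id); // only sources are triggered directly
            }
            ackVertices.add(v.id);     // every vertex must acknowledge
            commitVertices.add(v.id);  // every vertex is notified on completion
        }
    }
}
```

    For a pipeline source -> map -> sink, only "source" ends up in triggerVertices, while all three ids appear in ackVertices and commitVertices.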

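    If the user has enabled checkpointing, a periodic task in the CheckpointCoordinator generates a new checkpoint id and calls triggerCheckpoint on every entry of CheckpointCoordinator.tasksToTrigger, which corresponds to triggerVertices. That call goes over RPC to TaskExecutor.triggerCheckpoint. The TaskExecutor then looks up the matching Task and calls its triggerCheckpointBarrier method, which in turn asynchronously calls StreamTask.triggerCheckpoint. Once a Task has finished its checkpoint, it remotely calls acknowledgeCheckpoint to notify the CheckpointCoordinator. When every task of that checkpoint has acknowledged, the CheckpointCoordinator calls notifyCheckpointComplete on each entry of tasksToCommitTo to tell it that the checkpoint completed successfully.

    Component interaction diagram (image not included)

    The CheckpointCoordinator periodic task

    The triggerCheckpoint method: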
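
    The trigger / acknowledge / notify cycle described above can be sketched in plain Java. This is a single-threaded toy model, not the CheckpointCoordinator itself: CoordinatorSketch, its string task names and its log are assumptions made for illustration, and RPC, timeouts and concurrent checkpoints are left out.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of the coordinator's bookkeeping for one pending checkpoint at a time.
final class CoordinatorSketch {
    private final List<String> tasksToTrigger;
    private final List<String> tasksToWaitFor;
    private final List<String> tasksToCommitTo;
    private final Set<String> pendingAcks = new HashSet<>();
    final List<String> log = new ArrayList<>(); // records the messages that would be sent
    private long nextCheckpointId = 1;
    private long pendingCheckpointId = -1;

    CoordinatorSketch(List<String> trigger, List<String> waitFor, List<String> commitTo) {
        this.tasksToTrigger = trigger;
        this.tasksToWaitFor = waitFor;
        this.tasksToCommitTo = commitTo;
    }

    /** Allocates a new checkpoint id and "sends" the trigger to the source tasks. */
    long triggerCheckpoint() {
        pendingCheckpointId = nextCheckpointId++;
        pendingAcks.clear();
        pendingAcks.addAll(tasksToWaitFor);
        for (String task : tasksToTrigger) {
            log.add("trigger " + pendingCheckpointId + " -> " + task);
        }
        return pendingCheckpointId;
    }

    /** Returns true exactly when this acknowledgement completes the checkpoint. */
    boolean acknowledgeCheckpoint(String task, long checkpointId) {
        if (checkpointId != pendingCheckpointId || !pendingAcks.remove(task)) {
            return false; // stale, unknown, or duplicate acknowledgement
        }
        if (!pendingAcks.isEmpty()) {
            return false; // still waiting for other tasks
        }
        for (String t : tasksToCommitTo) {
            log.add("complete " + checkpointId + " -> " + t);
        }
        pendingCheckpointId = -1;
        return true;
    }
}
```

    Keeping the pending acknowledgements in a set makes duplicate or late acknowledgements harmless: they simply fail the remove and are ignored.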

    				final CheckpointOptions checkpointOptions = new CheckpointOptions(
    						props.getCheckpointType(),
    						checkpointStorageLocation.getLocationReference());
    
    				// send the messages to the tasks that trigger their checkpoint
    				for (Execution execution: executions) {
    					execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
    				}
    

    Execution

    Execution.triggerCheckpoint calls TaskExecutor.triggerCheckpoint via RPC:

    	/**
    	 * Trigger a new checkpoint on the task of this execution.
    	 *
    	 * @param checkpointId of the checkpoint to trigger
    	 * @param timestamp of the checkpoint to trigger
    	 * @param checkpointOptions of the checkpoint to trigger
    	 */
    	public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
    		final LogicalSlot slot = assignedResource;
    
    		if (slot != null) {
    			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
    
    			taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
    		} else {
    			LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
    				"no longer running.");
    		}
    	}
    

    TaskExecutor

    TaskExecutor.triggerCheckpoint calls Task.triggerCheckpointBarrier:

    	@Override
    	public CompletableFuture<Acknowledge> triggerCheckpoint(
    			ExecutionAttemptID executionAttemptID,
    			long checkpointId,
    			long checkpointTimestamp,
    			CheckpointOptions checkpointOptions) {
    		log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
    
    		final Task task = taskSlotTable.getTask(executionAttemptID);
    
    		if (task != null) {
    			task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
    
    			return CompletableFuture.completedFuture(Acknowledge.get());
    		} else {
    			final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';
    
    			log.debug(message);
    			return FutureUtils.completedExceptionally(new CheckpointException(message));
    		}
    	}
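
    The lookup-then-dispatch pattern of this method can be sketched without any Flink types. TaskExecutorSketch, CheckpointableTask and the "ACK" string are assumptions standing in for the task slot table, Task and Acknowledge. The point shown is that an unknown task yields an exceptionally completed future rather than a thrown exception.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of "find the task, forward the trigger, answer with a future".
final class TaskExecutorSketch {
    /** Stand-in for the task-side entry point. */
    interface CheckpointableTask {
        void triggerCheckpointBarrier(long checkpointId, long timestamp);
    }

    private final Map<String, CheckpointableTask> taskSlotTable = new ConcurrentHashMap<>();

    void register(String attemptId, CheckpointableTask task) {
        taskSlotTable.put(attemptId, task);
    }

    CompletableFuture<String> triggerCheckpoint(String attemptId, long checkpointId, long timestamp) {
        CheckpointableTask task = taskSlotTable.get(attemptId);
        if (task != null) {
            task.triggerCheckpointBarrier(checkpointId, timestamp);
            return CompletableFuture.completedFuture("ACK");
        }
        // Unknown task: report the failure through the future instead of throwing.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException(
            "Received a checkpoint request for unknown task " + attemptId));
        return failed;
    }
}
```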
    
    

    Task

    Task.triggerCheckpointBarrier asynchronously calls StreamTask.triggerCheckpoint:

    		if (executionState == ExecutionState.RUNNING && invokable != null) {
    
    			// build a local closure
    			final String taskName = taskNameWithSubtask;
    			final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
    				FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
    
    			Runnable runnable = new Runnable() {
    				@Override
    				public void run() {
    					// set safety net from the task's context for checkpointing thread
    					LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
    					FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
    
    					try {
    						boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
    						if (!success) {
    							checkpointResponder.declineCheckpoint(
    									getJobID(), getExecutionId(), checkpointID,
    									new CheckpointDeclineTaskNotReadyException(taskName));
    						}
    					}
    					catch (Throwable t) {
    						if (getExecutionState() == ExecutionState.RUNNING) {
    							failExternally(new Exception(
    								"Error while triggering checkpoint " + checkpointID + " for " +
    									taskNameWithSubtask, t));
    						} else {
    							LOG.debug("Encountered error while triggering checkpoint {} for " +
    								"{} ({}) while being not in state running.", checkpointID,
    								taskNameWithSubtask, executionId, t);
    						}
    					} finally {
    						FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
    					}
    				}
    			};
    			executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
    		}
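
    The control flow of the Runnable above (run the trigger off the calling thread, decline if the invokable reports false, treat an exception as a failure) can be sketched with a CompletableFuture. AsyncTriggerSketch and its string outcomes are illustrative assumptions. The real code reports through checkpointResponder.declineCheckpoint and failExternally instead of returning a value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.LongPredicate;

// Hypothetical sketch of the asynchronous trigger with its three possible outcomes.
final class AsyncTriggerSketch {
    /** Runs the trigger on the executor and maps the result to an outcome string. */
    static CompletableFuture<String> triggerCheckpointBarrier(
            ExecutorService asyncCallExecutor, LongPredicate invokable, long checkpointId) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // false means "task not ready", which leads to a declined checkpoint
                return invokable.test(checkpointId) ? "triggered" : "declined";
            } catch (RuntimeException e) {
                return "failed"; // the real task would fail itself here if still running
            }
        }, asyncCallExecutor);
    }

    /** Convenience wrapper: runs one trigger on a fresh single-thread executor. */
    static String demo(LongPredicate invokable) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return triggerCheckpointBarrier(executor, invokable, 1L).join();
        } finally {
            executor.shutdown();
        }
    }
}
```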
    

    StreamTask performs the checkpoint

    
    	private boolean performCheckpoint(
    			CheckpointMetaData checkpointMetaData,
    			CheckpointOptions checkpointOptions,
    			CheckpointMetrics checkpointMetrics) throws Exception {
    
    		LOG.debug("Starting checkpoint ({}) {} on task {}",
    			checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
    
    		synchronized (lock) {
    			if (isRunning) {
    				// we can do a checkpoint
    
    				// All of the following steps happen as an atomic step from the perspective of barriers and
    				// records/watermarks/timers/callbacks.
    				// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
    				// checkpoint alignments
    
    				// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
    				//           The pre-barrier work should be nothing or minimal in the common case.
    				operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());
    
    				// Step (2): Send the checkpoint barrier downstream
    				operatorChain.broadcastCheckpointBarrier(
    						checkpointMetaData.getCheckpointId(),
    						checkpointMetaData.getTimestamp(),
    						checkpointOptions);
    
    				// Step (3): Take the state snapshot. This should be largely asynchronous, to not
    				//           impact progress of the streaming topology
    				checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
    				return true;
    			}
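
    The ordering of the three steps, and the fact that they run as one unit under the task lock, can be sketched as follows. PerformCheckpointSketch only records step names in a list. It is a hypothetical model, and the branch taken when the task is no longer running is reduced to returning false because the excerpt above does not show it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the three checkpoint steps executed atomically under one lock.
final class PerformCheckpointSketch {
    private final Object lock = new Object();
    private boolean isRunning = true; // guarded by lock
    final List<String> steps = new ArrayList<>();

    boolean performCheckpoint(long checkpointId) {
        synchronized (lock) {
            if (!isRunning) {
                return false; // not shown in the excerpt; the sketch just reports failure
            }
            // Step 1: let operators do minimal pre-barrier work
            steps.add("prepareSnapshotPreBarrier(" + checkpointId + ")");
            // Step 2: emit the barrier early so downstream alignment is not delayed
            steps.add("broadcastCheckpointBarrier(" + checkpointId + ")");
            // Step 3: snapshot the state
            steps.add("checkpointState(" + checkpointId + ")");
            return true;
        }
    }

    void stop() {
        synchronized (lock) {
            isRunning = false;
        }
    }
}
```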
    
    

    The AbstractUdfStreamOperator.snapshotState method:

    	@Override
    	public void snapshotState(StateSnapshotContext context) throws Exception {
    		super.snapshotState(context);
    		StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
    	}
    
    

    Depending on whether userFunction implements the CheckpointedFunction or the ListCheckpointed interface, the corresponding method is then invoked:

    	private static boolean trySnapshotFunctionState(
    			StateSnapshotContext context,
    			OperatorStateBackend backend,
    			Function userFunction) throws Exception {
    
    		if (userFunction instanceof CheckpointedFunction) {
    			((CheckpointedFunction) userFunction).snapshotState(context);
    
    			return true;
    		}
    
    		if (userFunction instanceof ListCheckpointed) {
    			@SuppressWarnings("unchecked")
    			List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).
    					snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
    
    			ListState<Serializable> listState = backend.
    					getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
    
    			listState.clear();
    
    			if (null != partitionableState) {
    				try {
    					for (Serializable statePartition : partitionableState) {
    						listState.add(statePartition);
    					}
    				} catch (Exception e) {
    					listState.clear();
    
    					throw new Exception("Could not write partitionable state to operator " +
    						"state backend.", e);
    				}
    			}
    
    			return true;
    		}
    
    		return false;
    	}
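
    The interface-based dispatch can be reduced to a few lines. In this sketch SelfSnapshotting and ListSnapshotting are hypothetical stand-ins for CheckpointedFunction and ListCheckpointed, and a plain List plays the role of the operator state backend.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of choosing the snapshot path by the interface a function implements.
final class SnapshotDispatchSketch {
    /** Analogue of CheckpointedFunction: the function writes its state itself. */
    interface SelfSnapshotting {
        void snapshotState(List<Object> store);
    }

    /** Analogue of ListCheckpointed: the function returns a list that the caller stores. */
    interface ListSnapshotting {
        List<?> snapshotState(long checkpointId);
    }

    /** Returns true if the function took part in the snapshot, false if it is stateless. */
    static boolean trySnapshot(Object userFunction, long checkpointId, List<Object> store) {
        if (userFunction instanceof SelfSnapshotting) {
            ((SelfSnapshotting) userFunction).snapshotState(store);
            return true;
        }
        if (userFunction instanceof ListSnapshotting) {
            List<?> parts = ((ListSnapshotting) userFunction).snapshotState(checkpointId);
            store.clear(); // replace whatever the previous checkpoint left behind
            if (parts != null) {
                store.addAll(parts);
            }
            return true;
        }
        return false;
    }
}
```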
    

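    Call sequence diagram (image not included)

    Use of the lock

    StreamTask and StreamOperator synchronize on the StreamTask.lock object, which keeps checkpoint calls consistent with record processing.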

    SourceStreamTask

    	/**
    	 * Gets the lock object on which all operations that involve data and state mutation have to lock.
    	 * @return The checkpoint lock object.
    	 */
    	public Object getCheckpointLock() {
    		return lock;
    	}
    
    	@Override
    	protected void run() throws Exception {
    		headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
    	}
    

    The StreamSource.run method:

    		this.ctx = StreamSourceContexts.getSourceContext(
    			timeCharacteristic,
    			getProcessingTimeService(),
    			lockingObject,
    			streamStatusMaintainer,
    			collector,
    			watermarkInterval,
    			-1);
    
    		try {
    			userFunction.run(ctx);
    
    			// if we get here, then the user function either exited after being done (finite source)
    			// or the function was canceled or stopped. For the finite source case, we should emit
    			// a final watermark that indicates that we reached the end of event-time
    			if (!isCanceledOrStopped()) {
    				ctx.emitWatermark(Watermark.MAX_WATERMARK);
    			}
    
    

    Therefore, if a SourceFunction needs to be checkpointed (it implements CheckpointedFunction or ListCheckpointed), its run method must synchronize on ctx.getCheckpointLock(), as in the following example:

           public void run(SourceContext<T> ctx) {
               while (isRunning && count < 1000) {
                   // this synchronized block ensures that state checkpointing,
                   // internal state updates and emission of elements are an atomic operation
                   synchronized (ctx.getCheckpointLock()) {
                       ctx.collect(count);
                       count++;
                   }
               }
           }
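
    Why the lock matters can be checked with a stand-alone simulation. In the sketch below one thread plays the source and the calling thread plays the checkpoint thread. Because both take the same lock, every snapshot sees a count that matches the number of emitted records. CheckpointLockSourceSketch and all of its members are hypothetical and use no Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of a source and a checkpoint thread sharing one lock.
final class CheckpointLockSourceSketch {
    private final Object checkpointLock = new Object();
    private final List<Long> emitted = new ArrayList<>(); // stands in for downstream output
    private long count = 0;                               // the source's checkpointed state

    /** Source loop: emitting a record and updating state form one atomic step. */
    void run(long limit) {
        while (true) {
            synchronized (checkpointLock) {
                if (count >= limit) {
                    return;
                }
                emitted.add(count);
                count++;
            }
        }
    }

    /** Checkpoint side: returns {snapshotted count, number of emitted records}. */
    long[] snapshot() {
        synchronized (checkpointLock) {
            return new long[] {count, emitted.size()};
        }
    }

    /** Runs source and snapshots concurrently; true if every snapshot was consistent. */
    static boolean demo(long limit, int snapshots) {
        CheckpointLockSourceSketch source = new CheckpointLockSourceSketch();
        Thread producer = new Thread(() -> source.run(limit));
        producer.start();
        boolean consistent = true;
        for (int i = 0; i < snapshots; i++) {
            long[] s = source.snapshot();
            consistent &= (s[0] == s[1]);
        }
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        long[] last = source.snapshot();
        return consistent && last[0] == limit && last[1] == limit;
    }
}
```

    Without the synchronized block in run, a snapshot could land between emitted.add and count++, and a restore from that snapshot would emit the same record twice.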
    

    OneInputStreamTask

    	@Override
    	protected void run() throws Exception {
    		// cache processor reference on the stack, to make the code more JIT friendly
    		final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
    
    		while (running && inputProcessor.processInput()) {
    			// all the work happens in the "processInput" method
    		}
    	}
    
    

    StreamInputProcessor.processInput ensures that every call into a user-defined function happens inside a synchronized (lock) block:

    					} else if (recordOrMark.isLatencyMarker()) {
    						// handle latency marker
    						synchronized (lock) {
    							streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
    						}
    						continue;
    					} else {
    						// now we can do the actual processing
    						StreamRecord<IN> record = recordOrMark.asRecord();
    						synchronized (lock) {
    							numRecordsIn.inc();
    							streamOperator.setKeyContextElement1(record);
    							streamOperator.processElement(record);
    						}
    						return true;
    					}
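
    The same idea in miniature: the framework side takes the lock around each record, so the user function itself contains no synchronization. InputProcessorSketch is a hypothetical model in which java.util.function.Function stands in for the user function and a list stands in for the output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of an input processor that wraps user code in the task lock.
final class InputProcessorSketch<IN, OUT> {
    private final Object lock;
    private final Function<IN, OUT> userFunction; // plain user code, no locking inside
    final List<OUT> output = new ArrayList<>();
    private long numRecordsIn = 0;

    InputProcessorSketch(Object lock, Function<IN, OUT> userFunction) {
        this.lock = lock;
        this.userFunction = userFunction;
    }

    /** Counts the record and runs the user function as one step under the lock. */
    void processInput(IN record) {
        synchronized (lock) {
            numRecordsIn++;
            output.add(userFunction.apply(record));
        }
    }

    /** A checkpoint taken under the same lock sees a matching count and output size. */
    long[] snapshot() {
        synchronized (lock) {
            return new long[] {numRecordsIn, output.size()};
        }
    }
}
```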
    

    StreamMap.processElement:

    	@Override
    	public void processElement(StreamRecord<IN> element) throws Exception {
    		output.collect(element.replace(userFunction.map(element.getValue())));
    	}
    
    

    Functions other than sources therefore need no additional checkpoint-lock synchronization in their user-defined methods.

  • Original article: https://www.cnblogs.com/andyhe/p/11720865.html