zoukankan      html  css  js  c++  java
  • Flink

    Flink – Checkpoint

    没有描述了整个checkpoint的流程,但是对于如何生成snapshot和恢复snapshot的过程,并没有详细描述,这里补充

     

    StreamOperator

    /**
     * Basic interface for stream operators. Implementers would implement one of
     * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or
     * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to create operators
     * that process elements.
     * 
     * <p> The class {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}
     * offers default implementation for the lifecycle and properties methods.
     *
     * <p> Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using
     * the timer service, timer callbacks are also guaranteed not to be called concurrently with
     * methods on {@code StreamOperator}.
     * 
     * @param <OUT> The output type of the operator
     */
    public interface StreamOperator<OUT> extends Serializable {
        
        // ------------------------------------------------------------------------
        //  life cycle
        // ------------------------------------------------------------------------
        
        /**
         * Initializes the operator. Sets access to the context and the output.
         */
        void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output);
    
        /**
         * This method is called immediately before any elements are processed, it should contain the
         * operator's initialization logic.
         * 
         * @throws java.lang.Exception An exception in this method causes the operator to fail.
         */
        void open() throws Exception;
    
        /**
         * This method is called after all records have been added to the operators via the methods
         * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator#processElement(StreamRecord)}, or
         * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement1(StreamRecord)} and
         * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement2(StreamRecord)}.
    
         * <p>
         * The method is expected to flush all remaining buffered data. Exceptions during this flushing
         * of buffered should be propagated, in order to cause the operation to be recognized asa failed,
         * because the last data items are not processed properly.
         * 
         * @throws java.lang.Exception An exception in this method causes the operator to fail.
         */
        void close() throws Exception;
    
        /**
         * This method is called at the very end of the operator's life, both in the case of a successful
         * completion of the operation, and in the case of a failure and canceling.
         * 
         * This method is expected to make a thorough effort to release all resources
         * that the operator has acquired.
         */
        void dispose();
    
        // ------------------------------------------------------------------------
        //  state snapshots
        // ------------------------------------------------------------------------
    
        /**
         * Called to draw a state snapshot from the operator. This method snapshots the operator state
         * (if the operator is stateful) and the key/value state (if it is being used and has been
         * initialized).
         *
         * @param checkpointId The ID of the checkpoint.
         * @param timestamp The timestamp of the checkpoint.
         *
         * @return The StreamTaskState object, possibly containing the snapshots for the
         *         operator and key/value state.
         *
         * @throws Exception Forwards exceptions that occur while drawing snapshots from the operator
         *                   and the key/value state.
         */
        StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception;
        
        /**
         * Restores the operator state, if this operator's execution is recovering from a checkpoint.
         * This method restores the operator state (if the operator is stateful) and the key/value state
         * (if it had been used and was initialized when the snapshot ocurred).
         *
         * <p>This method is called after {@link #setup(StreamTask, StreamConfig, Output)}
         * and before {@link #open()}.
         *
         * @param state The state of operator that was snapshotted as part of checkpoint
         *              from which the execution is restored.
         * 
         * @param recoveryTimestamp Global recovery timestamp
         *
         * @throws Exception Exceptions during state restore should be forwarded, so that the system can
         *                   properly react to failed state restore and fail the execution attempt.
         */
        void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception;
    
        /**
         * Called when the checkpoint with the given ID is completed and acknowledged on the JobManager.
         *
         * @param checkpointId The ID of the checkpoint that has been completed.
         *
         * @throws Exception Exceptions during checkpoint acknowledgement may be forwarded and will cause
         *                   the program to fail and enter recovery.
         */
        void notifyOfCompletedCheckpoint(long checkpointId) throws Exception;
    
        // ------------------------------------------------------------------------
        //  miscellaneous
        // ------------------------------------------------------------------------
        
        void setKeyContextElement(StreamRecord<?> record) throws Exception;
        
        /**
         * An operator can return true here to disable copying of its input elements. This overrides
         * the object-reuse setting on the {@link org.apache.flink.api.common.ExecutionConfig}
         */
        boolean isInputCopyingDisabled();
        
        ChainingStrategy getChainingStrategy();
    
        void setChainingStrategy(ChainingStrategy strategy);
    }

    这对接口会负责,将operator的state做snapshot和restore相应的state

    StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception;

    void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception;

     

    首先看到,生成和恢复的时候,都是以StreamTaskState为接口

    public class StreamTaskState implements Serializable, Closeable {
    
        private static final long serialVersionUID = 1L;
        
        private StateHandle<?> operatorState;
    
        private StateHandle<Serializable> functionState;
    
        private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates;

    可以看到,StreamTaskState是对三种state的封装

    AbstractStreamOperator,先只考虑kvstate的情况,其他的更简单

    @Override
    public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
        // here, we deal with key/value state snapshots
        
        StreamTaskState state = new StreamTaskState();
    
        if (stateBackend != null) {
            HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> partitionedSnapshots =
                stateBackend.snapshotPartitionedState(checkpointId, timestamp);
            if (partitionedSnapshots != null) {
                state.setKvStates(partitionedSnapshots);
            }
        }
    
    
        return state;
    }
    
    @Override
    @SuppressWarnings("rawtypes,unchecked")
    public void restoreState(StreamTaskState state) throws Exception {
        // restore the key/value state. the actual restore happens lazily, when the function requests
        // the state again, because the restore method needs information provided by the user function
        if (stateBackend != null) {
            stateBackend.injectKeyValueStateSnapshots((HashMap)state.getKvStates());
        }
    }

    可以看到flink1.1.0和之前比逻辑简化了,把逻辑都抽象到stateBackend里面去

     

    AbstractStateBackend
    /**
     * A state backend defines how state is stored and snapshotted during checkpoints.
     */
    public abstract class AbstractStateBackend implements java.io.Serializable {
    
        protected transient TypeSerializer<?> keySerializer;
    
        protected transient ClassLoader userCodeClassLoader;
    
        protected transient Object currentKey;
    
        /** For efficient access in setCurrentKey() */
        private transient KvState<?, ?, ?, ?, ?>[] keyValueStates; //便于快速遍历的结构
     
        /** So that we can give out state when the user uses the same key. */
        protected transient HashMap<String, KvState<?, ?, ?, ?, ?>> keyValueStatesByName; //记录key的kvState
    
        /** For caching the last accessed partitioned state */
        private transient String lastName;
    
        @SuppressWarnings("rawtypes")
        private transient KvState lastState;

     

    stateBackend.snapshotPartitionedState

    public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState(long checkpointId, long timestamp) throws Exception {
        if (keyValueStates != null) {
            HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshots = new HashMap<>(keyValueStatesByName.size());
    
            for (Map.Entry<String, KvState<?, ?, ?, ?, ?>> entry : keyValueStatesByName.entrySet()) {
                KvStateSnapshot<?, ?, ?, ?, ?> snapshot = entry.getValue().snapshot(checkpointId, timestamp);
                snapshots.put(entry.getKey(), snapshot);
            }
            return snapshots;
        }
    
        return null;
    }

    逻辑很简单,只是把cache的所有kvstate,创建一下snapshot,再push到HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshots

     

    stateBackend.injectKeyValueStateSnapshots,只是上面的逆过程

    /**
     * Injects K/V state snapshots for lazy restore.
     * @param keyValueStateSnapshots The Map of snapshots
     */
    @SuppressWarnings("unchecked,rawtypes")
    public void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots) throws Exception {
        if (keyValueStateSnapshots != null) {
            if (keyValueStatesByName == null) {
                keyValueStatesByName = new HashMap<>();
            }
    
            for (Map.Entry<String, KvStateSnapshot> state : keyValueStateSnapshots.entrySet()) {
                KvState kvState = state.getValue().restoreState(this,
                    keySerializer,
                    userCodeClassLoader);
                keyValueStatesByName.put(state.getKey(), kvState);
            }
            keyValueStates = keyValueStatesByName.values().toArray(new KvState[keyValueStatesByName.size()]);
        }
    }

     

    具体看看FsState的snapshot和restore逻辑,

    AbstractFsState.snapshot

    @Override
    public KvStateSnapshot<K, N, S, SD, FsStateBackend> snapshot(long checkpointId, long timestamp) throws Exception {
    
        try (FsStateBackend.FsCheckpointStateOutputStream out = backend.createCheckpointStateOutputStream(checkpointId, timestamp)) { //
    
            // serialize the state to the output stream
            DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(new DataOutputStream(out)); 
            outView.writeInt(state.size());
            for (Map.Entry<N, Map<K, SV>> namespaceState: state.entrySet()) {
                N namespace = namespaceState.getKey();
                namespaceSerializer.serialize(namespace, outView);
                outView.writeInt(namespaceState.getValue().size());
                for (Map.Entry<K, SV> entry: namespaceState.getValue().entrySet()) {
                    keySerializer.serialize(entry.getKey(), outView);
                    stateSerializer.serialize(entry.getValue(), outView);
                }
            }
            outView.flush(); //真实的内容是刷到文件的
    
            // create a handle to the state
            return createHeapSnapshot(out.closeAndGetPath()); //snapshot里面需要的只是path
        }
    }

     

    createCheckpointStateOutputStream

    @Override
    public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
        checkFileSystemInitialized();
    
        Path checkpointDir = createCheckpointDirPath(checkpointID); //根据checkpointId,生成文件path
        int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
        return new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold);
    }

     

    FsCheckpointStateOutputStream

    封装了write,flush, closeAndGetPath接口,

    public void flush() throws IOException {
        if (!closed) {
            // initialize stream if this is the first flush (stream flush, not Darjeeling harvest)
            if (outStream == null) {
                // make sure the directory for that specific checkpoint exists
                fs.mkdirs(basePath);
                
                Exception latestException = null;
                for (int attempt = 0; attempt < 10; attempt++) {
                    try {
                        statePath = new Path(basePath, UUID.randomUUID().toString());
                        outStream = fs.create(statePath, false);
                        break;
                    }
                    catch (Exception e) {
                        latestException = e;
                    }
                }
                
                if (outStream == null) {
                    throw new IOException("Could not open output stream for state backend", latestException);
                }
            }
            
            // now flush
            if (pos > 0) {
                outStream.write(writeBuffer, 0, pos);
                pos = 0;
            }
        }
    }

     

    AbstractFsStateSnapshot.restoreState

    @Override
    public KvState<K, N, S, SD, FsStateBackend> restoreState(
        FsStateBackend stateBackend,
        final TypeSerializer<K> keySerializer,
        ClassLoader classLoader) throws Exception {
    
        // state restore
        ensureNotClosed();
    
        try (FSDataInputStream inStream = stateBackend.getFileSystem().open(getFilePath())) {
            // make sure the in-progress restore from the handle can be closed 
            registerCloseable(inStream);
    
            DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream);
    
            final int numKeys = inView.readInt();
            HashMap<N, Map<K, SV>> stateMap = new HashMap<>(numKeys);
    
            for (int i = 0; i < numKeys; i++) {
                N namespace = namespaceSerializer.deserialize(inView);
                final int numValues = inView.readInt();
                Map<K, SV> namespaceMap = new HashMap<>(numValues);
                stateMap.put(namespace, namespaceMap);
                for (int j = 0; j < numValues; j++) {
                    K key = keySerializer.deserialize(inView);
                    SV value = stateSerializer.deserialize(inView);
                    namespaceMap.put(key, value);
                }
            }
    
            return createFsState(stateBackend, stateMap); //
        }
        catch (Exception e) {
            throw new Exception("Failed to restore state from file system", e);
        }
    }
  • 相关阅读:
    C# 把带有父子关系的数据转化为------树形结构的数据 ,以及 找出父子级关系的数据中里面的根数据Id
    基于角色的菜单按钮权限的设计及实现
    基于记忆性的中值滤波O(r)与O(1)复杂度的算法实现
    Canny算法检测边缘
    图像平滑去噪之高斯滤波器
    运动元素提取,基于帧间差分与背景差分
    基于RGB与HSI颜色模型的图像提取法
    基于阈值的灰度图像提取法
    C语言深入学习
    大津法实现图像二值化
  • 原文地址:https://www.cnblogs.com/fxjwind/p/6103314.html
Copyright © 2011-2022 走看看