  • Flink

     

    public class StreamTaskState implements Serializable, Closeable {
    
        private static final long serialVersionUID = 1L;
        
        private StateHandle<?> operatorState;
    
        private StateHandle<Serializable> functionState;
    
        private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates;

        // ... (remaining members omitted in this excerpt)
    }

    State in Flink comes in three kinds.

    As the class above shows, StreamTaskState is a wrapper around all three.

    1. KvState

    This is the most basic kind of state.

    It is modeled as a pair of interfaces, KvState and KvStateSnapshot, each of which can be converted into the other.

    /**
     * Key/Value state implementation for user-defined state. The state is backed by a state
     * backend, which typically follows one of the following patterns: Either the state is stored
     * in the key/value state object directly (meaning in the executing JVM) and snapshotted by the
     * state backend into some store (during checkpoints), or the key/value state is in fact backed
     * by an external key/value store as the state backend, and checkpoints merely record the
     * metadata of what is considered part of the checkpoint.
     * 
     * @param <K> The type of the key.
     * @param <N> The type of the namespace.
     * @param <S> The type of {@link State} this {@code KvState} holds.
     * @param <SD> The type of the {@link StateDescriptor} for state {@code S}.
     * @param <Backend> The type of {@link AbstractStateBackend} that manages this {@code KvState}.
     */
    public interface KvState<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> {
    
        /**
         * Sets the current key, which will be used when using the state access methods.
         *
         * @param key The key.
         */
        void setCurrentKey(K key);
    
        /**
         * Sets the current namespace, which will be used when using the state access methods.
         *
         * @param namespace The namespace.
         */
        void setCurrentNamespace(N namespace);
    
        /**
         * Creates a snapshot of this state.
         * 
         * @param checkpointId The ID of the checkpoint for which the snapshot should be created.
         * @param timestamp The timestamp of the checkpoint.
         * @return A snapshot handle for this key/value state.
         * 
         * @throws Exception Exceptions during snapshotting the state should be forwarded, so the system
         *                   can react to failed snapshots.
         */
        KvStateSnapshot<K, N, S, SD, Backend> snapshot(long checkpointId, long timestamp) throws Exception;
    
        /**
         * Disposes the key/value state, releasing all occupied resources.
         */
        void dispose();
    }
    The definition is fairly simple. The key method is snapshot(), which produces a KvStateSnapshot:
    public interface KvStateSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> 
            extends StateObject {
    
        /**
         * Loads the key/value state back from this snapshot.
         *
         * @param stateBackend The state backend that created this snapshot and can restore the key/value state
         *                     from this snapshot.
         * @param keySerializer The serializer for the keys.
         * @param classLoader The class loader for user-defined types.
         *
         * @return An instance of the key/value state loaded from this snapshot.
         * 
         * @throws Exception Exceptions can occur during the state loading and are forwarded. 
         */
        KvState<K, N, S, SD, Backend> restoreState(
            Backend stateBackend,
            TypeSerializer<K> keySerializer,
            ClassLoader classLoader) throws Exception;
    }

    KvStateSnapshot is the counterpart of KvState; its key method is restoreState().
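The snapshot/restore pairing can be illustrated with a much smaller model. The following is a minimal sketch, not Flink's API: `SimpleKvState`, `SimpleKvSnapshot`, and `MapKvState` are hypothetical names, and the namespace, serializers, and backend parameters of the real interfaces are left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for KvState: live state that can be snapshotted. */
interface SimpleKvState<K, V> {
    void setCurrentKey(K key);
    V get();
    void put(V value);
    SimpleKvSnapshot<K, V> snapshot(long checkpointId);
}

/** Hypothetical stand-in for KvStateSnapshot: a frozen copy that can be turned back into state. */
interface SimpleKvSnapshot<K, V> {
    SimpleKvState<K, V> restoreState();
}

class MapKvState<K, V> implements SimpleKvState<K, V> {
    private final Map<K, V> values;
    private K currentKey;

    MapKvState(Map<K, V> values) { this.values = values; }

    @Override public void setCurrentKey(K key) { this.currentKey = key; }
    @Override public V get() { return values.get(currentKey); }
    @Override public void put(V value) { values.put(currentKey, value); }

    @Override
    public SimpleKvSnapshot<K, V> snapshot(long checkpointId) {
        // Copy the map so that later updates do not leak into the snapshot.
        final Map<K, V> copy = new HashMap<>(values);
        // Restoring copies again, so one snapshot can be restored more than once.
        return () -> new MapKvState<>(new HashMap<>(copy));
    }
}

class SnapshotRoundTripDemo {
    public static void main(String[] args) {
        MapKvState<String, Integer> live = new MapKvState<>(new HashMap<>());
        live.setCurrentKey("a");
        live.put(1);
        SimpleKvSnapshot<String, Integer> snap = live.snapshot(1L);
        live.put(2); // change made after the snapshot was taken

        SimpleKvState<String, Integer> restored = snap.restoreState();
        restored.setCurrentKey("a");
        System.out.println(restored.get()); // the value as of the snapshot
    }
}
```

The point of the sketch is only the shape of the contract: state produces a snapshot, and the snapshot produces state again.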

    Take the concrete FsState as an example:

    public abstract class AbstractFsState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
    extends AbstractHeapState<K, N, SV, S, SD, FsStateBackend> {

    AbstractFsState extends AbstractHeapState because an FsState also keeps its working state cached on the heap; it only writes to a file when a snapshot is taken.

    So let's look at AbstractHeapState first:

    /**
     * Base class for partitioned {@link ListState} implementations that are backed by a regular
     * heap hash map. The concrete implementations define how the state is checkpointed.
     * 
     * @param <K> The type of the key.
     * @param <N> The type of the namespace.
     * @param <SV> The type of the values in the state.
     * @param <S> The type of State
     * @param <SD> The type of StateDescriptor for the State S
     * @param <Backend> The type of the backend that snapshots this key/value state.
     */
    public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend>
            implements KvState<K, N, S, SD, Backend>, State {
    
        /** Map containing the actual key/value pairs */
        protected final HashMap<N, Map<K, SV>> state; // note the extra namespace level, which makes key collisions less likely
    
        /** Serializer for the state value. The state value could be a List<V>, for example. */
        protected final TypeSerializer<SV> stateSerializer;
    
        /** The serializer for the keys */
        protected final TypeSerializer<K> keySerializer;
    
        /** The serializer for the namespace */
        protected final TypeSerializer<N> namespaceSerializer;
    
        /** This holds the name of the state and can create an initial default value for the state. */
        protected final SD stateDesc; // the StateDescriptor carries metadata about the state, such as its default value
    
        /** The current key, which the next value methods will refer to */
        protected K currentKey;
    
        /** The current namespace, which the access methods will refer to. */
        protected N currentNamespace = null;
    
        /** Cache the state map for the current key. */
        protected Map<K, SV> currentNSState;
    
        /**
         * Creates a new empty key/value state.
         *
         * @param keySerializer The serializer for the keys.
         * @param namespaceSerializer The serializer for the namespace.
         * @param stateDesc The state identifier for the state. This contains name
         *                           and can create a default state value.
         */
        protected AbstractHeapState(TypeSerializer<K> keySerializer,
            TypeSerializer<N> namespaceSerializer,
            TypeSerializer<SV> stateSerializer,
            SD stateDesc) {
            this(keySerializer, namespaceSerializer, stateSerializer, stateDesc, new HashMap<N, Map<K, SV>>());
        }
     
    AbstractFsState
    public abstract class AbstractFsState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
            extends AbstractHeapState<K, N, SV, S, SD, FsStateBackend> {
    
        /** The file system state backend backing snapshots of this state */
        private final FsStateBackend backend;
    
    
        public abstract KvStateSnapshot<K, N, S, SD, FsStateBackend> createHeapSnapshot(Path filePath);
    
        @Override
        public KvStateSnapshot<K, N, S, SD, FsStateBackend> snapshot(long checkpointId, long timestamp) throws Exception {
    
            try (FsStateBackend.FsCheckpointStateOutputStream out = backend.createCheckpointStateOutputStream(checkpointId, timestamp)) {
    
                // serialize the state to the output stream
                DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(new DataOutputStream(out)); 
                outView.writeInt(state.size());
                for (Map.Entry<N, Map<K, SV>> namespaceState: state.entrySet()) {
                    N namespace = namespaceState.getKey();
                    namespaceSerializer.serialize(namespace, outView);
                    outView.writeInt(namespaceState.getValue().size());
                    for (Map.Entry<K, SV> entry: namespaceState.getValue().entrySet()) {
                        keySerializer.serialize(entry.getKey(), outView);
                        stateSerializer.serialize(entry.getValue(), outView);
                    }
                }
                outView.flush(); // the actual contents are flushed to the file
    
                // create a handle to the state
                return createHeapSnapshot(out.closeAndGetPath()); // the snapshot itself only needs the path
            }
        }
    }
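The idea that the snapshot keeps only a path while the bytes live in a file can be sketched independently of Flink. This is a minimal illustration under stated assumptions: `PathReturningStream` and `PathHandleDemo` are hypothetical names, the temp file stands in for the checkpoint directory, and nothing here is `FsCheckpointStateOutputStream` itself.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical stand-in for a checkpoint stream that hands back only a file path. */
class PathReturningStream extends OutputStream {
    private final Path path;
    private final OutputStream out;
    private boolean closed;

    PathReturningStream(Path path) throws IOException {
        this.path = path;
        this.out = Files.newOutputStream(path);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
    }

    /** Closes the file and returns where the bytes now live. */
    Path closeAndGetPath() throws IOException {
        close();
        return path;
    }

    @Override
    public void close() throws IOException {
        // Safe to call twice: once via closeAndGetPath, once via try-with-resources.
        if (!closed) {
            closed = true;
            out.close();
        }
    }
}

class PathHandleDemo {
    /** Writes the serialized state to a temp file; the returned path is the whole "snapshot". */
    static Path snapshot(byte[] serializedState) throws IOException {
        Path file = Files.createTempFile("kv-snapshot-", ".bin");
        try (PathReturningStream out = new PathReturningStream(file)) {
            out.write(serializedState);
            return out.closeAndGetPath();
        }
    }

    public static void main(String[] args) throws IOException {
        Path p = snapshot(new byte[] {1, 2, 3});
        System.out.println("snapshot stored at " + p);
    }
}
```

Keeping only the path makes the handle itself small, whatever the size of the state behind it.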

     

    KV state itself comes in several flavors: ValueState, ListState, ReducingState, and FoldingState.

    For simplicity, start with ValueState:

    public class FsValueState<K, N, V>
        extends AbstractFsState<K, N, V, ValueState<V>, ValueStateDescriptor<V>>
        implements ValueState<V> {
    
        @Override
        public V value() {
            if (currentNSState == null) {
                currentNSState = state.get(currentNamespace); // first look up the key/value map of the current namespace
            }
            if (currentNSState != null) {
                V value = currentNSState.get(currentKey);
                return value != null ? value : stateDesc.getDefaultValue(); // return the value, falling back to the default from stateDesc if it is null
            }
            return stateDesc.getDefaultValue();
        }
    
        @Override
        public void update(V value) {
            if (currentKey == null) {
                throw new RuntimeException("No key available.");
            }
    
            if (value == null) {
                clear();
                return;
            }
    
            if (currentNSState == null) {
                currentNSState = new HashMap<>();
                state.put(currentNamespace, currentNSState);
            }
    
            currentNSState.put(currentKey, value); // store the new value
        }
        
        @Override
        public KvStateSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, FsStateBackend> createHeapSnapshot(Path filePath) {
            return new Snapshot<>(getKeySerializer(), getNamespaceSerializer(), stateSerializer, stateDesc, filePath); // create the snapshot from the file path
        }
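The read and update logic above (namespace, then key, then value, with a default fallback and null meaning "clear") can be tried out in isolation. The class below is a minimal sketch with a hypothetical name, `HeapValueStateSketch`; unlike the real class it takes the default value directly instead of a StateDescriptor and looks up the namespace map on every call instead of caching it.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical heap value state: namespace -> (key -> value), with a default value. */
class HeapValueStateSketch<K, N, V> {
    private final Map<N, Map<K, V>> state = new HashMap<>();
    private final V defaultValue;
    private K currentKey;
    private N currentNamespace;

    HeapValueStateSketch(V defaultValue) { this.defaultValue = defaultValue; }

    void setCurrentKey(K key) { this.currentKey = key; }
    void setCurrentNamespace(N namespace) { this.currentNamespace = namespace; }

    /** Returns the value for the current namespace and key, or the default if none is stored. */
    V value() {
        Map<K, V> ns = state.get(currentNamespace);
        V v = (ns == null) ? null : ns.get(currentKey);
        return v != null ? v : defaultValue;
    }

    /** Stores a value; a null value is treated as a request to clear the entry. */
    void update(V value) {
        if (currentKey == null) {
            throw new IllegalStateException("No key available.");
        }
        if (value == null) {
            clear();
            return;
        }
        state.computeIfAbsent(currentNamespace, n -> new HashMap<>()).put(currentKey, value);
    }

    /** Removes the entry and drops the namespace map once it is empty. */
    void clear() {
        Map<K, V> ns = state.get(currentNamespace);
        if (ns != null) {
            ns.remove(currentKey);
            if (ns.isEmpty()) {
                state.remove(currentNamespace);
            }
        }
    }
}

class HeapValueStateDemo {
    public static void main(String[] args) {
        HeapValueStateSketch<String, String, Integer> s = new HeapValueStateSketch<>(0);
        s.setCurrentNamespace("window-1");
        s.setCurrentKey("user-a");
        s.update(5);
        System.out.println(s.value());       // stored value
        s.setCurrentNamespace("window-2");
        System.out.println(s.value());       // same key, other namespace: default
    }
}
```

The same key yields independent values under different namespaces, which is the purpose of the extra map level.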

     

    Next, the snapshot side, AbstractFsStateSnapshot:

    public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> 
            extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD, FsStateBackend> {
    
        public abstract KvState<K, N, S, SD, FsStateBackend> createFsState(FsStateBackend backend, HashMap<N, Map<K, SV>> stateMap);
    
        @Override
        public KvState<K, N, S, SD, FsStateBackend> restoreState(
            FsStateBackend stateBackend,
            final TypeSerializer<K> keySerializer,
            ClassLoader classLoader) throws Exception {
    
            // state restore
            ensureNotClosed();
    
            try (FSDataInputStream inStream = stateBackend.getFileSystem().open(getFilePath())) {
                // make sure the in-progress restore from the handle can be closed 
                registerCloseable(inStream);
    
                DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream);
    
                final int numKeys = inView.readInt();
                HashMap<N, Map<K, SV>> stateMap = new HashMap<>(numKeys);
    
                for (int i = 0; i < numKeys; i++) {
                    N namespace = namespaceSerializer.deserialize(inView);
                    final int numValues = inView.readInt();
                    Map<K, SV> namespaceMap = new HashMap<>(numValues);
                    stateMap.put(namespace, namespaceMap);
                    for (int j = 0; j < numValues; j++) {
                        K key = keySerializer.deserialize(inView);
                        SV value = stateSerializer.deserialize(inView);
                        namespaceMap.put(key, value);
                    }
                }
    
                return createFsState(stateBackend, stateMap);
            }
            catch (Exception e) {
                throw new Exception("Failed to restore state from file system", e);
            }
        }
    }
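The restore loop mirrors the layout written by snapshot(): a namespace count, then for each namespace its serialized form, an entry count, and the key/value pairs. Note that the variable named numKeys in the restore code is in fact the number of namespaces. The round trip can be sketched with plain JDK streams; `NestedMapCodec` is a hypothetical name, and `writeUTF`/`writeLong` stand in for Flink's TypeSerializer instances.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical codec for the layout: count, then per namespace (name, count, key/value pairs). */
class NestedMapCodec {

    static byte[] write(Map<String, Map<String, Long>> state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(state.size());                       // number of namespaces
            for (Map.Entry<String, Map<String, Long>> ns : state.entrySet()) {
                out.writeUTF(ns.getKey());                    // the namespace
                out.writeInt(ns.getValue().size());           // number of entries in it
                for (Map.Entry<String, Long> e : ns.getValue().entrySet()) {
                    out.writeUTF(e.getKey());
                    out.writeLong(e.getValue());
                }
            }
        }
        return bytes.toByteArray();
    }

    static Map<String, Map<String, Long>> read(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int numNamespaces = in.readInt();
            Map<String, Map<String, Long>> state = new HashMap<>(numNamespaces);
            for (int i = 0; i < numNamespaces; i++) {
                String namespace = in.readUTF();
                int numValues = in.readInt();
                Map<String, Long> nsMap = new HashMap<>(numValues);
                state.put(namespace, nsMap);
                for (int j = 0; j < numValues; j++) {
                    String key = in.readUTF();
                    long value = in.readLong();
                    nsMap.put(key, value);
                }
            }
            return state;
        }
    }
}
```

Because every count is written before the items it describes, the reader never needs a terminator or a lookahead.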

     

    The Snapshot class implemented inside FsValueState:
    public static class Snapshot<K, N, V> extends AbstractFsStateSnapshot<K, N, V, ValueState<V>, ValueStateDescriptor<V>> {
        private static final long serialVersionUID = 1L;
    
        public Snapshot(TypeSerializer<K> keySerializer,
            TypeSerializer<N> namespaceSerializer,
            TypeSerializer<V> stateSerializer,
            ValueStateDescriptor<V> stateDescs,
            Path filePath) {
            super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, filePath);
        }
    
        @Override
        public KvState<K, N, ValueState<V>, ValueStateDescriptor<V>, FsStateBackend> createFsState(FsStateBackend backend, HashMap<N, Map<K, V>> stateMap) {
            return new FsValueState<>(backend, keySerializer, namespaceSerializer, stateDesc, stateMap);
        }
    }

     

    2. FunctionState

    Compared with KvState, StateHandle is a more general abstraction.

    /**
     * StateHandle is a general handle interface meant to abstract operator state fetching. 
     * A StateHandle implementation can for example include the state itself in cases where the state 
     * is lightweight or fetching it lazily from some external storage when the state is too large.
     */
    public interface StateHandle<T> extends StateObject {
    
        /**
         * This retrieves and return the state represented by the handle.
         *
         * @param userCodeClassLoader Class loader for deserializing user code specific classes
         *
         * @return The state represented by the handle.
         * @throws java.lang.Exception Thrown, if the state cannot be fetched.
         */
        T getState(ClassLoader userCodeClassLoader) throws Exception;
    }
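The Javadoc above mentions two styles of handle: one that carries lightweight state itself, and one that fetches large state lazily from external storage. A minimal sketch of both follows. All names (`SimpleStateHandle`, `InMemoryHandle`, `FileHandle`) are hypothetical, the class loader parameter of the real getState is omitted, and Java serialization into a temp file stands in for the external store.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical, simplified version of the handle idea: "give me the state back". */
interface SimpleStateHandle<T> {
    T getState() throws Exception;
}

/** Lightweight state: the handle carries the value itself. */
class InMemoryHandle<T extends Serializable> implements SimpleStateHandle<T> {
    private final T value;

    InMemoryHandle(T value) { this.value = value; }

    @Override
    public T getState() { return value; }
}

/** Large state: the handle carries only a path and loads the value on demand. */
class FileHandle<T extends Serializable> implements SimpleStateHandle<T> {
    private final Path path;

    FileHandle(T value) throws IOException {
        this.path = Files.createTempFile("state-", ".ser");
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(path))) {
            out.writeObject(value);
        }
    }

    @Override
    @SuppressWarnings("unchecked")
    public T getState() throws IOException, ClassNotFoundException {
        // Nothing is held in memory until the caller actually asks for the state.
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(path))) {
            return (T) in.readObject();
        }
    }
}
```

Callers see only `getState()`, so the choice between the two strategies stays with whoever creates the handle.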

     

    3. OperatorState, with WindowOperator's state as the typical example

    OperatorState likewise uses StateHandle as its snapshot abstraction.

     

    Now let's see how these three kinds of state are snapshotted.

    AbstractStreamOperator: its checkpoint-related methods show that it only snapshots KvState.
    @Override
    public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
        // here, we deal with key/value state snapshots
        
        StreamTaskState state = new StreamTaskState();
    
        if (stateBackend != null) {
            HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> partitionedSnapshots =
                stateBackend.snapshotPartitionedState(checkpointId, timestamp);
            if (partitionedSnapshots != null) {
                state.setKvStates(partitionedSnapshots);
            }
        }
    
    
        return state;
    }
    
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void restoreState(StreamTaskState state) throws Exception {
        // restore the key/value state. the actual restore happens lazily, when the function requests
        // the state again, because the restore method needs information provided by the user function
        if (stateBackend != null) {
            stateBackend.injectKeyValueStateSnapshots((HashMap)state.getKvStates());
        }
    }
    
    @Override
    public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
        if (stateBackend != null) {
            stateBackend.notifyOfCompletedCheckpoint(checkpointId);
        }
    }

     

    AbstractUdfStreamOperator
    public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT>

    It extends AbstractStreamOperator. Its checkpoint-related methods:

    @Override
    public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
        StreamTaskState state = super.snapshotOperatorState(checkpointId, timestamp); // first run super's snapshotOperatorState, i.e. snapshot the KV state
    
        if (userFunction instanceof Checkpointed) {
            @SuppressWarnings("unchecked")
            Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;
            
            Serializable udfState;
            try {
                udfState = chkFunction.snapshotState(checkpointId, timestamp); // snapshot the function's state
            } 
            catch (Exception e) {
                throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
            }
            
            if (udfState != null) {
                try {
                    AbstractStateBackend stateBackend = getStateBackend();
                    StateHandle<Serializable> handle = 
                            stateBackend.checkpointStateSerializable(udfState, checkpointId, timestamp); // let the state backend store the state and return a handle to it
                    state.setFunctionState(handle);
                }
                catch (Exception e) {
                    throw new Exception("Failed to add the state snapshot of the function to the checkpoint: "
                            + e.getMessage(), e);
                }
            }
        }
        
        return state;
    }
    
    @Override
    public void restoreState(StreamTaskState state) throws Exception {
        super.restoreState(state);
        
        StateHandle<Serializable> stateHandle =  state.getFunctionState();
        
        if (userFunction instanceof Checkpointed && stateHandle != null) {
            @SuppressWarnings("unchecked")
            Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;
            
            Serializable functionState = stateHandle.getState(getUserCodeClassloader());
            if (functionState != null) {
                try {
                    chkFunction.restoreState(functionState);
                }
                catch (Exception e) {
                    throw new Exception("Failed to restore state to function: " + e.getMessage(), e);
                }
            }
        }
    }
    
    @Override
    public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
        super.notifyOfCompletedCheckpoint(checkpointId);
    
        if (userFunction instanceof CheckpointListener) {
            ((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);
        }
    }
    So this operator snapshots both the KV state and the state of the user-defined function.
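From the function's side, the contract is small: return something Serializable on snapshot, accept it back on restore. The sketch below mirrors that contract with hypothetical types (`CheckpointedSketch`, `CountingFunction`, `UdfOperatorSketch`) so it runs without Flink; the state backend step that turns the returned object into a StateHandle is left out.

```java
import java.io.Serializable;

/** Hypothetical mirror of the Checkpointed contract used by the operator above. */
interface CheckpointedSketch<T extends Serializable> {
    T snapshotState(long checkpointId, long timestamp) throws Exception;
    void restoreState(T state) throws Exception;
}

/** A user function that counts the elements it has seen. */
class CountingFunction implements CheckpointedSketch<Long> {
    private long count;

    void process(String element) { count++; }
    long getCount() { return count; }

    @Override
    public Long snapshotState(long checkpointId, long timestamp) { return count; }

    @Override
    public void restoreState(Long state) { count = state; }
}

/** Stand-in for the operator: it only talks to the function through the interface. */
class UdfOperatorSketch {
    private final Object userFunction;

    UdfOperatorSketch(Object userFunction) { this.userFunction = userFunction; }

    Serializable snapshotFunctionState(long checkpointId, long timestamp) throws Exception {
        if (userFunction instanceof CheckpointedSketch) {
            @SuppressWarnings("unchecked")
            CheckpointedSketch<Serializable> f = (CheckpointedSketch<Serializable>) userFunction;
            return f.snapshotState(checkpointId, timestamp);
        }
        return null; // functions that are not checkpointed contribute nothing
    }

    void restoreFunctionState(Serializable state) throws Exception {
        if (userFunction instanceof CheckpointedSketch && state != null) {
            @SuppressWarnings("unchecked")
            CheckpointedSketch<Serializable> f = (CheckpointedSketch<Serializable>) userFunction;
            f.restoreState(state);
        }
    }
}
```

The instanceof check is what makes function state optional: a plain function is simply skipped.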

     

    WindowOperator, a typical case of operator state
    public class WindowOperator<K, IN, ACC, OUT, W extends Window>
    extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
    implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable

    Its checkpoint-related methods:

    public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
    
        if (mergingWindowsByKey != null) {
            TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
            ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
            for (Map.Entry<K, MergingWindowSet<W>> key: mergingWindowsByKey.entrySet()) {
                setKeyContext(key.getKey());
                ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, mergeStateDescriptor);
                mergeState.clear();
                key.getValue().persist(mergeState);
            }
        }
    
        StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
    
        AbstractStateBackend.CheckpointStateOutputView out =
            getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
    
        snapshotTimers(out);
    
        taskState.setOperatorState(out.closeAndGetHandle());
    
        return taskState;
    }
    
    @Override
    public void restoreState(StreamTaskState taskState) throws Exception {
        super.restoreState(taskState);
    
        final ClassLoader userClassloader = getUserCodeClassloader();
    
        @SuppressWarnings("unchecked")
        StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
        DataInputView in = inputState.getState(userClassloader);
    
        restoreTimers(in);
    }
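Taken together, the three operators build one StreamTaskState through a chain of super calls: the base class contributes the KV snapshots, the UDF operator adds the function state, and WindowOperator adds its timers as operator state (after first persisting its merging window sets into partitioned state, so that they are picked up by the KV snapshot). The layering alone can be sketched as follows; every class name here is hypothetical and the strings are placeholders for the real snapshot handles.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical container playing the role of StreamTaskState. */
class TaskStateSketch {
    final List<String> parts = new ArrayList<>();
}

/** Base layer: snapshots only the partitioned key/value state. */
class BaseOperatorSketch {
    TaskStateSketch snapshot(long checkpointId) {
        TaskStateSketch state = new TaskStateSketch();
        state.parts.add("kvStates");
        return state;
    }
}

/** Middle layer: adds the user function's state on top of the base snapshot. */
class UdfLayerSketch extends BaseOperatorSketch {
    @Override
    TaskStateSketch snapshot(long checkpointId) {
        TaskStateSketch state = super.snapshot(checkpointId);
        state.parts.add("functionState");
        return state;
    }
}

/** Top layer: adds operator-specific state, such as timers. */
class WindowLayerSketch extends UdfLayerSketch {
    @Override
    TaskStateSketch snapshot(long checkpointId) {
        TaskStateSketch state = super.snapshot(checkpointId);
        state.parts.add("operatorState");
        return state;
    }
}

class LayeredSnapshotDemo {
    public static void main(String[] args) {
        // Lists the contributions in the order the layers added them.
        System.out.println(new WindowLayerSketch().snapshot(1L).parts);
    }
}
```

Restore follows the same chain in the same order: each layer calls super.restoreState first and then reads back its own part.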
  • Original article: https://www.cnblogs.com/fxjwind/p/6103312.html