  • Flink Study Notes (3): State and Fault Tolerance Mechanisms, State and Checkpoint

    Excerpted from the official Apache website

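    I. Basic Concepts of State

    What is State? A quick search shows it is usually called the state mechanism. Flink introduced State and Checkpoint in order to guarantee at-least-once and exactly-once processing. State can be used for the following:

    • The intermediate result of a task/operator at a given point in time
    • A snapshot
    • The data used for recovery once the program crashes
    • The parameters of a machine learning model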

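    II. The Kinds of State in Flink

    Keyed State and Operator State

    1. Keyed State is state on a KeyedStream. It is bound to a specific key: every key on the KeyedStream may have its own state.

    2. Operator State. Unlike Keyed State, Operator State is bound to a specific operator: each (parallel) operator instance holds one state as a whole. By comparison, that same operator can hold many Keyed States, one per key. For example, the Kafka Connector in Flink uses Operator State: each connector instance stores all the (partition, offset) mappings of the topic it consumes.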
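
    The difference in scope can be sketched in plain Java. This is a toy model, not the Flink API: the class names KeyedCounterSketch and OffsetTrackerSketch are made up for illustration, and ordinary maps stand in for Flink's state backends.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (not Flink API): keyed state keeps one value per key. */
class KeyedCounterSketch {
    // One counter per key, the way keyed state is scoped to the current key.
    private final Map<String, Long> countPerKey = new HashMap<>();

    /** Increments and returns the counter that belongs to the given key only. */
    long process(String key) {
        return countPerKey.merge(key, 1L, Long::sum);
    }
}

/** Toy model (not Flink API): operator state is one state object per operator instance. */
class OffsetTrackerSketch {
    // All (partition, offset) pairs read by this one source instance.
    private final Map<Integer, Long> offsetPerPartition = new HashMap<>();

    /** Remembers the latest offset read from a partition. */
    void record(int partition, long offset) {
        offsetPerPartition.put(partition, offset);
    }

    /** Returns a copy of the whole mapping, which is what a checkpoint would store. */
    Map<Integer, Long> snapshot() {
        return new HashMap<>(offsetPerPartition);
    }
}
```

    In the keyed case, events for key "a" never touch the counter for key "b". In the operator case, the instance owns a single mapping that covers every partition it reads.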

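    Some primitive State types

    • ValueState: a single-value state of type T. It is bound to the corresponding key and is the simplest kind of state. The value is updated with the update method and read with the value() method.

    • ListState: the state value for a key is a list. Values are appended with the add method, and the get() method returns an Iterable<T> for iterating over the state values.

    • ReducingState: this state takes a user-supplied ReduceFunction. Every call to add invokes the ReduceFunction, so the values are merged into one single state value.

    • MapState: the state value is a map. Elements are added with the put or putAll method.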
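
    The semantics of these state types can be sketched with small plain-Java classes. These are illustrative stand-ins with made-up names, not Flink's interfaces; only the method names (update, value, add, get) follow the descriptions above. MapState is left out because it behaves like an ordinary map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

/** Toy single-value state: update() overwrites, value() reads. */
class ValueStateSketch<T> {
    private T current;

    void update(T value) { current = value; }

    T value() { return current; }
}

/** Toy list state: add() appends, get() exposes the elements for iteration. */
class ListStateSketch<T> {
    private final List<T> elements = new ArrayList<>();

    void add(T value) { elements.add(value); }

    Iterable<T> get() { return elements; }
}

/** Toy reducing state: every add() folds the new value into the one stored value. */
class ReducingStateSketch<T> {
    private final BinaryOperator<T> reduceFunction;
    private T current;

    ReducingStateSketch(BinaryOperator<T> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    void add(T value) {
        // The first value is stored as-is; later values are merged with the reduce function.
        current = (current == null) ? value : reduceFunction.apply(current, value);
    }

    T get() { return current; }
}
```

    With a ReducingStateSketch<Integer> built from Integer::sum, adding 1, 2 and 3 leaves a single stored value of 6, whereas a ListStateSketch would keep all three elements.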

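    The State objects above are only interfaces for interacting with state. The actual state values may live in memory, on disk, or in some other distributed system; what we hold is merely a handle. So how do we obtain that handle? Flink defines a state through a StateDescriptor. This is an abstract class that holds basic information such as the state name, type, and serializer. Matching the state types above, ValueStateDescriptor, ListStateDescriptor, and the other descriptors derive from StateDescriptor.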
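
    The handle idea can be sketched as follows. This is a toy model with hypothetical names (DescriptorSketch, BackendSketch, HandleSketch), not Flink's classes: the descriptor only describes the state, the backend owns the values, and the handle reads and writes through to the backend.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy descriptor (hypothetical, not Flink's StateDescriptor): a name plus a value type. */
class DescriptorSketch<T> {
    final String name;
    final Class<T> type;

    DescriptorSketch(String name, Class<T> type) {
        this.name = name;
        this.type = type;
    }
}

/** Toy handle: holds no value itself, only a reference to the storage and the descriptor. */
class HandleSketch<T> {
    private final Map<String, Object> storage;
    private final DescriptorSketch<T> descriptor;

    HandleSketch(Map<String, Object> storage, DescriptorSketch<T> descriptor) {
        this.storage = storage;
        this.descriptor = descriptor;
    }

    void update(T value) { storage.put(descriptor.name, value); }

    T value() { return descriptor.type.cast(storage.get(descriptor.name)); }
}

/** Toy backend: owns the actual values; a HashMap stands in for memory, disk, or a remote store. */
class BackendSketch {
    private final Map<String, Object> storage = new HashMap<>();

    /** Hands out a handle for the state described by the descriptor. */
    <T> HandleSketch<T> getState(DescriptorSketch<T> descriptor) {
        return new HandleSketch<>(storage, descriptor);
    }
}
```

    Two handles obtained with the same descriptor see the same value, because the value lives in the backend and not in the handle.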

    3. A look at the internal structure of Keyed State

    The StreamingRuntimeContext class exposes the get method for each kind of State:

    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.apache.flink.streaming.api.operators;
    
    import org.apache.flink.annotation.PublicEvolving;
    import org.apache.flink.api.common.accumulators.Accumulator;
    import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
    import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
    import org.apache.flink.api.common.state.AggregatingState;
    import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    import org.apache.flink.api.common.state.FoldingState;
    import org.apache.flink.api.common.state.FoldingStateDescriptor;
    import org.apache.flink.api.common.state.KeyedStateStore;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ReducingState;
    import org.apache.flink.api.common.state.ReducingStateDescriptor;
    import org.apache.flink.api.common.state.StateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.runtime.execution.Environment;
    import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.graph.StreamConfig;
    import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    import org.apache.flink.util.Preconditions;
    
    import java.util.List;
    import java.util.Map;
    
    /**
     * Implementation of the {@link org.apache.flink.api.common.functions.RuntimeContext},
     * for streaming operators.
     */
    @PublicEvolving
    public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
    
        /** The operator to which this function belongs. */
        private final AbstractStreamOperator<?> operator;
    
        /** The task environment running the operator. */
        private final Environment taskEnvironment;
    
        private final StreamConfig streamConfig;
    
        public StreamingRuntimeContext(AbstractStreamOperator<?> operator,
                                        Environment env, Map<String, Accumulator<?, ?>> accumulators) {
            super(env.getTaskInfo(),
                    env.getUserClassLoader(),
                    operator.getExecutionConfig(),
                    accumulators,
                    env.getDistributedCacheEntries(),
                    operator.getMetricGroup());
    
            this.operator = operator;
            this.taskEnvironment = env;
            this.streamConfig = new StreamConfig(env.getTaskConfiguration());
        }
    
        // ------------------------------------------------------------------------
    
        /**
         * Returns the input split provider associated with the operator.
         *
         * @return The input split provider.
         */
        public InputSplitProvider getInputSplitProvider() {
            return taskEnvironment.getInputSplitProvider();
        }
    
        public ProcessingTimeService getProcessingTimeService() {
            return operator.getProcessingTimeService();
        }
    
        // ------------------------------------------------------------------------
        //  broadcast variables
        // ------------------------------------------------------------------------
    
        @Override
        public boolean hasBroadcastVariable(String name) {
            throw new UnsupportedOperationException("Broadcast variables can only be used in DataSet programs");
        }
    
        @Override
        public <RT> List<RT> getBroadcastVariable(String name) {
            throw new UnsupportedOperationException("Broadcast variables can only be used in DataSet programs");
        }
    
        @Override
        public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
            throw new UnsupportedOperationException("Broadcast variables can only be used in DataSet programs");
        }
    
        // ------------------------------------------------------------------------
        //  key/value state
        // ------------------------------------------------------------------------
    
        @Override
        public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
            KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
            stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
            return keyedStateStore.getState(stateProperties);
        }
    
        @Override
        public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
            KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
            stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
            return keyedStateStore.getListState(stateProperties);
        }
    
        @Override
        public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
            KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
            stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
            return keyedStateStore.getReducingState(stateProperties);
        }
    
        @Override
        public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
            KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
            stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
            return keyedStateStore.getAggregatingState(stateProperties);
        }
    
        @Override
        public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
            KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
            stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
            return keyedStateStore.getFoldingState(stateProperties);
        }
    
        @Override
        public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
            KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
            stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
            return keyedStateStore.getMapState(stateProperties);
        }
    
        private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(StateDescriptor<?, ?> stateDescriptor) {
            Preconditions.checkNotNull(stateDescriptor, "The state properties must not be null");
            KeyedStateStore keyedStateStore = operator.getKeyedStateStore();
            Preconditions.checkNotNull(keyedStateStore, "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.");
            return keyedStateStore;
        }
    
        // ------------------ expose (read only) relevant information from the stream config -------- //
    
        /**
         * Returns true if checkpointing is enabled for the running job.
         *
         * @return true if checkpointing is enabled.
         */
        public boolean isCheckpointingEnabled() {
            return streamConfig.isCheckpointingEnabled();
        }
    
        /**
         * Returns the checkpointing mode.
         *
         * @return checkpointing mode
         */
        public CheckpointingMode getCheckpointMode() {
            return streamConfig.getCheckpointMode();
        }
    
        /**
         * Returns the buffer timeout of the job.
         *
         * @return buffer timeout (in milliseconds)
         */
        public long getBufferTimeout() {
            return streamConfig.getBufferTimeout();
        }
    
    }

    All state descriptors derive from the StateDescriptor class. A brief look at its constructor shows that it takes three parameters:

    • Name
    • Type: the class of the value stored in the state
    • Default value

    protected StateDescriptor(String name, Class<T> type, T defaultValue) {
        this.name = requireNonNull(name, "name must not be null");
        requireNonNull(type, "type class must not be null");

        try {
            this.typeInfo = TypeExtractor.createTypeInfo(type);
        } catch (Exception e) {
            throw new RuntimeException(
                    "Could not create the type information for '" + type.getName() + "'. " +
                    "The most common reason is failure to infer the generic type information, due to Java's type erasure. " +
                    "In that case, please pass a 'TypeHint' instead of a class to describe the type. " +
                    "For example, to describe 'Tuple2<String, String>' as a generic type, use " +
                    "'new PravegaDeserializationSchema<>(new TypeHint<Tuple2<String, String>>(){}, serializer);'", e);
        }

        this.defaultValue = defaultValue;
    }
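
    The error message in this constructor refers to Java's type erasure and to passing a 'TypeHint'. The general trick behind such a hint can be sketched in plain Java. TypeHintSketch below is a hypothetical stand-in, not Flink's TypeHint class: an anonymous subclass records its generic superclass, so the full type argument remains available through reflection.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

/**
 * Toy type hint (hypothetical, not Flink's TypeHint): subclasses keep their
 * generic superclass in the class file, so T can be read back at runtime.
 */
abstract class TypeHintSketch<T> {
    private final Type captured;

    protected TypeHintSketch() {
        // For an anonymous subclass this is TypeHintSketch<...> with its real type argument.
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        this.captured = superType.getActualTypeArguments()[0];
    }

    Type getType() { return captured; }
}

class TypeHintDemo {
    /** Map.class can only name the raw type; the hint keeps Map<String, Integer>. */
    static Type mapOfStringToInteger() {
        // The trailing {} creates the anonymous subclass that carries the type argument.
        return new TypeHintSketch<Map<String, Integer>>() {}.getType();
    }

    public static void main(String[] args) {
        System.out.println(mapOfStringToInteger());
    }
}
```

    A Class<T> argument cannot express Map<String, Integer>, since only Map.class exists. The captured Type, in contrast, is a ParameterizedType whose raw type is Map and whose arguments are String and Integer.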

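    You can dig into the finer details yourself. It is worth reading the source if you run into it at work or are simply curious, and tying it to a real application will make it sink in faster.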

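    III. Fault Tolerance

    1. Flink's core fault tolerance mechanism continuously draws snapshots of the data stream. When the system rolls back, these snapshots serve as checkpoints. The snapshot mechanism is inspired by the Chandy-Lamport algorithm.

    I read through the paper to study the algorithm. The English version was hard to follow, so I found a Chinese write-up.

    Roughly, the algorithm sends markers along the channels between processes. Each process records its own state when a marker arrives, and together these records form a consistent snapshot that recovery can fall back to. (The paper illustrates this with a token passed between processes.) In Flink the counterpart of the marker is the checkpoint barrier.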
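
    The snapshot-and-rollback idea can be sketched for a single operator with a single input. This is a heavily simplified toy, not Flink's implementation and not the full Chandy-Lamport algorithm, which also has to handle several channels. The class name and the BARRIER constant are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

/** Toy single-operator checkpointing sketch: a running sum that snapshots at barriers. */
class CheckpointSketch {
    /** Hypothetical marker value that stands for a barrier in the input. */
    static final int BARRIER = Integer.MIN_VALUE;

    long sum;          // the operator's running state
    long snapshotSum;  // copy of the state taken at the last barrier
    int snapshotPos;   // input position right after the last barrier

    /** Processes input[from, to); a barrier triggers a snapshot of state and position. */
    void run(List<Integer> input, int from, int to) {
        for (int i = from; i < to; i++) {
            int element = input.get(i);
            if (element == BARRIER) {
                snapshotSum = sum;
                snapshotPos = i + 1;
            } else {
                sum += element;
            }
        }
    }

    /** Simulates recovery: rolls the state back and returns where to resume reading. */
    int restore() {
        sum = snapshotSum;
        return snapshotPos;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, BARRIER, 3, 4);
        CheckpointSketch operator = new CheckpointSketch();
        operator.run(input, 0, 4);           // "crash" after the record 3 was processed
        int resumeAt = operator.restore();   // state back to 3, resume after the barrier
        operator.run(input, resumeAt, input.size());
        System.out.println(operator.sum);    // prints 10
    }
}
```

    The record 3 is read twice, once before the simulated crash and once during replay, but because the state is rolled back to the snapshot first, it is counted only once in the final sum. This is the sense in which the state ends up exactly-once.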

    For details, see:

    https://www.jianshu.com/p/938001e998f5

    Thanks!
  • Original article: https://www.cnblogs.com/ylxn/p/10645603.html