zoukankan      html  css  js  c++  java
  • Flink

    如果要考虑易用性和效率,使用rocksDB来替代普通内存的kv是有必要的

    有了rocksdb,可以range查询,可以支持columnfamily,可以各种压缩

    但是rocksdb本身是一个库,是跑在RocksDBStateBackend中的

    所以taskmanager挂掉后,数据还是没了,

    所以RocksDBStateBackend仍然需要类似HDFS这样的分布式存储来存储snapshot

     

    kv state需要由rockdb来管理,这是和内存或file backend最大的不同

    AbstractRocksDBState
    /**
     * Base class for {@link State} implementations that store state in a RocksDB database.
     *
     * <p>State is not stored in this class but in the {@link org.rocksdb.RocksDB} instance that
     * the {@link RocksDBStateBackend} manages and checkpoints.
     *
     * @param <K> The type of the key.
     * @param <N> The type of the namespace.
     * @param <S> The type of {@link State}.
     * @param <SD> The type of {@link StateDescriptor}.
     */
    public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, ?>>
            implements KvState<K, N, S, SD, RocksDBStateBackend>, State {
    
        /** Serializer for the namespace */
        private final TypeSerializer<N> namespaceSerializer;
    
        /** The current namespace, which the next value methods will refer to */
        private N currentNamespace;
    
        /** Backend that holds the actual RocksDB instance where we store state */
        protected RocksDBStateBackend backend;
    
        /** The column family of this particular instance of state */
        protected ColumnFamilyHandle columnFamily;
    
        /**
         * We disable writes to the write-ahead-log here.
         */
        private final WriteOptions writeOptions;
    
        /**
         * Creates a new RocksDB backed state.
         *
         * @param namespaceSerializer The serializer for the namespace.
         */
        protected AbstractRocksDBState(ColumnFamilyHandle columnFamily,
                TypeSerializer<N> namespaceSerializer,
                RocksDBStateBackend backend) {
    
            this.namespaceSerializer = namespaceSerializer;
            this.backend = backend;
    
            this.columnFamily = columnFamily;
    
            writeOptions = new WriteOptions();
            writeOptions.setDisableWAL(true);
        }
    
        @Override
        public KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> snapshot(long checkpointId,
                long timestamp) throws Exception {
            throw new RuntimeException("Should not be called. Backups happen in RocksDBStateBackend.");
        }
    }

     

    RocksDBValueState

    /**
     * {@link ValueState} implementation that stores state in RocksDB.
     *
     * @param <K> The type of the key.
     * @param <N> The type of the namespace.
     * @param <V> The type of value that the state state stores.
     */
    public class RocksDBValueState<K, N, V>
        extends AbstractRocksDBState<K, N, ValueState<V>, ValueStateDescriptor<V>>
        implements ValueState<V> {
    
        @Override
        public V value() {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
            try {
                writeKeyAndNamespace(out);
                byte[] key = baos.toByteArray();
                byte[] valueBytes = backend.db.get(columnFamily, key); //从db读出value
                if (valueBytes == null) {
                    return stateDesc.getDefaultValue();
                }
                return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
            } catch (IOException|RocksDBException e) {
                throw new RuntimeException("Error while retrieving data from RocksDB.", e);
            }
        }
    
        @Override
        public void update(V value) throws IOException {
            if (value == null) {
                clear();
                return;
            }
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
            try {
                writeKeyAndNamespace(out);
                byte[] key = baos.toByteArray();
                baos.reset();
                valueSerializer.serialize(value, out);
                backend.db.put(columnFamily, writeOptions, key, baos.toByteArray()); //将kv写入db
            } catch (Exception e) {
                throw new RuntimeException("Error while adding data to RocksDB", e);
            }
        }
    }

    因为对于kv state,key就是当前收到数据的key,所以key是直接从backend.currentKey()中读到;参考,Flink - Working with State

     

    RocksDBStateBackend

    初始化过程,

    /**
     * A {@link StateBackend} that stores its state in {@code RocksDB}. This state backend can
     * store very large state that exceeds memory and spills to disk.
     * 
     * <p>All key/value state (including windows) is stored in the key/value index of RocksDB.
     * For persistence against loss of machines, checkpoints take a snapshot of the
     * RocksDB database, and persist that snapshot in a file system (by default) or
     * another configurable state backend.
     * 
     * <p>The behavior of the RocksDB instances can be parametrized by setting RocksDB Options
     * using the methods {@link #setPredefinedOptions(PredefinedOptions)} and
     * {@link #setOptions(OptionsFactory)}.
     */
    public class RocksDBStateBackend extends AbstractStateBackend {
    
        // ------------------------------------------------------------------------
        //  Static configuration values
        // ------------------------------------------------------------------------
        
        /** The checkpoint directory that we copy the RocksDB backups to. */
        private final Path checkpointDirectory;
    
        /** The state backend that stores the non-partitioned state */
        private final AbstractStateBackend nonPartitionedStateBackend;
    
        /**
         * Our RocksDB data base, this is used by the actual subclasses of {@link AbstractRocksDBState}
         * to store state. The different k/v states that we have don't each have their own RocksDB
         * instance. They all write to this instance but to their own column family.
         */
        protected volatile transient RocksDB db; //RocksDB实例
    
        /**
         * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the
         * file system and location defined by the given URI.
         * 
         * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system
         * host and port in the URI, or have the Hadoop configuration that describes the file system
         * (host / high-availability group / possibly credentials) either referenced from the Flink
         * config, or included in the classpath.
         *
         * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory.
         * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
         */
        public RocksDBStateBackend(String checkpointDataUri) throws IOException {
            this(new Path(checkpointDataUri).toUri());
        }
    
        /**
         * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the
         * file system and location defined by the given URI.
         *
         * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system
         * host and port in the URI, or have the Hadoop configuration that describes the file system
         * (host / high-availability group / possibly credentials) either referenced from the Flink
         * config, or included in the classpath.
         *
         * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory.
         * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
         */
        public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
            // creating the FsStateBackend automatically sanity checks the URI
            FsStateBackend fsStateBackend = new FsStateBackend(checkpointDataUri); //仍然使用FsStateBackend来存snapshot
            
            this.nonPartitionedStateBackend = fsStateBackend;
            this.checkpointDirectory = fsStateBackend.getBasePath();
        }
        
        // ------------------------------------------------------------------------
        //  State backend methods
        // ------------------------------------------------------------------------
        
        @Override
        public void initializeForJob(
                Environment env, 
                String operatorIdentifier,
                TypeSerializer<?> keySerializer) throws Exception {
            
            super.initializeForJob(env, operatorIdentifier, keySerializer);
    
            this.nonPartitionedStateBackend.initializeForJob(env, operatorIdentifier, keySerializer);
    
            RocksDB.loadLibrary(); //初始化rockdb
    
            List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1); //columnFamily的概念和HBase相同,放在独立的文件
            // RocksDB seems to need this...
            columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
            List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
            try {
                db = RocksDB.open(getDbOptions(), instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles); //真正的open rocksDB
            } catch (RocksDBException e) {
                throw new RuntimeException("Error while opening RocksDB instance.", e);
            }
        }

     

    snapshotPartitionedState

    @Override
    public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState(long checkpointId, long timestamp) throws Exception {
        if (keyValueStatesByName == null || keyValueStatesByName.size() == 0) {
            return new HashMap<>();
        }
    
        if (fullyAsyncBackup) {
            return performFullyAsyncSnapshot(checkpointId, timestamp);
        } else {
            return performSemiAsyncSnapshot(checkpointId, timestamp);
        }
    }

     

    snapshot分为全异步和半异步两种,

     

    半异步,

    /**
     * Performs a checkpoint by using the RocksDB backup feature to backup to a directory.
     * This backup is the asynchronously copied to the final checkpoint location.
     */
    private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performSemiAsyncSnapshot(long checkpointId, long timestamp) throws Exception {
        // We don't snapshot individual k/v states since everything is stored in a central
        // RocksDB data base. Create a dummy KvStateSnapshot that holds the information about
        // that checkpoint. We use the in injectKeyValueStateSnapshots to restore.
    
        final File localBackupPath = new File(instanceBasePath, "local-chk-" + checkpointId);
        final URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId);
    
        long startTime = System.currentTimeMillis();
    
        BackupableDBOptions backupOptions = new BackupableDBOptions(localBackupPath.getAbsolutePath());
        // we disabled the WAL
        backupOptions.setBackupLogFiles(false);
        // no need to sync since we use the backup only as intermediate data before writing to FileSystem snapshot
        backupOptions.setSync(false); //设为异步
    
        try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), backupOptions)) {
            // wait before flush with "true"
            backupEngine.createNewBackup(db, true); //利用rocksDB自己的backupEngine生成新的backup,存在本地磁盘
        }
    
        long endTime = System.currentTimeMillis(); //这部分是同步做的,需要计时看延时
        LOG.info("RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms.");
    
        // draw a copy in case it get's changed while performing the async snapshot
        List<StateDescriptor> kvStateInformationCopy = new ArrayList<>();
        for (Tuple2<ColumnFamilyHandle, StateDescriptor> state: kvStateInformation.values()) {
            kvStateInformationCopy.add(state.f1);
        }
        SemiAsyncSnapshot dummySnapshot = new SemiAsyncSnapshot(localBackupPath, //
                backupUri,
                kvStateInformationCopy,
                checkpointId);
    
    
        HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> result = new HashMap<>();
        result.put("dummy_state", dummySnapshot);
        return result;
    }

     

    SemiAsyncSnapshot.materialize

    @Override
    public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
        try {
            long startTime = System.currentTimeMillis();
            HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri);  //从本地磁盘copy到hdfs
            long endTime = System.currentTimeMillis();
            LOG.info("RocksDB materialization from " + localBackupPath + " to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");
            return new FinalSemiAsyncSnapshot(backupUri, checkpointId, stateDescriptors);
        } catch (Exception e) {
            FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
            fs.delete(new org.apache.hadoop.fs.Path(backupUri), true);
            throw e;
        } finally {
            FileUtils.deleteQuietly(localBackupPath);
        }
    }

     

    全异步

    /**
     * Performs a checkpoint by drawing a {@link org.rocksdb.Snapshot} from RocksDB and then
     * iterating over all key/value pairs in RocksDB to store them in the final checkpoint
     * location. The only synchronous part is the drawing of the {@code Snapshot} which
     * is essentially free.
     */
    private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performFullyAsyncSnapshot(long checkpointId, long timestamp) throws Exception {
        // we draw a snapshot from RocksDB then iterate over all keys at that point
        // and store them in the backup location
    
        final URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId);
    
        long startTime = System.currentTimeMillis();
    
        org.rocksdb.Snapshot snapshot = db.getSnapshot(); //生成snapshot,但不用落盘
    
        long endTime = System.currentTimeMillis();
        LOG.info("Fully asynchronous RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms.");
    
        // draw a copy in case it get's changed while performing the async snapshot
        Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamiliesCopy = new HashMap<>();
        columnFamiliesCopy.putAll(kvStateInformation);
        FullyAsyncSnapshot dummySnapshot = new FullyAsyncSnapshot(snapshot, //直接把snapshot传入
                this,
                backupUri,
                columnFamiliesCopy,
                checkpointId);
    
    
        HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> result = new HashMap<>();
        result.put("dummy_state", dummySnapshot);
        return result;
    }

     

    FullyAsyncSnapshot.materialize

    可以看到需要自己去做db内容的序列化到文件的过程

    @Override
    public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
        try {
            long startTime = System.currentTimeMillis();
    
            CheckpointStateOutputView outputView = backend.createCheckpointStateOutputView(checkpointId, startTime);
    
            outputView.writeInt(columnFamilies.size());
    
            // we don't know how many key/value pairs there are in each column family.
            // We prefix every written element with a byte that signifies to which
            // column family it belongs, this way we can restore the column families
            byte count = 0;
            Map<String, Byte> columnFamilyMapping = new HashMap<>();
            for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column: columnFamilies.entrySet()) {
                columnFamilyMapping.put(column.getKey(), count);
    
                outputView.writeByte(count);
    
                ObjectOutputStream ooOut = new ObjectOutputStream(outputView);
                ooOut.writeObject(column.getValue().f1);
                ooOut.flush();
    
                count++;
            }
    
            ReadOptions readOptions = new ReadOptions();
            readOptions.setSnapshot(snapshot);
    
            for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column: columnFamilies.entrySet()) {
                byte columnByte = columnFamilyMapping.get(column.getKey());
    
                synchronized (dbCleanupLock) {
                    if (db == null) {
                        throw new RuntimeException("RocksDB instance was disposed. This happens " +
                                "when we are in the middle of a checkpoint and the job fails.");
                    }
                    RocksIterator iterator = db.newIterator(column.getValue().f0, readOptions);
                    iterator.seekToFirst();
                    while (iterator.isValid()) {
                        outputView.writeByte(columnByte);
                        BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(),
                                outputView);
                        BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(),
                                outputView);
                        iterator.next();
                    }
                }
            }
    
            StateHandle<DataInputView> stateHandle = outputView.closeAndGetHandle();
    
            long endTime = System.currentTimeMillis();
            LOG.info("Fully asynchronous RocksDB materialization to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");
            return new FinalFullyAsyncSnapshot(stateHandle, checkpointId);
        } finally {
            synchronized (dbCleanupLock) {
                if (db != null) {
                    db.releaseSnapshot(snapshot);
                }
            }
            snapshot = null;
        }
    }

     

    CheckpointStateOutputView

    backend.createCheckpointStateOutputView

    public CheckpointStateOutputView createCheckpointStateOutputView(
            long checkpointID, long timestamp) throws Exception {
        return new CheckpointStateOutputView(createCheckpointStateOutputStream(checkpointID, timestamp));
    }

    关键createCheckpointStateOutputStream

     

    RocksDBStateBackend

    @Override
    public CheckpointStateOutputStream createCheckpointStateOutputStream(
            long checkpointID, long timestamp) throws Exception {
        
        return nonPartitionedStateBackend.createCheckpointStateOutputStream(checkpointID, timestamp);
    }

     

    看看nonPartitionedStateBackend是什么?

    public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
        // creating the FsStateBackend automatically sanity checks the URI
        FsStateBackend fsStateBackend = new FsStateBackend(checkpointDataUri);
        
        this.nonPartitionedStateBackend = fsStateBackend;
        this.checkpointDirectory = fsStateBackend.getBasePath();
    }

    其实就是FsStateBackend,最终rocksDB还是要用FsStateBackend来存储snapshot

     

    restoreState

    @Override
    public final void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots) throws Exception {
        if (keyValueStateSnapshots.size() == 0) {
            return;
        }
    
        KvStateSnapshot dummyState = keyValueStateSnapshots.get("dummy_state");
        if (dummyState instanceof FinalSemiAsyncSnapshot) {
            restoreFromSemiAsyncSnapshot((FinalSemiAsyncSnapshot) dummyState);
        } else if (dummyState instanceof FinalFullyAsyncSnapshot) {
            restoreFromFullyAsyncSnapshot((FinalFullyAsyncSnapshot) dummyState);
        } else {
            throw new RuntimeException("Unknown RocksDB snapshot: " + dummyState);
        }
    }

    同样也分为两种,半异步和全异步,过程基本就是snapshot的逆过程

  • 相关阅读:
    ps 玻璃效果
    svn 官方下载
    svn
    c# form 无标题
    app Inventor google 拖放手机代码块
    paas
    java 延迟
    c# 执行 cmd
    c # xml操作 (无法将类型为“System.Xml.XmlComment”的对象强制转换为类型“System.Xml.XmlElement”)
    eclipse 安装插件 link方式
  • 原文地址:https://www.cnblogs.com/fxjwind/p/6114236.html
Copyright © 2011-2022 走看看