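If usability and efficiency matter, it is worth replacing a plain in-memory KV store with RocksDB.
With RocksDB you get range queries, column families, and a choice of compression options.
But RocksDB itself is only a library; it runs embedded in RocksDBStateBackend, inside the TaskManager process,
so when the TaskManager dies, the data is still lost.
RocksDBStateBackend therefore still needs distributed storage such as HDFS to hold its snapshots.
KV state has to be managed by RocksDB, which is the biggest difference from the memory and file backends.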
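To make the range-query point concrete, here is a minimal sketch of what a sorted KV store buys you over a hash map. It uses java.util.TreeMap as a stand-in and is not the RocksDB API; the class name SortedKvSketch and the key layout are made up for illustration.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Stand-in for a sorted KV store; this is NOT the RocksDB API.
class SortedKvSketch {
    private final NavigableMap<String, String> data = new TreeMap<>();

    void put(String key, String value) {
        data.put(key, value);
    }

    // Returns a copy of all entries with fromKey <= key < toKey, in key order.
    NavigableMap<String, String> range(String fromKey, String toKey) {
        return new TreeMap<>(data.subMap(fromKey, true, toKey, false));
    }

    public static void main(String[] args) {
        SortedKvSketch kv = new SortedKvSketch();
        kv.put("user:001", "alice");
        kv.put("user:002", "bob");
        kv.put("user:010", "carol");
        kv.put("video:001", "intro");
        // Prefix scan: ';' is the character right after ':', so this range
        // covers exactly the keys that start with "user:".
        System.out.println(kv.range("user:", "user;"));
    }
}
```

Because keys are kept in sorted order, a prefix scan is a single contiguous range, which an unordered in-memory map cannot offer without scanning every entry.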
AbstractRocksDBState
/**
 * Base class for {@link State} implementations that store state in a RocksDB database.
 *
 * <p>State is not stored in this class but in the {@link org.rocksdb.RocksDB} instance that
 * the {@link RocksDBStateBackend} manages and checkpoints.
 *
 * @param <K> The type of the key.
 * @param <N> The type of the namespace.
 * @param <S> The type of {@link State}.
 * @param <SD> The type of {@link StateDescriptor}.
 */
public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, ?>>
        implements KvState<K, N, S, SD, RocksDBStateBackend>, State {

    /** Serializer for the namespace */
    private final TypeSerializer<N> namespaceSerializer;

    /** The current namespace, which the next value methods will refer to */
    private N currentNamespace;

    /** Backend that holds the actual RocksDB instance where we store state */
    protected RocksDBStateBackend backend;

    /** The column family of this particular instance of state */
    protected ColumnFamilyHandle columnFamily;

    /**
     * We disable writes to the write-ahead-log here.
     */
    private final WriteOptions writeOptions;

    /**
     * Creates a new RocksDB backed state.
     *
     * @param namespaceSerializer The serializer for the namespace.
     */
    protected AbstractRocksDBState(ColumnFamilyHandle columnFamily,
            TypeSerializer<N> namespaceSerializer,
            RocksDBStateBackend backend) {
        this.namespaceSerializer = namespaceSerializer;
        this.backend = backend;
        this.columnFamily = columnFamily;
        writeOptions = new WriteOptions();
        writeOptions.setDisableWAL(true);
    }

    @Override
    public KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> snapshot(long checkpointId, long timestamp) throws Exception {
        throw new RuntimeException("Should not be called. Backups happen in RocksDBStateBackend.");
    }
}
RocksDBValueState
/**
 * {@link ValueState} implementation that stores state in RocksDB.
 *
 * @param <K> The type of the key.
 * @param <N> The type of the namespace.
 * @param <V> The type of value that the state stores.
 */
public class RocksDBValueState<K, N, V>
        extends AbstractRocksDBState<K, N, ValueState<V>, ValueStateDescriptor<V>>
        implements ValueState<V> {

    @Override
    public V value() {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
        try {
            writeKeyAndNamespace(out);
            byte[] key = baos.toByteArray();
            byte[] valueBytes = backend.db.get(columnFamily, key); // read the value from the DB
            if (valueBytes == null) {
                return stateDesc.getDefaultValue();
            }
            return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
        } catch (IOException|RocksDBException e) {
            throw new RuntimeException("Error while retrieving data from RocksDB.", e);
        }
    }

    @Override
    public void update(V value) throws IOException {
        if (value == null) {
            clear();
            return;
        }
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
        try {
            writeKeyAndNamespace(out);
            byte[] key = baos.toByteArray();
            baos.reset();
            valueSerializer.serialize(value, out);
            backend.db.put(columnFamily, writeOptions, key, baos.toByteArray()); // write the key/value pair to the DB
        } catch (Exception e) {
            throw new RuntimeException("Error while adding data to RocksDB", e);
        }
    }
}
For KV state, the key is simply the key of the record currently being processed, so it is read directly from backend.currentKey(); see Flink - Working with State.
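The excerpt above calls writeKeyAndNamespace without showing it. As a rough sketch, assuming it serializes the current key followed by the current namespace into one byte array, the lookup pattern can be modelled with the standard library alone. CompositeKeySketch, its setters, and the HashMap standing in for the column family are all hypothetical and not Flink or RocksDB API.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in: a HashMap plays the role of the RocksDB column family.
class CompositeKeySketch {
    private final Map<ByteBuffer, String> store = new HashMap<>();
    private String currentKey;       // set per record by the runtime
    private String currentNamespace; // for example, the current window

    void setCurrentKey(String key) {
        this.currentKey = key;
    }

    void setCurrentNamespace(String namespace) {
        this.currentNamespace = namespace;
    }

    // Serializes the current key followed by the current namespace (assumed layout).
    private ByteBuffer keyAndNamespace() {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeUTF(currentKey);       // length-prefixed, so the two parts cannot blur
            out.writeUTF(currentNamespace);
            return ByteBuffer.wrap(baos.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Mirrors value(): a missing entry yields the default value.
    String value(String defaultValue) {
        String stored = store.get(keyAndNamespace());
        return stored == null ? defaultValue : stored;
    }

    // Mirrors update(): a null value clears the entry.
    void update(String value) {
        if (value == null) {
            store.remove(keyAndNamespace());
        } else {
            store.put(keyAndNamespace(), value);
        }
    }

    public static void main(String[] args) {
        CompositeKeySketch state = new CompositeKeySketch();
        state.setCurrentKey("user-1");
        state.setCurrentNamespace("window-A");
        state.update("42");
        System.out.println(state.value("none")); // same key and namespace
        state.setCurrentNamespace("window-B");
        System.out.println(state.value("none")); // same key, other namespace
    }
}
```

The point of the composite key is that one state object can serve every key and every namespace: the caller never passes a key, it only changes what "current" means before each access.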
RocksDBStateBackend
Initialization:
/**
 * A {@link StateBackend} that stores its state in {@code RocksDB}. This state backend can
 * store very large state that exceeds memory and spills to disk.
 *
 * <p>All key/value state (including windows) is stored in the key/value index of RocksDB.
 * For persistence against loss of machines, checkpoints take a snapshot of the
 * RocksDB database, and persist that snapshot in a file system (by default) or
 * another configurable state backend.
 *
 * <p>The behavior of the RocksDB instances can be parametrized by setting RocksDB Options
 * using the methods {@link #setPredefinedOptions(PredefinedOptions)} and
 * {@link #setOptions(OptionsFactory)}.
 */
public class RocksDBStateBackend extends AbstractStateBackend {

    // ------------------------------------------------------------------------
    //  Static configuration values
    // ------------------------------------------------------------------------

    /** The checkpoint directory that we copy the RocksDB backups to. */
    private final Path checkpointDirectory;

    /** The state backend that stores the non-partitioned state */
    private final AbstractStateBackend nonPartitionedStateBackend;

    /**
     * Our RocksDB data base, this is used by the actual subclasses of {@link AbstractRocksDBState}
     * to store state. The different k/v states that we have don't each have their own RocksDB
     * instance. They all write to this instance but to their own column family.
     */
    protected volatile transient RocksDB db; // the RocksDB instance

    /**
     * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the
     * file system and location defined by the given URI.
     *
     * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system
     * host and port in the URI, or have the Hadoop configuration that describes the file system
     * (host / high-availability group / possibly credentials) either referenced from the Flink
     * config, or included in the classpath.
     *
     * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory.
     * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
     */
    public RocksDBStateBackend(String checkpointDataUri) throws IOException {
        this(new Path(checkpointDataUri).toUri());
    }

    /**
     * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the
     * file system and location defined by the given URI.
     *
     * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system
     * host and port in the URI, or have the Hadoop configuration that describes the file system
     * (host / high-availability group / possibly credentials) either referenced from the Flink
     * config, or included in the classpath.
     *
     * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory.
     * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
     */
    public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
        // creating the FsStateBackend automatically sanity checks the URI
        FsStateBackend fsStateBackend = new FsStateBackend(checkpointDataUri); // snapshots are still stored through FsStateBackend
        this.nonPartitionedStateBackend = fsStateBackend;
        this.checkpointDirectory = fsStateBackend.getBasePath();
    }

    // ------------------------------------------------------------------------
    //  State backend methods
    // ------------------------------------------------------------------------

    @Override
    public void initializeForJob(
            Environment env,
            String operatorIdentifier,
            TypeSerializer<?> keySerializer) throws Exception {
        super.initializeForJob(env, operatorIdentifier, keySerializer);
        this.nonPartitionedStateBackend.initializeForJob(env, operatorIdentifier, keySerializer);

        RocksDB.loadLibrary(); // load the RocksDB native library

        // column families are the same concept as in HBase; each is stored in its own files
        List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1);
        // RocksDB seems to need this...
        columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
        List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
        try {
            // this is where RocksDB is actually opened
            db = RocksDB.open(getDbOptions(), instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles);
        } catch (RocksDBException e) {
            throw new RuntimeException("Error while opening RocksDB instance.", e);
        }
    }
snapshotPartitionedState
@Override
public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState(long checkpointId, long timestamp) throws Exception {
    if (keyValueStatesByName == null || keyValueStatesByName.size() == 0) {
        return new HashMap<>();
    }

    if (fullyAsyncBackup) {
        return performFullyAsyncSnapshot(checkpointId, timestamp);
    } else {
        return performSemiAsyncSnapshot(checkpointId, timestamp);
    }
}
Snapshots come in two flavors: fully asynchronous and semi-asynchronous.
Semi-asynchronous:
/**
 * Performs a checkpoint by using the RocksDB backup feature to backup to a directory.
 * This backup is then asynchronously copied to the final checkpoint location.
 */
private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performSemiAsyncSnapshot(long checkpointId, long timestamp) throws Exception {
    // We don't snapshot individual k/v states since everything is stored in a central
    // RocksDB data base. Create a dummy KvStateSnapshot that holds the information about
    // that checkpoint. We use this in injectKeyValueStateSnapshots to restore.

    final File localBackupPath = new File(instanceBasePath, "local-chk-" + checkpointId);
    final URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId);

    long startTime = System.currentTimeMillis();

    BackupableDBOptions backupOptions = new BackupableDBOptions(localBackupPath.getAbsolutePath());
    // we disabled the WAL
    backupOptions.setBackupLogFiles(false);
    // no need to sync since we use the backup only as intermediate data before writing to FileSystem snapshot
    backupOptions.setSync(false); // skip fsync for the local backup

    try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), backupOptions)) {
        // wait before flush with "true"
        backupEngine.createNewBackup(db, true); // use RocksDB's own BackupEngine to create a new backup on local disk
    }

    long endTime = System.currentTimeMillis(); // this part runs synchronously, so it is timed to expose the latency
    LOG.info("RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms.");

    // draw a copy in case it gets changed while performing the async snapshot
    List<StateDescriptor> kvStateInformationCopy = new ArrayList<>();
    for (Tuple2<ColumnFamilyHandle, StateDescriptor> state: kvStateInformation.values()) {
        kvStateInformationCopy.add(state.f1);
    }
    SemiAsyncSnapshot dummySnapshot = new SemiAsyncSnapshot(localBackupPath,
            backupUri,
            kvStateInformationCopy,
            checkpointId);

    HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> result = new HashMap<>();
    result.put("dummy_state", dummySnapshot);
    return result;
}
SemiAsyncSnapshot.materialize
@Override
public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
    try {
        long startTime = System.currentTimeMillis();
        HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri); // copy from local disk to HDFS
        long endTime = System.currentTimeMillis();
        LOG.info("RocksDB materialization from " + localBackupPath + " to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");
        return new FinalSemiAsyncSnapshot(backupUri, checkpointId, stateDescriptors);
    } catch (Exception e) {
        FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
        fs.delete(new org.apache.hadoop.fs.Path(backupUri), true);
        throw e;
    } finally {
        FileUtils.deleteQuietly(localBackupPath);
    }
}
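The two phases of the semi-asynchronous path can be sketched with the standard library alone: a blocking local copy first, then an upload on another thread that always cleans up the local backup. This is only an illustration of the shape of the protocol. Plain files stand in for the RocksDB backup and for HDFS, and SemiAsyncSketch with all of its method names is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: plain files stand in for the RocksDB backup and for HDFS.
class SemiAsyncSketch {

    // Synchronous part: copy the live file into a local backup directory.
    static Path localBackup(Path liveFile, Path localDir, long checkpointId) {
        try {
            return Files.copy(liveFile, localDir.resolve("local-chk-" + checkpointId));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Asynchronous part: copy the local backup to the "remote" directory on another
    // thread, remove a partial remote copy on failure, and always drop the local backup.
    static CompletableFuture<Path> materialize(Path backup, Path remoteDir, long checkpointId) {
        return CompletableFuture.supplyAsync(() -> {
            Path target = remoteDir.resolve("chk-" + checkpointId);
            try {
                return Files.copy(backup, target);
            } catch (IOException e) {
                deleteQuietly(target);
                throw new UncheckedIOException(e);
            } finally {
                deleteQuietly(backup);
            }
        });
    }

    private static void deleteQuietly(Path path) {
        try {
            Files.deleteIfExists(path);
        } catch (IOException ignored) {
            // best-effort cleanup
        }
    }

    // Runs both phases in temp directories; returns the remote content followed by
    // whether the local backup still exists.
    static String demo() {
        try {
            Path work = Files.createTempDirectory("semi-async");
            Path local = Files.createDirectories(work.resolve("local"));
            Path remote = Files.createDirectories(work.resolve("remote"));
            Path live = Files.write(work.resolve("db.sst"), "state".getBytes(StandardCharsets.UTF_8));

            Path backup = localBackup(live, local, 7L);
            Path stored = materialize(backup, remote, 7L).join();
            return new String(Files.readAllBytes(stored), StandardCharsets.UTF_8)
                    + " " + Files.exists(backup);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "state false"
    }
}
```

The cost of this design is that the synchronous phase scales with the amount of data copied locally, which is why the original code logs how long that part takes.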
Fully asynchronous:
/**
 * Performs a checkpoint by drawing a {@link org.rocksdb.Snapshot} from RocksDB and then
 * iterating over all key/value pairs in RocksDB to store them in the final checkpoint
 * location. The only synchronous part is the drawing of the {@code Snapshot} which
 * is essentially free.
 */
private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performFullyAsyncSnapshot(long checkpointId, long timestamp) throws Exception {
    // we draw a snapshot from RocksDB then iterate over all keys at that point
    // and store them in the backup location

    final URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId);

    long startTime = System.currentTimeMillis();

    org.rocksdb.Snapshot snapshot = db.getSnapshot(); // take a snapshot; nothing is written to disk

    long endTime = System.currentTimeMillis();
    LOG.info("Fully asynchronous RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms.");

    // draw a copy in case it gets changed while performing the async snapshot
    Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamiliesCopy = new HashMap<>();
    columnFamiliesCopy.putAll(kvStateInformation);
    FullyAsyncSnapshot dummySnapshot = new FullyAsyncSnapshot(snapshot, // pass the snapshot straight in
            this,
            backupUri,
            columnFamiliesCopy,
            checkpointId);

    HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> result = new HashMap<>();
    result.put("dummy_state", dummySnapshot);
    return result;
}
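Why can drawing the snapshot be "essentially free"? A simplified model helps: if every write carries an increasing sequence number, a snapshot only needs to remember the current number, and reads through the snapshot ignore anything newer. The sketch below is a toy model of that idea, not RocksDB's internals or API; SeqSnapshotSketch is a made-up name and the class is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of sequence-number snapshots; not RocksDB's actual internals or API.
class SeqSnapshotSketch {

    private static final class Version {
        final long seq;
        final String key;
        final String value;

        Version(long seq, String key, String value) {
            this.seq = seq;
            this.key = key;
            this.value = value;
        }
    }

    private final List<Version> log = new ArrayList<>(); // append-only, ordered by seq
    private long lastSeq = 0;

    void put(String key, String value) {
        log.add(new Version(++lastSeq, key, value));
    }

    // Taking a snapshot only records a number; no data is copied.
    long getSnapshot() {
        return lastSeq;
    }

    // Returns the newest value of key written at or before snapshotSeq, or null.
    String get(String key, long snapshotSeq) {
        String result = null;
        for (Version version : log) {
            if (version.seq > snapshotSeq) {
                break; // everything after this point is newer than the snapshot
            }
            if (version.key.equals(key)) {
                result = version.value;
            }
        }
        return result;
    }

    // Reads the latest value.
    String get(String key) {
        return get(key, lastSeq);
    }

    public static void main(String[] args) {
        SeqSnapshotSketch db = new SeqSnapshotSketch();
        db.put("count", "1");
        long snapshot = db.getSnapshot();
        db.put("count", "2"); // written after the snapshot was drawn
        System.out.println(db.get("count", snapshot)); // prints "1"
        System.out.println(db.get("count"));           // prints "2"
    }
}
```

In this model the expensive work, iterating over everything visible at the snapshot, can be pushed to a background thread while new writes keep arriving, which matches the split between the synchronous and asynchronous parts in the excerpt.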
FullyAsyncSnapshot.materialize
As the code shows, this path has to serialize the DB contents to a file by itself.
@Override
public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
    try {
        long startTime = System.currentTimeMillis();

        CheckpointStateOutputView outputView = backend.createCheckpointStateOutputView(checkpointId, startTime);

        outputView.writeInt(columnFamilies.size());

        // we don't know how many key/value pairs there are in each column family.
        // We prefix every written element with a byte that signifies to which
        // column family it belongs, this way we can restore the column families
        byte count = 0;
        Map<String, Byte> columnFamilyMapping = new HashMap<>();
        for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column: columnFamilies.entrySet()) {
            columnFamilyMapping.put(column.getKey(), count);

            outputView.writeByte(count);

            ObjectOutputStream ooOut = new ObjectOutputStream(outputView);
            ooOut.writeObject(column.getValue().f1);
            ooOut.flush();

            count++;
        }

        ReadOptions readOptions = new ReadOptions();
        readOptions.setSnapshot(snapshot);

        for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column: columnFamilies.entrySet()) {
            byte columnByte = columnFamilyMapping.get(column.getKey());

            synchronized (dbCleanupLock) {
                if (db == null) {
                    throw new RuntimeException("RocksDB instance was disposed. This happens " +
                            "when we are in the middle of a checkpoint and the job fails.");
                }
                RocksIterator iterator = db.newIterator(column.getValue().f0, readOptions);
                iterator.seekToFirst();
                while (iterator.isValid()) {
                    outputView.writeByte(columnByte);
                    BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(), outputView);
                    BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(), outputView);
                    iterator.next();
                }
            }
        }

        StateHandle<DataInputView> stateHandle = outputView.closeAndGetHandle();

        long endTime = System.currentTimeMillis();
        LOG.info("Fully asynchronous RocksDB materialization to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");
        return new FinalFullyAsyncSnapshot(stateHandle, checkpointId);
    } finally {
        synchronized (dbCleanupLock) {
            if (db != null) {
                db.releaseSnapshot(snapshot);
            }
        }
        snapshot = null;
    }
}
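The on-disk layout produced above is a small header followed by a flat stream of tagged entries. The following sketch reproduces that layout with java.io and shows how a reader could route each entry back to its column family. It makes several simplifying assumptions: family names are written with writeUTF where the original writes a serialized StateDescriptor, byte arrays are written as a length followed by the bytes, and the reader stops when the stream is exhausted. TaggedDumpSketch and its methods are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a "one tag byte per entry" dump format.
class TaggedDumpSketch {

    // Layout: [family count], per family [id][name], then entries [id][key][value].
    static byte[] write(Map<String, Map<String, String>> families) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeInt(families.size());
            Map<String, Byte> ids = new HashMap<>();
            byte next = 0;
            for (String name : families.keySet()) {
                ids.put(name, next);
                out.writeByte(next);
                out.writeUTF(name); // assumption: stands in for the serialized descriptor
                next++;
            }
            for (Map.Entry<String, Map<String, String>> family : families.entrySet()) {
                byte id = ids.get(family.getKey());
                for (Map.Entry<String, String> kv : family.getValue().entrySet()) {
                    out.writeByte(id); // tag: which family this entry belongs to
                    writeBytes(out, kv.getKey().getBytes(StandardCharsets.UTF_8));
                    writeBytes(out, kv.getValue().getBytes(StandardCharsets.UTF_8));
                }
            }
            out.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the header, then routes every tagged entry back to its family.
    static Map<String, Map<String, String>> read(byte[] dump) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(dump));
            int familyCount = in.readInt();
            Map<Byte, String> names = new HashMap<>();
            Map<String, Map<String, String>> families = new TreeMap<>();
            for (int i = 0; i < familyCount; i++) {
                byte id = in.readByte();
                String name = in.readUTF();
                names.put(id, name);
                families.put(name, new TreeMap<>());
            }
            while (in.available() > 0) { // entry count is unknown, so read until the end
                String family = names.get(in.readByte());
                String key = new String(readBytes(in), StandardCharsets.UTF_8);
                String value = new String(readBytes(in), StandardCharsets.UTF_8);
                families.get(family).put(key, value);
            }
            return families;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void writeBytes(DataOutputStream out, byte[] bytes) throws IOException {
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    private static byte[] readBytes(DataInputStream in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> families = new TreeMap<>();
        families.put("counts", new TreeMap<>(Map.of("user-1", "3", "user-2", "5")));
        families.put("names", new TreeMap<>(Map.of("user-1", "alice")));
        System.out.println(read(write(families)).equals(families)); // prints "true"
    }
}
```

Tagging every entry costs one byte per key/value pair, but it means the writer never has to count a column family before streaming it out.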
CheckpointStateOutputView
backend.createCheckpointStateOutputView
public CheckpointStateOutputView createCheckpointStateOutputView(
        long checkpointID, long timestamp) throws Exception {
    return new CheckpointStateOutputView(createCheckpointStateOutputStream(checkpointID, timestamp));
}
The key piece is createCheckpointStateOutputStream.
RocksDBStateBackend
@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(
        long checkpointID, long timestamp) throws Exception {
    return nonPartitionedStateBackend.createCheckpointStateOutputStream(checkpointID, timestamp);
}
So what is nonPartitionedStateBackend?
public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
    // creating the FsStateBackend automatically sanity checks the URI
    FsStateBackend fsStateBackend = new FsStateBackend(checkpointDataUri);
    this.nonPartitionedStateBackend = fsStateBackend;
    this.checkpointDirectory = fsStateBackend.getBasePath();
}
It is simply FsStateBackend; in the end, the RocksDB backend still relies on FsStateBackend to store its snapshots.
restoreState
@Override
public final void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots) throws Exception {
    if (keyValueStateSnapshots.size() == 0) {
        return;
    }

    KvStateSnapshot dummyState = keyValueStateSnapshots.get("dummy_state");
    if (dummyState instanceof FinalSemiAsyncSnapshot) {
        restoreFromSemiAsyncSnapshot((FinalSemiAsyncSnapshot) dummyState);
    } else if (dummyState instanceof FinalFullyAsyncSnapshot) {
        restoreFromFullyAsyncSnapshot((FinalFullyAsyncSnapshot) dummyState);
    } else {
        throw new RuntimeException("Unknown RocksDB snapshot: " + dummyState);
    }
}
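Restore likewise has two paths, semi-asynchronous and fully asynchronous; each is essentially the reverse of the corresponding snapshot procedure.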