zoukankan      html  css  js  c++  java
  • LevelDB源码之五Current文件Manifest文件版本信息

    版本信息有什么用?先来简要说明三个类的具体用途:

    • Version:代表了某一时刻的数据库版本信息,版本信息的主要内容是当前各个Level的SSTable数据文件列表。
    • VersionSet:维护了一份Version列表,包含当前Alive的所有Version信息,列表中第一个代表数据库的当前版本。
    • VersionEdit:表示Version之间的变化,相当于delta 增量,表示有增加了多少文件,删除了文件。Version0 +VersionEdit-->Version1。VersionEdit会保存到MANIFEST文件中,当做数据恢复时就会从MANIFEST文件中读出来重建数据。那么,何时会触发版本变迁呢?Compaction。

    有了上面的描述,再来看版本信息到底有什么用呢?

    如果还不能给出答案,将上述三个类当作一个整体,再来看Version类组到底包含了哪些信息:

    • 运行信息
      • 运行期各种递增ID值:log number(log编号)、next file number(下一个文件编号)、last sequence(单条write操作递增该编号,可认为是版本号)、prev log number(目前已弃用)。
      • 比较器名称
    • 数据库元信息
      • 各Level的SSTable文件列表
      • SSTable缓存
    • Compaction信息、
      • Compaction Pointer
      • 通过Seek触发Compaction信息(文件名、Level);通过Compaction触发Compaction信息(score、level)

    关于版本信息到底有什么用这个话题暂时先放一放,来看具体类。

    VersionSet(维护了一份Version列表,包含当前Alive的所有Version信息,列表中第一个代表数据库的当前版本)

    VersionSet类只有一个实例,在DBImpl(数据库实现类)类中,维护所有活动的Version对象,来看VersionSet的所有语境:

    1. 数据库启动时:通过Current文件加载Manifset文件,读取Manifest文件完成版本信息恢复

      1     Status VersionSet::Recover() {
      2         ......
      3         // Read "CURRENT" file, which contains a pointer to the current manifest file
      4         std::string current;
      5         Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
      6         ......
      7         current.resize(current.size() - 1);
      8 
      9         std::string dscname = dbname_ + "/" + current;
     10         SequentialFile* file;
     11         s = env_->NewSequentialFile(dscname, &file);    //manifest文件
     12         ......
     13 
     14         bool have_log_number = false;
     15         bool have_prev_log_number = false;
     16         bool have_next_file = false;
     17         bool have_last_sequence = false;
     18         uint64_t next_file = 0;
     19         uint64_t last_sequence = 0;
     20         uint64_t log_number = 0;
     21         uint64_t prev_log_number = 0;
     22         VersionSet::Builder builder(this, current_);
     23 
     24         {
     25             LogReporter reporter;
     26             reporter.status = &s;
     27             log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/);
     28             Slice record;
     29             std::string scratch;
     30             while (reader.ReadRecord(&record, &scratch) && s.ok()) {    //依次读取manifest中的VersionEdit信息,构建VersionSet
     31                 VersionEdit edit;
     32                 s = edit.DecodeFrom(record);
     33                 if (s.ok()) {        //Comparator不一致时,返回错误信息
     34                     if (edit.has_comparator_ &&
     35                         edit.comparator_ != icmp_.user_comparator()->Name()) {
     36                         s = Status::InvalidArgument(
     37                             edit.comparator_ + "does not match existing comparator ",
     38                             icmp_.user_comparator()->Name());
     39                         //实际上,这里可以直接break
     40                     }
     41                 }
     42 
     43                 if (s.ok()) {
     44                     builder.Apply(&edit);    //构建当前Version
     45                 }
     46 
     47                 if (edit.has_log_number_) {
     48                     log_number = edit.log_number_;
     49                     have_log_number = true;
     50                 }
     51 
     52                 if (edit.has_prev_log_number_) {
     53                     prev_log_number = edit.prev_log_number_;
     54                     have_prev_log_number = true;
     55                 }
     56 
     57                 if (edit.has_next_file_number_) {
     58                     next_file = edit.next_file_number_;
     59                     have_next_file = true;
     60                 }
     61 
     62                 if (edit.has_last_sequence_) {
     63                     last_sequence = edit.last_sequence_;
     64                     have_last_sequence = true;
     65                 }
     66             }
     67         }
     68         delete file;
     69         file = NULL;
     70 
     71         ...... 89 
     90         if (s.ok()) {
     91             Version* v = new Version(this);
     92             builder.SaveTo(v);
     93             // Install recovered version
     94             Finalize(v);    //计算下次执行压缩的Level
     95             AppendVersion(v);
     96             manifest_file_number_ = next_file;
     97             next_file_number_ = next_file + 1;
     98             last_sequence_ = last_sequence;
     99             log_number_ = log_number;
    100             prev_log_number_ = prev_log_number;
    101         }
    102 
    103         return s;
    104     }
    105         

    Recover通过Manifest恢复VersionSet及Current Version信息,恢复完毕后Alive的Version列表中仅包含当Current Version对象。

    2. Compaction时:Compaction(压缩)应该是LevelDB中最为复杂的功能,它需要Version类组的深度介入。来看VersionSet中所有和Compaction相关的接口声明:

     1         // Apply *edit to the current version to form a new descriptor that
     2         // is both saved to persistent state and installed as the new
     3         // current version.  Will release *mu while actually writing to the file.
     4         // REQUIRES: *mu is held on entry.
     5         // REQUIRES: no other thread concurrently calls LogAndApply()
     6         Status LogAndApply(VersionEdit* edit, port::Mutex* mu);
     7 
     8         // Pick level and inputs for a new compaction.
     9         // Returns NULL if there is no compaction to be done.
    10         // Otherwise returns a pointer to a heap-allocated object that
    11         // describes the compaction.  Caller should delete the result.
    12         Compaction* PickCompaction();
    13 
    14         // Return a compaction object for compacting the range [begin,end] in
    15         // the specified level.  Returns NULL if there is nothing in that
    16         // level that overlaps the specified range.  Caller should delete
    17         // the result.
    18         Compaction* CompactRange(
    19             int level,
    20             const InternalKey* begin,
    21             const InternalKey* end);
    22 
    23         // Create an iterator that reads over the compaction inputs for "*c".
    24         // The caller should delete the iterator when no longer needed.
    25         Iterator* MakeInputIterator(Compaction* c);
    26 
    27         // Returns true iff some level needs a compaction.
    28         bool NeedsCompaction() const {
    29             Version* v = current_;
    30             return (v->compaction_score_ >= 1) || (v->file_to_compact_ != NULL);
    31         }
    32 
    33         // Add all files listed in any live version to *live.
    34         // May also mutate some internal state.
    35         void AddLiveFiles(std::set<uint64_t>* live);

    数据库的读、写操作都可能触发Compaction,通过调用NeedCompaction判定是否需要执行Compaction,如需Compaction则调用PickCompaction获取Compactoin信息。

    其他几个方法也和Compaction操作相关,其中LogAndApply非常重要,它将VersionEdit应用于Current Version、VersoinEdit持久化到Manifest文件、将新的Version做为Current Version。

     1     Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
     2         if (edit->has_log_number_) {
     3             assert(edit->log_number_ >= log_number_);
     4             assert(edit->log_number_ < next_file_number_);
     5         }
     6         else {
     7             edit->SetLogNumber(log_number_);
     8         }
     9 
    10         if (!edit->has_prev_log_number_) {
    11             edit->SetPrevLogNumber(prev_log_number_);
    12         }
    13 
    14         edit->SetNextFile(next_file_number_);
    15         edit->SetLastSequence(last_sequence_);
    16 
    17         //1. New Version = Current Version + VersionEdit
    18         Version* v = new Version(this);    
    19         {
    20             Builder builder(this, current_);
    21             builder.Apply(edit);
    22             builder.SaveTo(v);
    23         }
    24         //2. 重新计算Compaction LevelCompaction Score
    25         Finalize(v);    
    26 
    27         //3. 打开数据库时,创建新的Manifest并保存当前版本信息
    28         // Initialize new descriptor log file if necessary by creating
    29         // a temporary file that contains a snapshot of the current version.
    30         std::string new_manifest_file;
    31         Status s;
    32         if (descriptor_log_ == NULL) {
    33             // No reason to unlock *mu here since we only hit this path in the
    34             // first call to LogAndApply (when opening the database).
    35             assert(descriptor_file_ == NULL);
    36             new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
    37             edit->SetNextFile(next_file_number_);
    38             s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
    39             if (s.ok()) {
    40                 descriptor_log_ = new log::Writer(descriptor_file_);
    41                 s = WriteSnapshot(descriptor_log_);    //当前版本信息
    42             }
    43         }
    44 
    45         //4. 保存增量信息,即VersionEdit信息
    46         // Unlock during expensive MANIFEST log write
    47         {
    48             mu->Unlock();
    49 
    50             // Write new record to MANIFEST log
    51             if (s.ok()) {
    52                 std::string record;
    53                 edit->EncodeTo(&record);
    54                 s = descriptor_log_->AddRecord(record);
    55                 if (s.ok()) {
    56                     s = descriptor_file_->Sync();
    57                 }
    58             }
    59 
    60             // If we just created a new descriptor file, install it by writing a
    61             // new CURRENT file that points to it.
    62             if (s.ok() && !new_manifest_file.empty()) {
    63                 s = SetCurrentFile(env_, dbname_, manifest_file_number_);
    64             }
    65 
    66             mu->Lock();
    67         }
    68 
    69         //5. 将新的版本添加到Alive版本列表,并将其做为Current Version
    70         // Install the new version
    71         if (s.ok()) {
    72             AppendVersion(v);
    73             log_number_ = edit->log_number_;
    74             prev_log_number_ = edit->prev_log_number_;
    75         }
    76         else {
    77             delete v;
    78             if (!new_manifest_file.empty()) {
    79                 delete descriptor_log_;
    80                 delete descriptor_file_;
    81                 descriptor_log_ = NULL;
    82                 descriptor_file_ = NULL;
    83                 env_->DeleteFile(new_manifest_file);
    84             }
    85         }
    86 
    87         return s;
    88     }

    3. 读取数据时:LevelDB通过VersionSet中的TableCache对象完成数据读取。

    TableCache是SSTable的缓存类,NewIterator方法通过传入指定的文件编号返回该文件的Iterator供外部使用。

     1     class TableCache {
     2     public:
     3         TableCache(const std::string& dbname, const Options* options, int entries);
     4         ~TableCache();
     5 
     6         // Return an iterator for the specified file number (the corresponding
     7         // file length must be exactly "file_size" bytes).  If "tableptr" is
     8         // non-NULL, also sets "*tableptr" to point to the Table object
     9         // underlying the returned iterator, or NULL if no Table object underlies
    10         // the returned iterator.  The returned "*tableptr" object is owned by
    11         // the cache and should not be deleted, and is valid for as long as the
    12         // returned iterator is live.
    13         Iterator* NewIterator(const ReadOptions& options,
    14             uint64_t file_number,
    15             uint64_t file_size,
    16             Table** tableptr = NULL);
    17 
    18         // Evict any entry for the specified file number
    19         void Evict(uint64_t file_number);
    20 
    21     private:
    22         Env* const env_;
    23         const std::string dbname_;
    24         const Options* options_;
    25         Cache* cache_;
    26     };

    缓存机制主要通过Cache对象实现,关于Cache的备忘下节会讲。

    Version

    Version维护了一份当前版本的SSTable的元数据,其对外暴露的接口大部分也和元数据相关:

     1         void GetOverlappingInputs(
     2             int level,
     3             const InternalKey* begin,         // NULL means before all keys
     4             const InternalKey* end,           // NULL means after all keys
     5             std::vector<FileMetaData*>* inputs);
     6 
     7         // Returns true iff some file in the specified level overlaps
     8         // some part of [*smallest_user_key,*largest_user_key].
     9         // smallest_user_key==NULL represents a key smaller than all keys in the DB.
    10         // largest_user_key==NULL represents a key largest than all keys in the DB.
    11         bool OverlapInLevel(int level,
    12             const Slice* smallest_user_key,
    13             const Slice* largest_user_key);
    14 
    15         // Return the level at which we should place a new memtable compaction
    16         // result that covers the range [smallest_user_key,largest_user_key].
    17         int PickLevelForMemTableOutput(const Slice& smallest_user_key,
    18             const Slice& largest_user_key);
    19 
    20         int NumFiles(int level) const { return files_[level].size(); }

    还有两个数据库读取操作相关的方法Get、UpdateStats,来看Get:

      1     Status Version::Get(const ReadOptions& options,
      2         const LookupKey& k,
      3         std::string* value,
      4         GetStats* stats)
      5     {
      6         Slice ikey = k.internal_key();
      7         Slice user_key = k.user_key();
      8         const Comparator* ucmp = vset_->icmp_.user_comparator();
      9         Status s;
     10 
     11         stats->seek_file = NULL;
     12         stats->seek_file_level = -1;
     13         FileMetaData* last_file_read = NULL;
     14         int last_file_read_level = -1;
     15 
     16         // We can search level-by-level since entries never hop across
     17         // levels.  Therefore we are guaranteed that if we find data
     18         // in an smaller level, later levels are irrelevant.
     19         std::vector<FileMetaData*> tmp;
     20         FileMetaData* tmp2;
     21 
     22         //1. 查找包含指定Key的所有文件
     23         for (int level = 0; level < config::kNumLevels; level++) {
     24             size_t num_files = files_[level].size();
     25             if (num_files == 0) continue;
     26 
     27             // Get the list of files to search in this level
     28             FileMetaData* const* files = &files_[level][0];
     29             if (level == 0) {    //1.1 Level-0可能存在多个文件均包含该Key
     30                 // Level-0 files may overlap each other.  Find all files that
     31                 // overlap user_key and process them in order from newest to oldest.
     32                 tmp.reserve(num_files);
     33                 for (uint32_t i = 0; i < num_files; i++) {
     34                     FileMetaData* f = files[i];
     35                     if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
     36                         ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
     37                         tmp.push_back(f);
     38                     }
     39                 }
     40                 if (tmp.empty()) continue;
     41 
     42                 std::sort(tmp.begin(), tmp.end(), NewestFirst);    //将文件按更新顺序排列
     43                 files = &tmp[0];
     44                 num_files = tmp.size();
     45             }
     46             else {            //1.2 Level-0之上,一个Key只可能存在于一个文件中
     47                 // Binary search to find earliest index whose largest key >= ikey.
     48                 uint32_t index = FindFile(vset_->icmp_, files_[level], ikey);
     49                 if (index >= num_files) {
     50                     files = NULL;
     51                     num_files = 0;
     52                 }
     53                 else {
     54                     tmp2 = files[index];
     55                     if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) {
     56                         // All of "tmp2" is past any data for user_key
     57                         files = NULL;
     58                         num_files = 0;
     59                     }
     60                     else {
     61                         files = &tmp2;
     62                         num_files = 1;
     63                     }
     64                 }
     65             }
     66 
     67             //2. 遍历所有文件,查找Key值数据。
     68             for (uint32_t i = 0; i < num_files; ++i) {
     69                 if (last_file_read != NULL && stats->seek_file == NULL) {
     70                     // We have had more than one seek for this read.  Charge the 1st file.
     71                     stats->seek_file = last_file_read;
     72                     stats->seek_file_level = last_file_read_level;
     73                 }
     74 
     75                 FileMetaData* f = files[i];
     76                 last_file_read = f;
     77                 last_file_read_level = level;
     78 
     79                 //2.1 SSTable迭代器
     80                 Iterator* iter = vset_->table_cache_->NewIterator(
     81                     options,
     82                     f->number,
     83                     f->file_size);
     84                 iter->Seek(ikey);    //2.2 查找指定Key
     85                 const bool done = GetValue(iter, user_key, value, &s);    //2.3 Get Value
     86                 if (!iter->status().ok()) {
     87                     s = iter->status();
     88                     delete iter;
     89                     return s;
     90                 }
     91                 else {
     92                     delete iter;
     93                     if (done) {
     94                         return s;
     95                     }
     96                 }
     97             }
     98         }
     99 
    100         return Status::NotFound(Slice());  // Use an empty error message for speed
    101     }

    VersionEdit

    版本建变化除运行期编号修改外,最主要的是SSTable文件的增删信息。当Compaction执行时,必然会出现部分SSTable无效被移除,合并生成的新SSTable被加入到数据库中。VersionEdit提供AddFile、DeleteFile完成变更标识。

    VersionEdit提供的另外一个主要功能接口声明如下:

    1         void EncodeTo(std::string* dst) const;
    2         Status DecodeFrom(const Slice& src);

    这是一组序列化、反序列化方法,序列化文件为Manifest文件。

     1     void VersionEdit::EncodeTo(std::string* dst) const {
     2         //1. 序列化比较器
     3         if (has_comparator_) {
     4             PutVarint32(dst, kComparator);
     5             PutLengthPrefixedSlice(dst, comparator_);
     6         }
     7         //2. 序列化运行期编号信息
     8         if (has_log_number_) {
     9             PutVarint32(dst, kLogNumber);
    10             PutVarint64(dst, log_number_);
    11         }
    12         if (has_prev_log_number_) {
    13             PutVarint32(dst, kPrevLogNumber);
    14             PutVarint64(dst, prev_log_number_);
    15         }
    16         if (has_next_file_number_) {
    17             PutVarint32(dst, kNextFileNumber);
    18             PutVarint64(dst, next_file_number_);
    19         }
    20         if (has_last_sequence_) {
    21             PutVarint32(dst, kLastSequence);
    22             PutVarint64(dst, last_sequence_);
    23         }
    24         //3. 序列化Compact Pointer
    25         for (size_t i = 0; i < compact_pointers_.size(); i++) {
    26             PutVarint32(dst, kCompactPointer);
    27             PutVarint32(dst, compact_pointers_[i].first);  // level
    28             PutLengthPrefixedSlice(dst, compact_pointers_[i].second.Encode());
    29         }
    30 
    31         //4. 序列化本次版本变化的SSTable文件列表
    32         for (DeletedFileSet::const_iterator iter = deleted_files_.begin();
    33         iter != deleted_files_.end();
    34             ++iter) {
    35             PutVarint32(dst, kDeletedFile);
    36             PutVarint32(dst, iter->first);   // level
    37             PutVarint64(dst, iter->second);  // file number
    38         }
    39 
    40         for (size_t i = 0; i < new_files_.size(); i++) {
    41             const FileMetaData& f = new_files_[i].second;
    42             PutVarint32(dst, kNewFile);
    43             PutVarint32(dst, new_files_[i].first);  // level
    44             PutVarint64(dst, f.number);
    45             PutVarint64(dst, f.file_size);
    46             PutLengthPrefixedSlice(dst, f.smallest.Encode());
    47             PutLengthPrefixedSlice(dst, f.largest.Encode());
    48         }
    49     }

    回到最开始的问题:版本信息由什么用?

    1. 版本信息记录了运行期一组编号信息,该信息被序列化到Manifest文件中,当数据库再次打开时可恢复至上一次的运行状态。
    2. 版本信息记录了SSTable信息,包括每个文件所属的层级、大小、编号(名称等);Version类组提供了查询SSTable信息功能,如每层文件的列表、数量;同时数据库的Get方法中如需通过文件查找key值数据时,也由Version类组完成。最后,SSTable的缓存机制也有Version类组提供。
    3. 版本信息提供了Compaction支持。

    每个LevelDB有一个Current File,Current File内唯一的信息为:当前数据库的Manifest文件名。Manifest中包含了上次运行后全部的版本信息,LevelDB通过Manifest文件恢复版本信息。

    LevelDB的版本信息为富语义功能组,它所包含的信息已经大大超出了版本定义本身。如果将Version类封装为结构体、VersionSet仅仅为Version列表、VersionEdit也是单纯的结构数据,再为上述结构提供多套功能类应该更为合理。目前来看,这应当算作LevelDB实现的一处臭味。

  • 相关阅读:
    6.html5分组元素
    STL基础--算法(修改数据的算法)
    STL基础--算法(不修改数据的算法)
    STL基础--仿函数(函数对象)
    STL基础--迭代器和算法
    STL基础--容器
    STL基础--基本介绍
    C++11--Tuple类<tuple>
    C++11--随机数引擎和随机数分布<random>
    C++11--时钟和计时器<chrono>
  • 原文地址:https://www.cnblogs.com/desmondwang/p/4824290.html
Copyright © 2011-2022 走看看