zoukankan      html  css  js  c++  java
  • [levelDB] Compaction

    一、原理分析

    前文有述,对于LevelDb来说,写入记录操作很简单,删除记录仅仅写入一个删除标记就算完事,但是读取记录比较复杂,需要在内存以及各个层级文件中依照新鲜程度依次查找,代价很高。为了加快读取速度,levelDb采取了compaction的方式来对已有的记录进行整理压缩,通过这种方式,来删除掉一些不再有效的KV数据,减小数据规模,减少文件数量等。

         levelDb的compaction机制和过程与Bigtable所讲述的是基本一致的,Bigtable中讲到三种类型的compaction: minor ,major和full。所谓minor Compaction,就是把memtable中的数据导出到SSTable文件中;major compaction就是合并不同层级的SSTable文件,而full compaction就是将所有SSTable进行合并。

         LevelDb包含其中两种,minor和major。

        我们将为大家详细叙述其机理。

        先来看看minor Compaction的过程。Minor compaction 的目的是当内存中的memtable大小到了一定值时,将内容保存到磁盘文件中,图8.1是其机理示意图。 

    图8.1 minor compaction

         从8.1可以看出,当memtable数量到了一定程度会转换为immutable memtable,此时不能往其中写入记录,只能从中读取KV内容。之前介绍过,immutable memtable其实是一个多层级队列SkipList,其中的记录是根据key有序排列的。所以这个minor compaction实现起来也很简单,就是按照immutable memtable中记录由小到大遍历,并依次写入一个level 0 的新建SSTable文件中,写完后建立文件的index 数据,这样就完成了一次minor compaction。从图中也可以看出,对于被删除的记录,在minor compaction过程中并不真正删除这个记录,原因也很简单,这里只知道要删掉key记录,但是这个KV数据在哪里?那需要复杂的查找,所以在minor compaction的时候并不做删除,只是将这个key作为一个记录写入文件中,至于真正的删除操作,在以后更高层级的compaction中会去做。

         当某个level下的SSTable文件数目超过一定设置值后,levelDb会从这个level的SSTable中选择一个文件(level>0),将其和高一层级的level+1的SSTable文件合并,这就是major compaction。

        我们知道在大于0的层级中,每个SSTable文件内的Key都是由小到大有序存储的,而且不同文件之间的key范围(文件内最小key和最大key之间)不会有任何重叠。Level 0的SSTable文件有些特殊,尽管每个文件也是根据Key由小到大排列,但是因为level 0的文件是通过minor compaction直接生成的,所以任意两个level 0下的两个sstable文件可能再key范围上有重叠。所以在做major compaction的时候,对于大于level 0的层级,选择其中一个文件就行,但是对于level 0来说,指定某个文件后,本level中很可能有其他SSTable文件的key范围和这个文件有重叠,这种情况下,要找出所有有重叠的文件和level 1的文件进行合并,即level 0在进行文件选择的时候,可能会有多个文件参与major compaction。

      levelDb在选定某个level进行compaction后,还要选择是具体哪个文件要进行compaction,levelDb在这里有个小技巧, 就是说轮流来,比如这次是文件A进行compaction,那么下次就是在key range上紧挨着文件A的文件B进行compaction,这样每个文件都会有机会轮流和高层的level 文件进行合并。

    如果选好了level L的文件A和level L+1层的文件进行合并,那么问题又来了,应该选择level L+1哪些文件进行合并?levelDb选择L+1层中和文件A在key range上有重叠的所有文件来和文件A进行合并。

       也就是说,选定了level L的文件A,之后在level L+1中找到了所有需要合并的文件B,C,D…..等等。剩下的问题就是具体是如何进行major 合并的?就是说给定了一系列文件,每个文件内部是key有序的,如何对这些文件进行合并,使得新生成的文件仍然Key有序,同时抛掉哪些不再有价值的KV 数据。

        图8.2说明了这一过程。

    图8.2 SSTable Compaction

      Major compaction的过程如下:对多个文件采用多路归并排序的方式,依次找出其中最小的Key记录,也就是对多个文件中的所有记录重新进行排序。之后采取一定的标准判断这个Key是否还需要保存,如果判断没有保存价值,那么直接抛掉,如果觉得还需要继续保存,那么就将其写入level L+1层中新生成的一个SSTable文件中。就这样对KV数据一一处理,形成了一系列新的L+1层数据文件,之前的L层文件和L+1层参与compaction 的文件数据此时已经没有意义了,所以全部删除。这样就完成了L层和L+1层文件记录的合并过程。

      那么在major compaction过程中,判断一个KV记录是否抛弃的标准是什么呢?其中一个标准是:对于某个key来说,如果在小于L层中存在这个Key,那么这个KV在major compaction过程中可以抛掉。因为我们前面分析过,对于层级低于L的文件中如果存在同一Key的记录,那么说明对于Key来说,有更新鲜的Value存在,那么过去的Value就等于没有意义了,所以可以删除。

    二、源码分析

    代码流程简述:

    Compaction

    在leveldb中compaction主要包括Manual Compaction和Auto Compaction,在Auto Compaction中又包含了MemTable的Compaction和SSTable的Compaction。

    Manual Compaction

    leveldb中manual compaction是用户指定需要做compaction的key range,调用接口CompactRange来实现,它的主要流程为:

    1. 计算和Range有重合的MaxLevel
    2. 从level 0 到 MaxLevel依次在每层对这个Range做Compaction
    3. 做Compaction时会限制选择做Compaction文件的大小,这样可能每个level的CompactRange可能需要做多次Compaction才能完成
    SSTable Compaction
    1. 启动条件

      • 每个Level的文件大小或文件数超过了这个Level的限制(L0对比文件个数,其它Level对比文件大小。主要是因为L0文件之间可能重叠,文件过多影响读访问,而其它level文件不重叠,限制文件总大小,可以防止一次compaction IO过重)。
      • 含有被寻道次数超过一定阈值的文件(这个是指读请求查找可能去读多个文件,如果最开始读的那个文件未查找到,那么这个文件就被认为寻道一次,当文件的寻道次数达到一定数量时,就认为这个文件应该去做compaction)
      • 条件1的优先级高于条件2
    2. 触发条件

      • 任何改变了上面两个条件的操作,都会触发Compaction,即调用MaybeScheduleCompaction
      • 涉及到第一个条件改变,就是会改变某层文件的文件数目或大小,而只有Compaction操作之后才会改变这个条件
      • 涉及到第二个条件的改变,可能是读操作和scan操作(scan操作是每1M数据采样一次,获得读最后一个key所寻道的文件,1M数据的cost大约为一次寻道)
    3. 文件选取

      • 每个level都会记录上一次Compaction选取的文件所含Key的最大值,作为下次compaction选取文件的起点
      • 对于根据启动条件1所做的Compaction,选取文件就从上次的点开始选取,这样保证每层每个文件都会选取到
      • 对于根据启动条件2所做的Compaction,需要做compaction的文件本身就已经确定了
      • Level + 1层文件的选取,就是和level层选取的文件有重合的文件

      在leveldb中在L层会选取1个文件,理论上这个文件最多覆盖的文件数为12个(leveldb中默认一个文件最大为2M,每层的最大数据量按照10倍增长。这样L层的文件在未对齐的情况下最多覆盖L+1层的12个文件),这样可以控制一次Compaction的最大IO为(1+12)* 2M读IO,总的IO不会超过52M

    MemTable Compaction

    MemTable Compaction最重要的是产出的文件所在层次的选择,它必须满足如下条件: 假设最终选择层次L,那么文件必须和[0, L-1]所有层的文件都没有重合,且对L+1层文件的覆盖不能超过一定的阈值(保证Compaction IO可控)

    Compaction 文件产出
    1. 什么时候切换产出文件

      • 文件大小达到一定的阈值
      • 产出文件对Level+2层有交集的所有文件的大小超过一定阈值
    2. key丢弃的两个条件

      • last_sequence_for_key <= smallest_snapshot (有一个更新的同样的user_key比最小快照要小)
      • key_type == del && key <= smallest_snapshot && IsBaseLevelForKey(key的类型是删除,且这个key的版本比最小快照要小,并且在更高Level没有同样的user_key)

    了解了compaction的一些原理和机制以后我们该回到代码来看看具体的代码流程是怎么样的,首先回到DBimpl中的MakeRoomForWrite

     1 Status DBImpl::MakeRoomForWrite(bool force) {
     2   bool allow_delay = !force;
     3   Status s;
     4   while (true) {
     5     if (!bg_error_.ok()) {
     6       // Yield previous error
     7       s = bg_error_;
     8       break;
     9     } else if (
    10         allow_delay &&
    11         versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
    12           // 当L0的文件数量要达到阈值的时候,我们每次写入都延迟1ms,
    13            // 这样可以为后台的compaction腾出一定的cpu(当后台compaction
    14          //和当前线程是使用的一个内核的时候)这样可以降低写入延迟的方差
    15           //因为延迟被分摊到多个写上面,而不是在几个甚至一个写的时候
    16       env_->SleepForMicroseconds(1000);
    17       allow_delay = false; // 每次写只允许延迟一次
    18     } else if (!force &&  //当前mmetable的占用量未达到阈值
    19                (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
    20       break;
    21     } else if (imm_ != NULL) {
    22       // 上一次memtable的compaction尚未结束,等待后台compaction完成
    23        // 因为compaction的过程为 mem ->imm 完成后删除imm
    24       bg_cv_.Wait();
    25     } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
    26       // level 0的文件数量超过阈值,等待后台compaction完成
    27       bg_cv_.Wait();
    28     } else {
    29       // memtable达到阈值,新生成日志和memtable,并将原先的mem转化为imm给后台compact
    30       s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
    31     
    32       delete log_;
    33       delete logfile_;
    34       logfile_ = lfile;
    35       logfile_number_ = new_log_number;
    36       log_ = new log::Writer(lfile);
    37       imm_ = mem_;
    38       has_imm_.Release_Store(imm_);
    39       mem_ = new MemTable(internal_comparator_);
    40       mem_->Ref();
    41       force = false;             // Do not force another compaction if have room
    42       MaybeScheduleCompaction(); //触发后台compaction
    43     }
    44   }
    45   return s;
    46 }

    MaybeScheduleCompaction函数只是简单判断后台线程是否已经启动和一些其他的错误判断,如果未启动则启动后台compaction线程。这个compaction线程的实现在DBImpl::BackgroundCall,这个函数也只是简单的调用实现了compaction实际逻辑的函数BackgroundCompaction,我们这里就来仔细分析一下这个函数

    void DBImpl::BackgroundCompaction() {
      if (imm_ != NULL) { //有转化的memtable,直接将MemTable写入SSTable即返回
        CompactMemTable();
        return;
      }
      if (is_manual) { //用户主动(手动)触发的compaction
        ManualCompaction* m = manual_compaction_;
       //取得进项compact的输入文件生成compaction类
        c = versions_->CompactRange(m->level, m->begin, m->end);
        m->done = (c == NULL);
        if (c != NULL) {
         //取得level中最大的一个key
          manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
        }
      } else {
        c = versions_->PickCompaction();
      }
      if (c == NULL) {
      } else if (!is_manual && c->IsTrivialMove()) {
      //如果不是主动触发的,并且level中的输入文件与level+1中无重叠,且与level + 2中重叠不大于
      //kMaxGrandParentOverlapBytes = 10 * kTargetFileSize,直接将文件移到level+1中
        c->edit()->DeleteFile(c->level(), f->number);
        c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
                           f->smallest, f->largest);
        status = versions_->LogAndApply(c->edit(), &mutex_);    //写入version中,稍后分析
      } else {//否则调用DoCompactionWork进行Compact输入文件
        CompactionState* compact = new CompactionState(c);
        status = DoCompactionWork(compact);
        CleanupCompaction(compact);      //清理compact过程中的临时变量
        c->ReleaseInputs();               //清除输入文件描述符
        DeleteObsoleteFiles();            //删除无引用的文件
      }
      delete c;
      if (is_manual) {
        ManualCompaction* m = manual_compaction_;
        if (!status.ok()) {//如果compaction出错,也将手动的compaction标记为done
          m->done = true;
        }
        if (!m->done) {//如果没有完成也仅仅记录基本状态,感觉manual的形式未实现完整逻辑
          m->tmp_storage = manual_end;
          m->begin = &m->tmp_storage;
        }
        manual_compaction_ = NULL;
      }
    }

    leveldb满足其内部一些阈值条件后触发的compaction是如何选择输入文件的呢?这个逻辑在中,下面我们来仔细的分析一下

    Compaction* VersionSet::PickCompaction() {
     //每次compact完成在VersionSet::Finalize中计算每个level中TotalFileSize / MaxBytesForLevel
     // 的值,并且将最大的值最为compaction_score_ ,和compaction_level_
      const bool size_compaction = (current_->compaction_score_ >= 1);  
      //对于每个SSTable会有一个 允许seek的次数 (f->file_size / 16384)超过这么多次会将其设置为
      const bool seek_compaction = (current_->file_to_compact_ != NULL);
      // 这两种可能导致的compaction中,我们优先compact第一种情况的
      if (size_compaction) {
        level = current_->compaction_level_;
        c = new Compaction(level);
        // 查找第一个包含比上次已经compact的最大key大的key的文件
        for (size_t i = 0; i < current_->files_[level].size(); i++) {
          if (compact_pointer_[level].empty() ||
              icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
            c->inputs_[0].push_back(f);
            break;
          }
        }
        if (c->inputs_[0].empty()) {
          // 如果上次已经是最大的key,那么回到第一个文件开始compact
          c->inputs_[0].push_back(current_->files_[level][0]);
        }
      } else if (seek_compaction) {//如果是查找导致的,直接将导致compact的文件加入inputs_[0]
        level = current_->file_to_compact_level_;
        c = new Compaction(level);
        c->inputs_[0].push_back(current_->file_to_compact_);
      } else {
        return NULL;
      }
      c->input_version_ = current_;
      c->input_version_->Ref();
      // 如果是level 0 则还需查找level 0中其他和输入文件重叠的文件
      if (level == 0) {
        GetRange(c->inputs_[0], &smallest, &largest);
        current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
      }
      SetupOtherInputs(c); //尝试加入level中新的文件,条件为不再与level+1中新的文件重叠,这个函数已经分析
      return c;
    }

    选择好了需要进行Compaction的的文件以后,就该调用实际的Compaction过程了,我们来分析其逻辑,过程比较长但是只要仔细细心的阅读,其处理的逻辑并不复杂,主要是遍历所有输入文件,然后将相同的可以进行合并,以及删除一些无用的delete操作等。

      1 Status DBImpl::DoCompactionWork(CompactionState* compact) {
      2     //将snapshot相关的内容记录到compact信息中
      3   if (snapshots_.empty()) {
      4     compact->smallest_snapshot = versions_->LastSequence();
      5   } else {
      6     compact->smallest_snapshot = snapshots_.oldest()->number_;
      7   }
      8   //遍历所有inputs文件
      9   Iterator* input = versions_->MakeInputIterator(compact->compaction);
     10   for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
     11     // 每次都判断如果有memtable 需要compact,先compact memtable
     12     if (has_imm_.NoBarrier_Load() != NULL) {
     13       if (imm_ != NULL) { 
     14         CompactMemTable();
     15         bg_cv_.SignalAll(); // Wakeup 等待空间的线程
     16       }
     17     }
     18     Slice key = input->key();
     19     if (compact->compaction->ShouldStopBefore(key) &&
     20         compact->builder != NULL) { //当前(level +1)生成的文件和level + 2中有过多的重叠
     21       status = FinishCompactionOutputFile(compact, input); //写当前文件到磁盘
     22       if (!status.ok()) {
     23         break;
     24       }
     25     }
     26     // Handle key/value, add to state, etc.
     27     bool drop = false;
     28     if (!ParseInternalKey(key, &ikey)) {
     29       // 解码错误,清除之前的状态
     30       current_user_key.clear();
     31       has_current_user_key = false;
     32       last_sequence_for_key = kMaxSequenceNumber;
     33     } else {
     34       if (!has_current_user_key ||
     35           user_comparator()->Compare(ikey.user_key,
     36                                      Slice(current_user_key)) != 0) {
     37         // 第一次出现的key,将seq设置为最大标记新key开始
     38         current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
     39         has_current_user_key = true;
     40         last_sequence_for_key = kMaxSequenceNumber;
     41       }
     42         //因为第一次出现会将last seq设置为最大,表示上一个key的关于seq的比较结束
     43       if (last_sequence_for_key <= compact->smallest_snapshot) {
     44         // Hidden by an newer entry for same user key
     45         drop = true; // (A)
     46       } else if (ikey.type == kTypeDeletion &&  
     47                  ikey.sequence <= compact->smallest_snapshot &&               //无snapshot引用
     48                  compact->compaction->IsBaseLevelForKey(ikey.user_key)) { //(1)
     49         // For this user key:
     50         // (1) there is no data in higher levels
     51         // 而我们知道在底层的文件中seq会更大,正在被compact的相同的key会稍后标记这个为删除(ruleA)
     52         drop = true;
     53       }
     54       last_sequence_for_key = ikey.sequence; 
     55     }
     56 
     57     if (!drop) {
     58       // 第一次进入compact或者上次文件刚刚写到磁盘,新建一个文件和table_builder
     59       if (compact->builder == NULL) {
     60         status = OpenCompactionOutputFile(compact);
     61         if (!status.ok()) {
     62           break;
     63         }
     64       }
     65     //新文件,记录当前key 为 整个文件的smallest
     66       if (compact->builder->NumEntries() == 0) {
     67         compact->current_output()->smallest.DecodeFrom(key);
     68       }
     69       //每遍历到一个就将其记录为largest
     70       compact->current_output()->largest.DecodeFrom(key);
     71       compact->builder->Add(key, input->value());
     72       // 超过level的阈值大小,将文件写到磁盘
     73       if (compact->builder->FileSize() >= compact->compaction->MaxOutputFileSize()) {
     74         status = FinishCompactionOutputFile(compact, input);
     75         if (!status.ok()) {
     76           break;
     77         }
     78       }
     79     }
     80     input->Next();
     81   }
     82 //判断状态和将未写到磁盘的数据写入磁盘
     83   if (status.ok() && shutting_down_.Acquire_Load()) {
     84     status = Status::IOError("Deleting DB during compaction");
     85   }
     86   if (status.ok() && compact->builder != NULL) {
     87     status = FinishCompactionOutputFile(compact, input);
     88   }
     89   if (status.ok()) {
     90     status = input->status();
     91   }
     92   delete input;
     93   input = NULL;
     94   CompactionStats stats;
     95   stats.micros = env_->NowMicros() - start_micros - imm_micros;
     96   for (int which = 0; which < 2; which++) {//计算本次Compaction读入文件的总大小
     97     for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
     98       stats.bytes_read += compact->compaction->input(which, i)->file_size;
     99     }
    100   }
    101   for (size_t i = 0; i < compact->outputs.size(); i++) {
    102     stats.bytes_written += compact->outputs[i].file_size;
    103   }//本次Compaction写出文件的总大小
    104   mutex_.Lock();
    105   stats_[compact->compaction->level() + 1].Add(stats);
    106   if (status.ok()) {//记录统计信息以及将Compaction导致的文件变动记录到versionedit中
    107     status = InstallCompactionResults(compact);
    108   }
    109   return status;
    110 }

    SSTable的Compaction就分析完了,关于Compaction还剩下MemTable的Compaction,或者也可以将其说明为Memtable的dump为SSTable。再分析完上面的SSTable Compaction后你就发现MemTable的Compaction是如此之简单了,我们简单罗列一下

     1 void DBImpl::CompactMemTable() {
     2   Status s = WriteLevel0Table(imm_, &edit, base);
     3   // Replace immutable memtable with the generated Table
     4   if (s.ok()) {
     5     edit.SetPrevLogNumber(0);
     6     edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
     7     s = versions_->LogAndApply(&edit, &mutex_);
     8   }
     9   if (s.ok()) {
    10     // Commit to the new state
    11     imm_->Unref();
    12     imm_ = NULL;
    13     has_imm_.Release_Store(NULL);
    14     DeleteObsoleteFiles();
    15   } else {
    16     RecordBackgroundError(s);
    17   }
    18 }

    这个逻辑中就一个主要的函数WriteLevel0Table,其流程如下:

     1 Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base) { 
     2   meta.number = versions_->NewFileNumber();
     3   pending_outputs_.insert(meta.number);
     4   Iterator* iter = mem->NewIterator();
     5   //新生成一个Table_builder负责写文件
     6     s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
     7 
     8   // Note that if file_size is zero, the file has been deleted and 
     9   // should not be added to the manifest.
    10   int level = 0;
    11   if (s.ok() && meta.file_size > 0) {
    12     const Slice min_user_key = meta.smallest.user_key();
    13     const Slice max_user_key = meta.largest.user_key();
    14     if (base != NULL) {
    15       /* 找到一个当层未overlap 且上册overlap 不会过多(kMaxGrandParentOverlapBytes)的层返回*/ 
    16       level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
    17     }
    18     //将文件加到versionedit中
    19     edit->AddFile(level, meta.number, meta.file_size,
    20                   meta.smallest, meta.largest);
    21   }
    22   CompactionStats stats;
    23   stats.micros = env_->NowMicros() - start_micros;
    24   stats.bytes_written = meta.file_size;
    25   stats_[level].Add(stats);
    26   return s;
    27 }

    这里有一个唯一需要注意的是——将Memtable dump到磁盘以后并不是如文档描述的“将新的SSTable加到level 0中.”,而是会用一个函数PickLevelForMemTableOutput选择一个最高的可以将这个SSTable放入的level中。一般来说会是level 0,但是还是存在一些特殊情况可以将其放到更高的level中,这样可以降低Compaction的频率。PickLevelForMemTableOutput的逻辑简单,请读者自行阅读。

    至此comaction流程相关的函数就分析完了,本节内容比较多,但是只要静下心来慢慢品读理解还是不难的。至此leveldb中剩下的还有recover,new (新建一个数据库)、snapshot、get相关的代码没有分析了。我们在compaction的分析过程中涉及到了很多有关version的类、方法、结构,leveldb的vesion是整个系统极其重要的一环,而且recovery,snapshot,get在一定程度上都会依赖于version的实现,所以接下来的文章准备对version相关的内容进行介绍。

  • 相关阅读:
    bzoj4537: [Hnoi2016]最小公倍数
    bzoj4331: JSOI2012 越狱老虎桥
    bzoj4558: [JLoi2016]方
    bzoj4209: 西瓜王
    bzoj2653: middle
    bzoj4671: 异或图
    bzoj4771: 七彩树
    shell java应用启动脚本(app.sh)
    Springboot 构建http服务,返回的http行是'HTTP/1.1 200' 无状态码描述 客户端解析错误
    MariaDB(Mysql)-主从搭建
  • 原文地址:https://www.cnblogs.com/ym65536/p/10995048.html
Copyright © 2011-2022 走看看