zoukankan      html  css  js  c++  java
  • LevelDB场景分析4--BackgroundCompaction

    1.DBImpl::Open

     1 Status DB::Open(const Options& options, const std::string& dbname,
     2                 DB** dbptr) {
     3   *dbptr = NULL;
     4 
     5   DBImpl* impl = new DBImpl(options, dbname);
     6   impl->mutex_.Lock();
     7   VersionEdit edit;
     8   Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
     9   if (s.ok()) {
    10     uint64_t new_log_number = impl->versions_->NewFileNumber();
    11     WritableFile* lfile;
    12     s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
    13                                      &lfile);
    14     if (s.ok()) {
    15       edit.SetLogNumber(new_log_number);
    16       impl->logfile_ = lfile;
    17       impl->logfile_number_ = new_log_number;
    18       impl->log_ = new log::Writer(lfile);
    19       s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
    20     }
    21     if (s.ok()) {
    22       impl->DeleteObsoleteFiles();
    23       impl->MaybeScheduleCompaction();
    24     }
    25   }
    26   impl->mutex_.Unlock();
    27   if (s.ok()) {
    28     *dbptr = impl;
    29   } else {
    30     delete impl;
    31   }
    32   return s;
    33 }

    2.DBImpl::Get

     1 Status DBImpl::Get(const ReadOptions& options,
     2                    const Slice& key,
     3                    std::string* value) {
     4   Status s;
     5   MutexLock l(&mutex_);
     6   SequenceNumber snapshot;
     7   if (options.snapshot != NULL) {
     8     snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
     9   } else {
    10     snapshot = versions_->LastSequence();
    11   }
    12 
    13   MemTable* mem = mem_;
    14   MemTable* imm = imm_;
    15   Version* current = versions_->current();
    16   mem->Ref();
    17   if (imm != NULL) imm->Ref();
    18   current->Ref();
    19 
    20   bool have_stat_update = false;
    21   Version::GetStats stats;
    22 
    23   // Unlock while reading from files and memtables
    24   {
    25     mutex_.Unlock();
    26     // First look in the memtable, then in the immutable memtable (if any).
    27     LookupKey lkey(key, snapshot);
    28     if (mem->Get(lkey, value, &s)) {
    29       // Done
    30     } else if (imm != NULL && imm->Get(lkey, value, &s)) {
    31       // Done
    32     } else {
    33       s = current->Get(options, lkey, value, &stats);
    34       have_stat_update = true;
    35     }
    36     mutex_.Lock();
    37   }
    38 
    39   if (have_stat_update && current->UpdateStats(stats)) {
    40     MaybeScheduleCompaction();
    41   }
    42   mem->Unref();
    43   if (imm != NULL) imm->Unref();
    44   current->Unref();
    45   return s;
    46 }

     

    3.DBImpl::RecordReadSample

    1 void DBImpl::RecordReadSample(Slice key) {
    2   MutexLock l(&mutex_);
    3   if (versions_->current()->RecordReadSample(key)) {
    4     MaybeScheduleCompaction();
    5   }
    6 }

    4.DBImpl::MakeRoomForWrite

     1 Status DBImpl::MakeRoomForWrite(bool force) {
     2   mutex_.AssertHeld();
     3   assert(!writers_.empty());
     4   bool allow_delay = !force;
     5   Status s;
     6   while (true) {
     7     if (!bg_error_.ok()) {
     8       // Yield previous error
     9       s = bg_error_;
    10       break;
    11     } else if (
    12         allow_delay &&
    13         versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
    14       // We are getting close to hitting a hard limit on the number of
    15       // L0 files.  Rather than delaying a single write by several
    16       // seconds when we hit the hard limit, start delaying each
    17       // individual write by 1ms to reduce latency variance.  Also,
    18       // this delay hands over some CPU to the compaction thread in
    19       // case it is sharing the same core as the writer.
    20       mutex_.Unlock();
    21       env_->SleepForMicroseconds(1000);
    22       allow_delay = false;  // Do not delay a single write more than once
    23       mutex_.Lock();
    24     } else if (!force &&
    25                (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
    26       // There is room in current memtable
    27       break;
    28     } else if (imm_ != NULL) {
    29       // We have filled up the current memtable, but the previous
    30       // one is still being compacted, so we wait.
    31       Log(options_.info_log, "Current memtable full; waiting... ");
    32       bg_cv_.Wait();
    33     } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
    34       // There are too many level-0 files.
    35       Log(options_.info_log, "Too many L0 files; waiting... ");
    36       bg_cv_.Wait();
    37     } else {
    38       // Attempt to switch to a new memtable and trigger compaction of old
    39       assert(versions_->PrevLogNumber() == 0);
    40       uint64_t new_log_number = versions_->NewFileNumber();
    41       WritableFile* lfile = NULL;
    42       s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
    43       if (!s.ok()) {
    44         // Avoid chewing through file number space in a tight loop.
    45         versions_->ReuseFileNumber(new_log_number);
    46         break;
    47       }
    48       delete log_;
    49       delete logfile_;
    50       logfile_ = lfile;
    51       logfile_number_ = new_log_number;
    52       log_ = new log::Writer(lfile);
    53       imm_ = mem_;
    54       has_imm_.Release_Store(imm_);
    55       mem_ = new MemTable(internal_comparator_);
    56       mem_->Ref();
    57       force = false;   // Do not force another compaction if have room
    58       MaybeScheduleCompaction();
    59     }
    60   }
    61   return s;
    62 }

    1-4均会调用MaybeScheduleCompaction()从而调用BackgroundCompaction来完成compact。

    以下是核心Compact的过程 

     BackgroundCompaction

     1 void DBImpl::BackgroundCompaction() {
     2   mutex_.AssertHeld();
     3 
     4   if (imm_ != NULL) {
     5     CompactMemTable();
     6     return;
     7   }
     8 
     9   Compaction* c;
    10   bool is_manual = (manual_compaction_ != NULL); // 正常情况下为false,因为初始化时为空
    11   InternalKey manual_end;
    12   if (is_manual) {
    13     ManualCompaction* m = manual_compaction_;
    14     c = versions_->CompactRange(m->level, m->begin, m->end);
    15     m->done = (c == NULL);
    16     if (c != NULL) {
    17       manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
    18     }
    19     Log(options_.info_log,
    20         "Manual compaction at level-%d from %s .. %s; will stop at %s ",
    21         m->level,
    22         (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
    23         (m->end ? m->end->DebugString().c_str() : "(end)"),
    24         (m->done ? "(end)" : manual_end.DebugString().c_str()));
    25   } else {
    26     c = versions_->PickCompaction(); // 找出应该合并的 level 及 level + 1层的FileMetaData*
    27   }
    28 
    29   Status status;
    30   if (c == NULL) {
    31     // Nothing to do
    32   } else if (!is_manual && c->IsTrivialMove()) {
    33     // Move file to next level
    34     assert(c->num_input_files(0) == 1);
    35     FileMetaData* f = c->input(00);
    36     c->edit()->DeleteFile(c->level(), f->number);
    37     c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
    38                        f->smallest, f->largest);
    39     status = versions_->LogAndApply(c->edit(), &mutex_);
    40     if (!status.ok()) {
    41       RecordBackgroundError(status);
    42     }
    43     VersionSet::LevelSummaryStorage tmp;
    44     Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s ",
    45         static_cast<unsigned long long>(f->number),
    46         c->level() + 1,
    47         static_cast<unsigned long long>(f->file_size),
    48         status.ToString().c_str(),
    49         versions_->LevelSummary(&tmp));
    50   } else {
    51     CompactionState* compact = new CompactionState(c);
    52     status = DoCompactionWork(compact); // 核心Compact
    53     if (!status.ok()) {
    54       RecordBackgroundError(status);
    55     }
    56     CleanupCompaction(compact);
    57     c->ReleaseInputs();
    58     DeleteObsoleteFiles();
    59   }
    60   delete c;
    61 
    62   if (status.ok()) {
    63     // Done
    64   } else if (shutting_down_.Acquire_Load()) {
    65     // Ignore compaction errors found during shutting down
    66   } else {
    67     Log(options_.info_log,
    68         "Compaction error: %s", status.ToString().c_str());
    69   }
    70 
    71   if (is_manual) {
    72     ManualCompaction* m = manual_compaction_;
    73     if (!status.ok()) {
    74       m->done = true;
    75     }
    76     if (!m->done) {
    77       // We only compacted part of the requested range.  Update *m
    78       // to the range that is left to be compacted.
    79       m->tmp_storage = manual_end;
    80       m->begin = &m->tmp_storage;
    81     }
    82     manual_compaction_ = NULL;
    83   }
    84 }
  • 相关阅读:
    ubuntu安装gradle
    ubuntu文件查找
    接口服务flask的负载均衡部署
    区块链节点网络的nginx转发部署
    typescript检查包之间是否有循环依赖
    shiro对事务的影响
    防止xss(脚本攻击)的方法之过滤器
    Mysql批量更新速度慢的解决方案
    springBoot的三种启动方式
    mybatis的注解开发之三种动态sql
  • 原文地址:https://www.cnblogs.com/onlyforcloud/p/4494608.html
Copyright © 2011-2022 走看看