zoukankan      html  css  js  c++  java
  • LevelDB的源码阅读(二) Open操作

    Linux上leveldb的安装和使用中我们写了一个测试代码,内容如下:

    #include "leveldb/db.h"
    #include <cassert>
    #include <iostream>
    
    using namespace std;
    using namespace leveldb;
    
    int main() {
        leveldb::DB *db;
        leveldb::Options options;
        options.create_if_missing = true;
        leveldb::Status status = leveldb::DB::Open(options, "testdb", &db);
        assert(status.ok());
    
        status = db->Put(WriteOptions(), "test", "Hello World!");
        assert(status.ok());
        string res;
        status = db->Get(ReadOptions(), "test", &res);
        assert(status.ok());
        cout << res << endl;
    
        delete db;
        return 0;
    }

    其中db.h中定义了leveldb对外接口,定义了class DB,这个类只是一个接口类,leveldb::DB::Open操作来自leveldb源代码db文件夹下db_impl.cc文件,源码内容如下:

    Status DB::Open(const Options& options, const std::string& dbname,
                    DB** dbptr) {
      *dbptr = NULL;
    
      DBImpl* impl = new DBImpl(options, dbname);
      impl->mutex_.Lock();
      VersionEdit edit;
      // Recover handles create_if_missing, error_if_exists
      bool save_manifest = false;
      Status s = impl->Recover(&edit, &save_manifest);
      if (s.ok() && impl->mem_ == NULL) {
        // Create new log and a corresponding memtable.
        uint64_t new_log_number = impl->versions_->NewFileNumber();
        WritableFile* lfile;
        s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
                                         &lfile);
        if (s.ok()) {
          edit.SetLogNumber(new_log_number);
          impl->logfile_ = lfile;
          impl->logfile_number_ = new_log_number;
          impl->log_ = new log::Writer(lfile);
          impl->mem_ = new MemTable(impl->internal_comparator_);
          impl->mem_->Ref();
        }
      }
      if (s.ok() && save_manifest) {
        edit.SetPrevLogNumber(0);  // No older logs needed after recovery.
        edit.SetLogNumber(impl->logfile_number_);
        s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
      }
      if (s.ok()) {
        impl->DeleteObsoleteFiles();
        impl->MaybeScheduleCompaction();
      }
      impl->mutex_.Unlock();
      if (s.ok()) {
        assert(impl->mem_ != NULL);
        *dbptr = impl;
      } else {
        delete impl;
      }
      return s;
    }

    DB::Open函数创建的是一个DBImpl类,具体的操作由DBImpl类来处理.DBImpl构造函数如下:

    DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
        : env_(raw_options.env), // Env* const
          internal_comparator_(raw_options.comparator), // const InternalKeyComparator
          internal_filter_policy_(raw_options.filter_policy), // const InternalFilterPolicy
          options_(SanitizeOptions(dbname, &internal_comparator_, // const Options
                                   &internal_filter_policy_, raw_options)),
          owns_info_log_(options_.info_log != raw_options.info_log), // bool
          owns_cache_(options_.block_cache != raw_options.block_cache), // bool
          dbname_(dbname), // const std::string
          db_lock_(NULL), // FileLock*
          shutting_down_(NULL), // port::AtomicPointer
          bg_cv_(&mutex_), // port::CondVar
          mem_(NULL), // MemTable*
          imm_(NULL), // MemTable*
          logfile_(NULL), // WritableFile*
          logfile_number_(0), // uint64_t
          log_(NULL), // log::Writer*
          seed_(0), // uint32_t
          tmp_batch_(new WriteBatch), // WriteBatch*
          bg_compaction_scheduled_(false), // bool
          manual_compaction_(NULL) { // ManualCompaction*
      has_imm_.Release_Store(NULL);
    
      // Reserve ten files or so for other uses and give the rest to TableCache.
      const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles;
      table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
    
      versions_ = new VersionSet(dbname_, &options_, table_cache_,
                                 &internal_comparator_);
    }

    以下是成员变量的含义:

    • env_, 负责所有IO, 比如建立文件
    • internal_comparator_, 用来比较不同key的大小
    • internal_filter_policy_, 可自定义BloomFilter
    • options_, 对调用者传入的options进行调整
    • db_lock_, 文件锁
    • shutting_down_, 基于memory barrier的原子指针
    • bg_cv_, 多线程的条件
    • mem_ = memtable
    • imm = immemtable
    • tmp_batch_, 所有Put都是以batch写入, 这里建立临时的batch
    • manual_compaction_, 内部开发者调用时的参数, 可以不用理会
    • has_imm_, 用于判断是否有等待或者正在写入硬盘的immemtable
    • table_cache_, SSTable查询缓存
    • versions_, 数据库MVCC

    接下来说一下memory barrier的问题.

    程序经由编译器处理后变成了一条条的机器指令,每条指令又对应了更基础的几个硬件阶段,各个指令在这些硬件阶段里排队处理,组成流水线.由于不同指令对应的硬件阶段不同,导致有些时候流水线填不满,影响性能.因此硬件会主动的对指令顺序做微调,提升流水线的效率,也就是乱序执行.乱序执行是有规则的,在单线程下几乎不需要软件介入,但多线程就不一样了.比如说我们的程序这么写:

    thread1:
    1: some_task = 123;
    2: complete = true;
    
    thread2:
      while (!complete) sleep(1);
      printf("got %d
    ", task_result);

    程序本意是等任务完成后输出 got 123,但结果很可能是 got 0,原因就是 1 和 2 两条指令被调整了顺序,乱序不当.为解决这样的问题,cpu 增加了用于控制乱序执行的指令,称为内存栅栏(memory barrier).在 1、2 之间插入栅栏就会强制 cpu 不做乱序,从而保证程序的正确性.

    在leveldb中port文件夹下atomic_pointer.h里出现了memory barrier.

    首先是memory barrier的定义问题,leveldb提供了不同环境下的多种定义方式:

    // Define MemoryBarrier() if available
    // Windows on x86
    #if defined(OS_WIN) && defined(COMPILER_MSVC) && defined(ARCH_CPU_X86_FAMILY)
    // windows.h already provides a MemoryBarrier(void) macro
    // http://msdn.microsoft.com/en-us/library/ms684208(v=vs.85).aspx
    #define LEVELDB_HAVE_MEMORY_BARRIER
    
    // Mac OS
    #elif defined(__APPLE__)
    inline void MemoryBarrier() {
      OSMemoryBarrier();
    }
    #define LEVELDB_HAVE_MEMORY_BARRIER
    
    // Gcc on x86
    #elif defined(ARCH_CPU_X86_FAMILY) && defined(__GNUC__)
    inline void MemoryBarrier() {
      // See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on
      // this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering.
      __asm__ __volatile__("" : : : "memory");
    }
    #define LEVELDB_HAVE_MEMORY_BARRIER
    
    // Sun Studio
    #elif defined(ARCH_CPU_X86_FAMILY) && defined(__SUNPRO_CC)
    inline void MemoryBarrier() {
      // See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on
      // this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering.
      asm volatile("" : : : "memory");
    }
    #define LEVELDB_HAVE_MEMORY_BARRIER
    
    // ARM Linux
    #elif defined(ARCH_CPU_ARM_FAMILY) && defined(__linux__)
    typedef void (*LinuxKernelMemoryBarrierFunc)(void);
    // The Linux ARM kernel provides a highly optimized device-specific memory
    // barrier function at a fixed memory address that is mapped in every
    // user-level process.
    //
    // This beats using CPU-specific instructions which are, on single-core
    // devices, un-necessary and very costly (e.g. ARMv7-A "dmb" takes more
    // than 180ns on a Cortex-A8 like the one on a Nexus One). Benchmarking
    // shows that the extra function call cost is completely negligible on
    // multi-core devices.
    //
    inline void MemoryBarrier() {
      (*(LinuxKernelMemoryBarrierFunc)0xffff0fa0)();
    }
    #define LEVELDB_HAVE_MEMORY_BARRIER
    
    // ARM64
    #elif defined(ARCH_CPU_ARM64_FAMILY)
    inline void MemoryBarrier() {
      asm volatile("dmb sy" : : : "memory");
    }
    #define LEVELDB_HAVE_MEMORY_BARRIER
    
    // PPC
    #elif defined(ARCH_CPU_PPC_FAMILY) && defined(__GNUC__)
    inline void MemoryBarrier() {
      // TODO for some powerpc expert: is there a cheaper suitable variant?
      // Perhaps by having separate barriers for acquire and release ops.
      asm volatile("sync" : : : "memory");
    }
    #define LEVELDB_HAVE_MEMORY_BARRIER
    
    // MIPS
    #elif defined(ARCH_CPU_MIPS_FAMILY) && defined(__GNUC__)
    inline void MemoryBarrier() {
      __asm__ __volatile__("sync" : : : "memory");
    }
    #define LEVELDB_HAVE_MEMORY_BARRIER
    
    #endif

     例如,对于X86内存模型而言,__asm__ __volatile__(" : : : "memory")的意思就是告诉编译器,memory在执行这端inline 汇编代码之后,已经失效了.也就是告诉编译器,不要在这个地方优化有关访问内存的指令.

    在atomic_pointer.h中 memory barrier的使用方式如下:

    // AtomicPointer built using platform-specific MemoryBarrier()
    #if defined(LEVELDB_HAVE_MEMORY_BARRIER)
    class AtomicPointer {
     private:
      void* rep_;
     public:
      AtomicPointer() { }
      explicit AtomicPointer(void* p) : rep_(p) {}
      inline void* NoBarrier_Load() const { return rep_; }
      inline void NoBarrier_Store(void* v) { rep_ = v; }
      inline void* Acquire_Load() const {
        void* result = rep_;
        MemoryBarrier();
        return result;
      }
      inline void Release_Store(void* v) {
        MemoryBarrier();
        rep_ = v;
      }
    };

    DBImpl中的has_imm_就是上面描述的atomic pointer.

    再次回到db_impl.cc文件,读完:DB::Open的源码:

    Status DB::Open(const Options& options, const std::string& dbname,
                    DB** dbptr) { 
      *dbptr = NULL; // 设置结果默认值, 指针传值
      DBImpl* impl = new DBImpl(options, dbname);
      impl->mutex_.Lock(); // 数据恢复时上锁, 禁止所有可能的后台任务
      VersionEdit edit;
      // Recover handles create_if_missing, error_if_exists
      bool save_manifest = false;
      Status s = impl->Recover(&edit, &save_manifest); // //调用DBImpl的恢复数据接口,读取元数据,恢复日志数据
      if (s.ok() && impl->mem_ == NULL) {
        // Create new log and a corresponding memtable. 复位
        uint64_t new_log_number = impl->versions_->NewFileNumber();
        WritableFile* lfile;
        //创建新的写操作日志文件
        s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
                                         &lfile);
        if (s.ok()) {
          edit.SetLogNumber(new_log_number);
          impl->logfile_ = lfile;
          impl->logfile_number_ = new_log_number;
          impl->log_ = new log::Writer(lfile);
          impl->mem_ = new MemTable(impl->internal_comparator_);
          impl->mem_->Ref();
        }
      }
      if (s.ok() && save_manifest) {
        edit.SetPrevLogNumber(0);  // No older logs needed after recovery.
        edit.SetLogNumber(impl->logfile_number_);
        s = impl->versions_->LogAndApply(&edit, &impl->mutex_); //添加VersionEdit,初始化时会将现在的VersionSet的状态写入新的manifest文件,并更新Current文件
      }
      if (s.ok()) {
        impl->DeleteObsoleteFiles(); // 清理无用文件
        impl->MaybeScheduleCompaction(); // 有写入就有可能要compact
      }
      impl->mutex_.Unlock(); // 初始化完毕
      if (s.ok()) {
        assert(impl->mem_ != NULL);
        *dbptr = impl;
      } else {
        delete impl;
      }
      return s;
    }

    参考文献:

    1.https://zhuanlan.zhihu.com/jimderestaurant?topic=LevelDB

    2.https://www.zhihu.com/question/24301047

    3.https://www.zhihu.com/question/49039919

    4.http://blog.csdn.net/joeyon1985/article/details/47154249

  • 相关阅读:
    abstract关键字
    final关键字
    Vue使用枚举类型实现HTML下拉框
    第八节 pandas读取和保存文件
    第七节 pandas新建数据框的两种方式
    第六节 numpy的常用属性和方法
    第五节 numpy的简单使用
    第三节 matplotlib绘制直方图
    第三节 matplotlib绘制条形图
    第二节 matplotlib绘制散点图
  • 原文地址:https://www.cnblogs.com/xueqiuqiu/p/8289046.html
Copyright © 2011-2022 走看看