zoukankan      html  css  js  c++  java
  • Rocksdb iterator和snapshot 接口

    Rocksdb提供迭代器来来访问整个db中的数据,就像STL中的迭代器功能一样,用来访问容器中的具体的数据。

    访问形式以及访问接口有如下几种:

    • 遍历所有的key-value
      //打开db,并初始化一个迭代器指针
      rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
      for (it->SeekToFirst(); it->Valid(); it->Next()) {
          cout << it->key().ToString() << ": " << it->value().ToString() << endl;
      }
      assert(it->status().ok()); // Check for any errors found during the scan
      delete it;
      
    • 输出一个范围内的key-value,[small, big)
      for (it->Seek(small);
         it->Valid() && it->key().ToString() < big;
         it->Next()) {
      ...
      }
      assert(it->status().ok()); // Check for any errors found during the scan
      
    • 反向遍历db中的元素
      for (it->SeekToLast(); it->Valid(); it->Prev()) {
      ...
      }
      assert(it->status().ok()); // Check for any errors found during the scan
      
    • 反向遍历一个指定范围的key,如(small, big]
      for (it->SeekForPrev(start);
         it->Valid() && it->key().ToString() > limit;
         it->Prev()) {
      ...
      }
      assert(it->status().ok()); // Check for any errors found during the scan
      

    迭代器的接口可以算是 rocksdb针对客户端的核心接口,主要是提供排序以及高效查找的功能。

    测试代码如下:

    #include <iostream>
    #include <string>
    #include <rocksdb/db.h>
    #include <rocksdb/iterator.h>
    #include <rocksdb/table.h>
    #include <rocksdb/options.h>
    #include <rocksdb/env.h>
    
    using namespace std;
    
    
    static string rand_key(unsigned long long key_range) {
        char buff[30];
        unsigned long long n = 1;
    
        for (int i =1; i <= 4; ++i) {
            n *= (unsigned long long ) rand();
        }
    
        sprintf(buff, "%llu", n % key_range);
    
        string k(buff);
        return k;
    }
    
    int main() {
        rocksdb::DB *db;
        rocksdb::Options option;
    
        option.create_if_missing = true;
        option.compression = rocksdb::CompressionType::kNoCompression;
    
        rocksdb::Status s = rocksdb::DB::Open(option, "./iterator_db", &db);
        if (!s.ok()) {
            cout << "Open failed with " << s.ToString() << endl;
            exit(1);
        }
    
        rocksdb::DestroyDB("./iterator_db", option);
    
    	cout << "seek all keys : " << endl;
        for(int i = 0; i < 5; i ++) {
            rocksdb::Status s = db->Put(rocksdb::WriteOptions(), 
                                    rand_key(9), string(10, 'a' + (i % 26)) );
    
            if (!s.ok()) {
                cout << "Put failed with " << s.ToString() << endl;
                exit(1);
            }
        }   
        
        /* traverse rocksdb key-value */
        rocksdb::Iterator *it = db->NewIterator(rocksdb::ReadOptions());
        for (it->SeekToFirst(); it->Valid(); it->Next()) {
            cout << it->key().ToString() << ": " << it->value().ToString() << endl;
        }
    
        string limit="4";
        string start="2";
        cout << "seek from '2' to '4' : " << endl;
        for(it->Seek(start); it->Valid()&&it->key().ToString() < limit;
            it->Next()) {
                cout << it->key().ToString() << ": " << it->value().ToString() << endl;
            } 
        assert(it->status().ok());
    
        cout << "seek from last to start :" << endl;
        for (it->SeekToLast(); it->Valid(); it->Prev()) {
            cout << it->key().ToString() << ": " << it->value().ToString() << endl;
        }
        assert(it->status().ok());
    
        cout << "seek from '4' to '2' :" << endl;
        for(it->SeekForPrev(limit); it->Valid()&&it->key().ToString() > start;
            it->Prev()) {
                cout << it->key().ToString() << ": " << it->value().ToString() << endl;
            } 
        assert(it->status().ok());
            delete it;
    
        db->Close();
        delete db;
    
        return 0;
    }
    

    输出如下:

    seek all keys : 
    3: cccccccccc
    4: dddddddddd
    7: bbbbbbbbbb
    8: eeeeeeeeee
    seek from '2' to '4' : 
    3: cccccccccc
    seek from last to start :
    8: eeeeeeeeee
    7: bbbbbbbbbb
    4: dddddddddd
    3: cccccccccc
    seek from '4' to '2' :
    4: dddddddddd
    3: cccccccccc
    

    且上层使用rocksdb迭代器接口时一般会和snapshot接口一同使用,用来实现MVCC的版本控制功能。
    关于snapshot的实现,我们在Rocksdb事务:隔离性的实现中有提到,感兴趣的可以看看。

    关于snapshot的客户端接口主要有:

    • sp1 = db->GetSnapshot(); 在当前db状态下创建一个snapshot,添加到内部维护的一个全局的snapshotImpl的双向链表中,并返回该snapshot的对象
    • read_option.snapshot = sp1; 将获取到的snapshot 传给read_option,进行Get操作
    • db->ReleaseSnapshot(sp1); 释放snapshot相关的资源(从双向链表中删除该节点)

    隔离性的测试代码如下:

    #include <iostream>
    #include <string>
    #include <rocksdb/db.h>
    #include <rocksdb/iterator.h>
    #include <rocksdb/table.h>
    #include <rocksdb/options.h>
    #include <rocksdb/env.h>
    
    using namespace std;
    
    int main() {
        rocksdb::DB *db;
        rocksdb::Options option;
    
        option.create_if_missing = true;
        option.compression = rocksdb::CompressionType::kNoCompression;
    
        rocksdb::Status s = rocksdb::DB::Open(option, "./iterator_db", &db);
        if (!s.ok()) {
            cout << "Open failed with " << s.ToString() << endl;
            exit(1);
        }
        // set a snapshot before put
        const rocksdb::Snapshot *sp1 = db->GetSnapshot(); 
    
        s = db->Put(rocksdb::WriteOptions(), "sp2", "value_sp2");
        assert(s.ok());
    
    	// set a snapshot after put
        const rocksdb::Snapshot *sp2 = db->GetSnapshot();
    
        rocksdb::ReadOptions read_option;
        read_option.snapshot = sp1;
        string value = "";
        //预期获取不到sp2的value,因为这里用的是sp1的快照
        s = db->Get(read_option, "sp2", &value); 
        if(value == "") {
            cout << "Can't get sp2 at sp1!" << endl;
        }
    
        read_option.snapshot = sp2;
        // 能够获取到,使用的是sp2的快照,其是在put之后设置的
        s = db->Get(read_option, "sp2", &value); 
        assert(s.ok());
        if(value != "") {
            cout << "Got sp2's value: " << value << endl;
        }
    
        db->ReleaseSnapshot(sp1);
        db->ReleaseSnapshot(sp2);
    

    输出如下:

    Can't get sp2 at sp1!
    Got sp2's value: value_sp2
    

    当然rocksdb也提供了更为复杂的mvcc特性,来以事务的方式支持不同的隔离级别。

    Rocksdb提供迭代器来来访问整个db中的数据,就像STL中的迭代器功能一样,用来访问容器中的具体的数据。

    访问形式以及访问接口有如下几种:

    • 遍历所有的key-value
      //打开db,并初始化一个迭代器指针
      rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
      for (it->SeekToFirst(); it->Valid(); it->Next()) {
          cout << it->key().ToString() << ": " << it->value().ToString() << endl;
      }
      assert(it->status().ok()); // Check for any errors found during the scan
      delete it;
      
    • 输出一个范围内的key-value,[small, big)
      for (it->Seek(small);
         it->Valid() && it->key().ToString() < big;
         it->Next()) {
      ...
      }
      assert(it->status().ok()); // Check for any errors found during the scan
      
    • 反向遍历db中的元素
      for (it->SeekToLast(); it->Valid(); it->Prev()) {
      ...
      }
      assert(it->status().ok()); // Check for any errors found during the scan
      
    • 反向遍历一个指定范围的key,如(small, big]
      for (it->SeekForPrev(start);
         it->Valid() && it->key().ToString() > limit;
         it->Prev()) {
      ...
      }
      assert(it->status().ok()); // Check for any errors found during the scan
      

    迭代器的接口可以算是 rocksdb针对客户端的核心接口,主要是提供排序以及高效查找的功能。

    测试代码如下:

    #include <iostream>
    #include <string>
    #include <rocksdb/db.h>
    #include <rocksdb/iterator.h>
    #include <rocksdb/table.h>
    #include <rocksdb/options.h>
    #include <rocksdb/env.h>
    
    using namespace std;
    
    
    static string rand_key(unsigned long long key_range) {
        char buff[30];
        unsigned long long n = 1;
    
        for (int i =1; i <= 4; ++i) {
            n *= (unsigned long long ) rand();
        }
    
        sprintf(buff, "%llu", n % key_range);
    
        string k(buff);
        return k;
    }
    
    int main() {
        rocksdb::DB *db;
        rocksdb::Options option;
    
        option.create_if_missing = true;
        option.compression = rocksdb::CompressionType::kNoCompression;
    
        rocksdb::Status s = rocksdb::DB::Open(option, "./iterator_db", &db);
        if (!s.ok()) {
            cout << "Open failed with " << s.ToString() << endl;
            exit(1);
        }
    
        rocksdb::DestroyDB("./iterator_db", option);
    
    	cout << "seek all keys : " << endl;
        for(int i = 0; i < 5; i ++) {
            rocksdb::Status s = db->Put(rocksdb::WriteOptions(), 
                                    rand_key(9), string(10, 'a' + (i % 26)) );
    
            if (!s.ok()) {
                cout << "Put failed with " << s.ToString() << endl;
                exit(1);
            }
        }   
        
        /* traverse rocksdb key-value */
        rocksdb::Iterator *it = db->NewIterator(rocksdb::ReadOptions());
        for (it->SeekToFirst(); it->Valid(); it->Next()) {
            cout << it->key().ToString() << ": " << it->value().ToString() << endl;
        }
    
        string limit="4";
        string start="2";
        cout << "seek from '2' to '4' : " << endl;
        for(it->Seek(start); it->Valid()&&it->key().ToString() < limit;
            it->Next()) {
                cout << it->key().ToString() << ": " << it->value().ToString() << endl;
            } 
        assert(it->status().ok());
    
        cout << "seek from last to start :" << endl;
        for (it->SeekToLast(); it->Valid(); it->Prev()) {
            cout << it->key().ToString() << ": " << it->value().ToString() << endl;
        }
        assert(it->status().ok());
    
        cout << "seek from '4' to '2' :" << endl;
        for(it->SeekForPrev(limit); it->Valid()&&it->key().ToString() > start;
            it->Prev()) {
                cout << it->key().ToString() << ": " << it->value().ToString() << endl;
            } 
        assert(it->status().ok());
            delete it;
    
        db->Close();
        delete db;
    
        return 0;
    }
    

    输出如下:

    seek all keys : 
    3: cccccccccc
    4: dddddddddd
    7: bbbbbbbbbb
    8: eeeeeeeeee
    seek from '2' to '4' : 
    3: cccccccccc
    seek from last to start :
    8: eeeeeeeeee
    7: bbbbbbbbbb
    4: dddddddddd
    3: cccccccccc
    seek from '4' to '2' :
    4: dddddddddd
    3: cccccccccc
    

    且上层使用rocksdb迭代器接口时一般会和snapshot接口一同使用,用来实现MVCC的版本控制功能。
    关于snapshot的实现,我们在Rocksdb事务:隔离性的实现中有提到,感兴趣的可以看看。

    关于snapshot的客户端接口主要有:

    • sp1 = db->GetSnapshot(); 在当前db状态下创建一个snapshot,添加到内部维护的一个全局的snapshotImpl的双向链表中,并返回该snapshot的对象
    • read_option.snapshot = sp1; 将获取到的snapshot 传给read_option,进行Get操作
    • db->ReleaseSnapshot(sp1); 释放snapshot相关的资源(从双向链表中删除该节点)

    隔离性的测试代码如下:

    #include <iostream>
    #include <string>
    #include <rocksdb/db.h>
    #include <rocksdb/iterator.h>
    #include <rocksdb/table.h>
    #include <rocksdb/options.h>
    #include <rocksdb/env.h>
    
    using namespace std;
    
    int main() {
        rocksdb::DB *db;
        rocksdb::Options option;
    
        option.create_if_missing = true;
        option.compression = rocksdb::CompressionType::kNoCompression;
    
        rocksdb::Status s = rocksdb::DB::Open(option, "./iterator_db", &db);
        if (!s.ok()) {
            cout << "Open failed with " << s.ToString() << endl;
            exit(1);
        }
        // set a snapshot before put
        const rocksdb::Snapshot *sp1 = db->GetSnapshot(); 
    
        s = db->Put(rocksdb::WriteOptions(), "sp2", "value_sp2");
        assert(s.ok());
    
    	// set a snapshot after put
        const rocksdb::Snapshot *sp2 = db->GetSnapshot();
    
        rocksdb::ReadOptions read_option;
        read_option.snapshot = sp1;
        string value = "";
        //预期获取不到sp2的value,因为这里用的是sp1的快照
        s = db->Get(read_option, "sp2", &value); 
        if(value == "") {
            cout << "Can't get sp2 at sp1!" << endl;
        }
    
        read_option.snapshot = sp2;
        // 能够获取到,使用的是sp2的快照,其是在put之后设置的
        s = db->Get(read_option, "sp2", &value); 
        assert(s.ok());
        if(value != "") {
            cout << "Got sp2's value: " << value << endl;
        }
    
        db->ReleaseSnapshot(sp1);
        db->ReleaseSnapshot(sp2);
    

    输出如下:

    Can't get sp2 at sp1!
    Got sp2's value: value_sp2
    

    当然rocksdb也提供了更为复杂的mvcc特性,来以事务的方式支持不同的隔离级别。

  • 相关阅读:
    C# 中的委托和事件
    SQLserver2000与2005同时安装的问题
    又到毕业时
    WCF服务发布和调用IIS服务
    进销存取项目总结
    URL
    undefined reference to `android::Mutex::lock()'
    关于 ffmpeg ‘UINT64_C’ was not declared in this scope 的错误
    Ti 的 OMX_Core
    linux Perforce 使用
  • 原文地址:https://www.cnblogs.com/xueqiuqiu/p/14950074.html
Copyright © 2011-2022 走看看