zoukankan      html  css  js  c++  java
  • 股票数据存储系统(KeyValue存储)设计与实现

      HBase中的HFile采用了与本文类似的设计,数据分块存储,块的索引存放在文件末尾。

      Kafka中每个topic划分为多个partition存储,partition又划分为多个segment;每个segment由索引文件index和数据文件log组成,index中每个索引块的大小为8Bytes,每个索引块包含offset和position,position指明某条message在log文件中的偏移量。

      Redis的内部映射数据结构ziplist其实做了相同的事情,有两个差别:1、ziplist将数据写入内存,这里是写入文件;2、ziplist没有索引区,每个数据块记录自己的大小以及前一个数据块的大小。

      ----------------------------------------------------------------------------------------------------------------

      该系统用来存储每日股票交易数据,数据按日期分块线性存储,日期看做是每块数据的Key,总体结构如下。文件头中记录版本号等信息,最重要的一个信息是已存数据块数,每次增加、删除数据后需更新。

      索引区存储数据块的索引信息,每个索引有如下三个属性:数据大小、日期和起始位置。当打开文件加载到内存时,索引区会被首先加载;查找某日数据时,根据日期对内存中的索引做一次二分查找,得到数据块在文件中的起始位置和数据大小(终止位置),通过一次硬盘查找就能读取该块数据。

      0.总体设计

     1 class CXFSXFile {
     2 
     3 public:
     4     CXFSXFile();
     5     virtual ~CXFSXFile();
     6      
     7     bool openFile(const string& path);
     8     
     9     bool get(vector<char>& contents, const int date);
    10     
    11     bool remove(const int date);
    12     
    13     bool insert(const char* s, const int size, const int date);
    14 
    15     int howManyData();    
    16 
    17 private:   
    18     void createFile();
    19     void loadFile();
    20 
    21     bool ifDataExist(const int date);
    22     void updateFileHeader();
    23     void updateIndexFeild();    
    24 
    25     bool append(const char* s, const int size, const int date);
    26 
    27     ... ...

    31 int m_numberOfBlocksUsed; 32 33 static const int M_SizeOfIndexItem; 34 static const int M_StartPosForIndex; 35 static const int M_StartPosForData; 36 37 struct SIndexItem; 38 39 string m_filePath; 40 fstream m_xfsxFile; 41 42 vector<SIndexItem> m_arrayOfIndex; 43 44 friend bool compareIndexItem(const SIndexItem& a, const SIndexItem& b); 45 friend class CModifyPos; 46 47 };

      1.功能代码

      索引块实现:

     1 struct CXFSXFile::SIndexItem{
     2     SIndexItem(const int sizeOfBlock, const int date, const int startPos):_sizeOfBlock(sizeOfBlock), _date(date), _startPos(startPos)
     3     {}
     4 
     5     SIndexItem() {
     6         memset(this, 0, sizeof(SIndexItem)); }
     7 
     8     int _sizeOfBlock;
     9     int _date;
    10     int _startPos;
    11 };

      索引块比较(按时间) &  索引更新(内存中):

     1 bool compareIndexItem(const CXFSXFile::SIndexItem& a, const CXFSXFile::SIndexItem& b) {
     2     return (a._date < b._date);
     3 }
     4 
     5 class CModifyPos {
     6 public:
     7     CModifyPos(int n):posOffset(n) {}
     8 
     9     void operator() (CXFSXFile::SIndexItem& item) {
    10     item._startPos += posOffset;
    11     }
    12 
    13 private:
    14     int posOffset;
    15 };

      2.初始化代码 & 更新代码

      初始化代码主要负责提取文件头及索引块中的有用信息;更新代码在每次添加、删除数据后更新文件头及索引。

      更新索引(文件中):

     1 void CXFSXFile::updateIndexFeild() {
     2 
     3     for(int i=0; i<m_arrayOfIndex.size(); i++)
     4     {
     5         int theSize = m_arrayOfIndex[i]._sizeOfBlock;
     6         int theDate = m_arrayOfIndex[i]._date;
     7         int thePos = m_arrayOfIndex[i]._startPos;
     8 
     9         m_xfsxFile.seekp(M_StartPosForIndex + M_SizeOfIndexItem * i);
    10         m_xfsxFile.write((char*)(&theSize), sizeof(int));
    11 
    12         m_xfsxFile.seekp(M_StartPosForIndex + M_SizeOfIndexItem * i + 4);
    13         m_xfsxFile.write((char*)(&theDate), sizeof(int));
    14 
    15         m_xfsxFile.seekp(M_StartPosForIndex + M_SizeOfIndexItem * i + 8);
    16         m_xfsxFile.write((char*)(&thePos), sizeof(int));
    17     }
    18 
    19     m_xfsxFile.flush();
    20 }

      3.主要接口实现

      包含了数据的读取、添加和删除;数据添加的逻辑稍复杂,其中包含了添加到末尾(append)、覆盖、中间插入等不同情形。

      判断一数据块是否存在(以时间为Key做二分查找):

    1 bool CXFSXFile::ifDataExist(const int date) {
    2 
    3     if(m_numberOfBlocksUsed == 0)
    4         return false;
    5 
    6     SIndexItem tep(0, date, 0);
    7 
    8     return binary_search(m_arrayOfIndex.begin(), m_arrayOfIndex.end(), tep, compareIndexItem);
    9 }

      读取数据:

     1 bool CXFSXFile::get(vector<char>& contents, const int date) {
     2     contents.clear();
     3     if(!ifDataExist(date))
     4         return false;
     5 
     6     SIndexItem tep(0, date, 0);
     7     vector<SIndexItem>::iterator itr;
     8     itr = lower_bound(m_arrayOfIndex.begin(), m_arrayOfIndex.end(), tep, compareIndexItem);
     9 
    10     int pos = (*itr)._startPos;
    11     int size = (*itr)._sizeOfBlock;
    12 
    13     contents.resize(size, '0');
    14     m_xfsxFile.seekg(pos);
    15     m_xfsxFile.read((char*)&contents[0], size);
    16 
    17     return true;
    18 }
  • 相关阅读:
    MySQL查询出错提示 --secure-file-priv解决方法
    导入csv文件到mysql
    Ubuntu16.04配置phpmyadmin
    TensorFlow安装(Ubuntu 16.04)
    github设置只识别指定类型的文件
    python file operations
    Ubuntu16.04 install flash plugin
    PRML读书笔记——3 Linear Models for Regression
    Ubuntu下查看机器信息
    PRML读书笔记——2 Probability Distributions
  • 原文地址:https://www.cnblogs.com/crazychris/p/2658397.html
Copyright © 2011-2022 走看看