  • HBase Source Code Series: HFile

    This article discusses the v2 HFile format in the 0.98 version of HBase. I actually came to a reasonably deep overall understanding of HFile when things suddenly clicked while I was checking "whether a single record can span blocks".

    First, an intuitive impression of HFile: a very important point of its overall design is keeping memory usage low during the write. Blocks can be written out one after another in order; each time a chunk fills up, its metadata (including the Bloom filter) is written; and finally the HFile-level metadata is written (a rough layout sketch follows the list below). For HFile, I think the key is to get a handle on a few questions.

    1. How blocks are organized
    2. The relationship between Bloom filters and blocks
    3. The relationship between the index and blocks
    4. The write order and the basic metadata structures
    5. Whether a record can span blocks

    Once these five questions are clear, you can roughly sketch out what an HFile looks like.
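    For orientation, here is my own simplified sketch of the on-disk layout that this write order produces (hedged; the HFile v2 design documentation is the authoritative reference):

    data block ... data block
    [inline] Bloom chunk + leaf index block      (repeated each time a chunk fills up)
    data block ... data block
    [inline] Bloom chunk + leaf index block
    ...
    meta blocks (optional) and intermediate index levels
    root data index, meta index, file info, Bloom metadata   (load-on-open section)
    fixed file trailer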

    HFileWriterV2

    First, the operations that cause an HFile to be written are flush and compaction. Here we follow the flush entry point.

    During a flush, the following method (in StoreFlusher) is what writes the Store's data out to an HFile.

    long org.apache.hadoop.hbase.regionserver.StoreFlusher.performFlush(InternalScanner scanner, CellSink sink, long smallestReadPoint) throws IOException
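    Roughly speaking, performFlush drains the memstore snapshot scanner in batches and hands every cell to the sink, which is the StoreFile.Writer building the HFile. Below is a simplified sketch of that loop, not the verbatim 0.98 source (the real code also clears the mvcc/memstoreTS of cells no newer than smallestReadPoint); it assumes the 0.98 classes InternalScanner, CellSink and KeyValueUtil.

    List<Cell> kvs = new ArrayList<Cell>();
    boolean hasMore;
    do {
      hasMore = scanner.next(kvs);                     // next batch of cells from the snapshot
      for (Cell c : kvs) {
        sink.append(KeyValueUtil.ensureKeyValue(c));   // ends up in the append method below
      }
      kvs.clear();
    } while (hasMore);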

    This method ends up calling the following append method on the sink (StoreFile.Writer):

    public void append(final KeyValue kv) throws IOException {
      appendGeneralBloomfilter(kv);
      appendDeleteFamilyBloomFilter(kv);
      // this line is the key part
      writer.append(kv);
      // ignore this line for now; it tracks the timestamp range
      trackTimestamps(kv);
    }

    The append method breaks down as follows:

    // Check that the key is well-formed and in sorted order (the memstore uses a
    // ConcurrentSkipListMap, so ordering should not be a problem here).
    // Also returns whether the key duplicates the previous one.
    boolean dupKey = checkKey(key, koffset, klength);
    checkValue(value, voffset, vlength);
    // The block boundary is only checked for non-duplicate keys. This answers
    // question 5: a record cannot span blocks, because a new block is only ever
    // started at a key boundary; a duplicate key never triggers the boundary
    // check and so never starts a new block. ***Question 5***
    if (!dupKey) {
      // This checks the current block's size. Note that some code inside records
      // block index information, which becomes useful later.
      // This is also where ready Bloom chunks (readyChunks) are written out.
      checkBlockBoundary();
    }

    The index-recording code mentioned in the comment above is:

    byte[] indexKey = comparator.calcIndexKey(lastKeyOfPreviousBlock, firstKeyInBlock);
    dataBlockIndexWriter.addEntry(indexKey, lastDataBlockOffset, onDiskSize);

    The rest of append is ordinary data writing (appending to the output stream) plus metadata bookkeeping such as firstKeyInBlock.
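    For context, checkBlockBoundary ties these pieces together roughly as follows. This is a paraphrased sketch of the 0.98 HFileWriterV2 code from memory, simplified rather than verbatim:

    private void checkBlockBoundary() throws IOException {
      if (fsBlockWriter.blockSizeWritten() < blockSize) {
        return;                  // the current data block still has room
      }
      finishBlock();             // close the data block and add its index entry
                                 // (the dataBlockIndexWriter.addEntry call above)
      writeInlineBlocks(false);  // flush any ready Bloom chunks / leaf index blocks
      newBlock();                // start a fresh data block
    }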

    Back in appendGeneralBloomfilter(kv), one check inside it is worth noting:

    // This checks how many keys are in the current chunk; once the count reaches the
    // per-chunk limit, the chunk is moved into readyChunks (written out later via
    // checkBlockBoundary) and a new chunk is started. This is where the Bloom
    // filter gets built, chunk by chunk. ***Question 2***
    enqueueReadyChunk(false);
    ... handling for when a chunk has just been written out; reset some state ...
    // actually add the key to the current chunk's Bloom filter
    chunk.add(bloomKey, keyOffset, keyLength);

    Inside enqueueReadyChunk(false) we have:

    ReadyChunk readyChunk = new ReadyChunk();
    readyChunk.chunkId = numChunks - 1;
    readyChunk.chunk = chunk;
    readyChunk.firstKey = firstKeyInChunk;
    readyChunks.add(readyChunk);
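    Putting these two snippets together, the chunking policy is easy to state. The following is a self-contained toy illustration of that policy (my own sketch, not HBase code; the class and field names are made up): keys go into the current chunk, a full chunk is queued together with its first key (which later becomes its entry in the Bloom index), and a partially filled chunk is only queued when the writer is closing.

    import java.util.ArrayDeque;
    import java.util.Deque;

    class ChunkingSketch {
      static class Chunk {
        byte[] firstKey;   // remembered as firstKeyInChunk
        int keyCount;
      }

      private final int maxKeysPerChunk;
      private final Deque<Chunk> readyChunks = new ArrayDeque<Chunk>();
      private Chunk chunk;

      ChunkingSketch(int maxKeysPerChunk) {
        this.maxKeysPerChunk = maxKeysPerChunk;
      }

      // mirrors add(): queue the chunk once it is full, then add to a fresh one
      void add(byte[] bloomKey) {
        if (chunk != null && chunk.keyCount >= maxKeysPerChunk) {
          readyChunks.add(chunk);
          chunk = null;
        }
        if (chunk == null) {
          chunk = new Chunk();
          chunk.firstKey = bloomKey;
        }
        chunk.keyCount++;          // stands in for chunk.add(bloomKey, ...)
      }

      // mirrors shouldWriteBlock(): on close, a partially filled chunk is queued too
      boolean shouldWriteBlock(boolean closing) {
        if (closing && chunk != null && chunk.keyCount > 0) {
          readyChunks.add(chunk);
          chunk = null;
        }
        return !readyChunks.isEmpty();
      }
    }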

    From there we quickly arrive at the close step.

    // This organizes the blocks: the chunks accumulated for this HFile are
    // assembled into a tree structure.
    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);

    Block organization happens at two levels: the blocks within a chunk are organized together (they live together and share one Bloom filter), and then the root index and intermediate index organize things one level up, which really feels more like organizing the chunks themselves. A rough picture of the resulting tree is sketched below; the inline, per-chunk part is handled by writeInlineBlocks, shown after the sketch.
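    My own simplified picture of the index structure (hedged):

    root index block (load-on-open section, referenced from the trailer)
      -> intermediate index blocks, only present when there are many leaf blocks
         -> leaf index blocks, written inline, one per chunk of data blocks
            -> data blocks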

    private void writeInlineBlocks(boolean closing) throws IOException {
      // inlineBlockWriters should hold three writers: two Bloom filter writers
      // and one block index writer (to be confirmed)
      for (InlineBlockWriter ibw : inlineBlockWriters) {
        while (ibw.shouldWriteBlock(closing)) {
          long offset = outputStream.getPos();
          boolean cacheThisBlock = ibw.getCacheOnWrite();
          ibw.writeInlineBlock(fsBlockWriter.startWriting(
              ibw.getInlineBlockType()));
          fsBlockWriter.writeHeaderAndData(outputStream);
          // this is where the leaf index block (or Bloom chunk) is recorded
          ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
              fsBlockWriter.getUncompressedSizeWithoutHeader());
          totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
          if (cacheThisBlock) {
            doCacheOnWrite(offset);
          }
        }
      }
    }

    The check in ibw.shouldWriteBlock(closing) is shown below; effectively it just asks whether there are any ready chunks.

    public boolean shouldWriteBlock(boolean closing) {
      enqueueReadyChunk(closing);
      // readyChunks holds the completed chunks, i.e. what becomes the leaf-level inline blocks
      return !readyChunks.isEmpty();
    }

    Next, the Bloom meta index is written; as far as I can tell it is just the index over those Bloom chunks.

    bloomBlockIndexWriter.writeSingleLevelIndex(out, "Bloom filter");

    Some of the metadata (the various offsets and how the index tree is generated) has not been analyzed here; I will cover it another time. For now, a rough sketch of the overall close-time write order follows.
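    This is a paraphrased, simplified sketch of the order of operations in HFileWriterV2.close() as I understand the 0.98 code, not a verbatim copy:

    finishBlock();              // write out the last data block
    writeInlineBlocks(true);    // flush remaining Bloom chunks and leaf index blocks
    // 1. data block index: the leaf blocks are already inline; this writes the
    //    intermediate levels (if any) and the root index, returning its offset
    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
    // 2. meta block index (single level) and the file info block
    // 3. Bloom metadata written by the additional load-on-open writers
    //    (general Bloom meta index and delete-family Bloom meta)
    // 4. finally the fixed file trailer, which records the offsets gathered above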

    HFileReaderV2

    From the code analysis above, the main thing the read path has to decide is whether to read a given block at all. Once a block has been chosen for reading there is not much left to analyze in this article, because the rest belongs to the retrieval flow (combining the memstore and the store files). In short, the reader:

    1. Reads the block index and Bloom filter information
    2. Uses these two indexes to filter out blocks

    The main HFileReader-related methods are the ones that obtain and open the reader; they run when scanners are created and then filtered during a read.

    In List HStore.getScanners(boolean cacheBlocks, boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt), the following code obtains the scanners for the files in this store.

    List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt);

    That method in turn calls:

    // this method ends up calling the open method
    StoreFile.Reader r = file.createReader(canUseDrop);

    It then calls the open method, shown below.

    if (this.reader != null) {
      throw new IllegalAccessError("Already open");
    }
    // Open the StoreFile.Reader
    this.reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind);
    // Load up indices and fileinfo. This also loads Bloom filter type.
    metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
    // Read in our metadata.
    byte [] b = metadataMap.get(MAX_SEQ_ID_KEY);
    if (b != null) {
      // By convention, if halfhfile, top half has a sequence number > bottom
      // half. Thats why we add one in below. Its done for case the two halves
      // are ever merged back together --rare. Without it, on open of store,
      // since store files are distinguished by sequence id, the one half would
      // subsume the other.
      this.sequenceid = Bytes.toLong(b);
      if (fileInfo.isTopReference()) {
        this.sequenceid += 1;
      }
    }
    if (isBulkLoadResult()){
      // generate the sequenceId from the fileName
      // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
      String fileName = this.getPath().getName();
      // Use lastIndexOf() to get the last, most recent bulk load seqId.
      int startPos = fileName.lastIndexOf("SeqId_");
      if (startPos != -1) {
        this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
            fileName.indexOf('_', startPos + 6)));
        // Handle reference files as done above.
        if (fileInfo.isTopReference()) {
          this.sequenceid += 1;
        }
      }
      this.reader.setBulkLoaded(true);
    }
    this.reader.setSequenceID(this.sequenceid);
    b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
    if (b != null) {
      this.maxMemstoreTS = Bytes.toLong(b);
    }
    b = metadataMap.get(MAJOR_COMPACTION_KEY);
    if (b != null) {
      boolean mc = Bytes.toBoolean(b);
      if (this.majorCompaction == null) {
        this.majorCompaction = new AtomicBoolean(mc);
      } else {
        this.majorCompaction.set(mc);
      }
    } else {
      // Presume it is not major compacted if it doesn't explicity say so
      // HFileOutputFormat explicitly sets the major compacted key.
      this.majorCompaction = new AtomicBoolean(false);
    }
    b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
    this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));
    // the general Bloom filter is loaded here
    BloomType hfileBloomType = reader.getBloomFilterType();
    if (cfBloomType != BloomType.NONE) {
      reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
      if (hfileBloomType != cfBloomType) {
        LOG.info("HFile Bloom filter type for "
            + reader.getHFileReader().getName() + ": " + hfileBloomType
            + ", but " + cfBloomType + " specified in column family "
            + "configuration");
      }
    } else if (hfileBloomType != BloomType.NONE) {
      LOG.info("Bloom filter turned off by CF config for "
          + reader.getHFileReader().getName());
    }
    // load delete family bloom filter
    reader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
    try {
      this.reader.timeRange = TimeRangeTracker.getTimeRange(metadataMap.get(TIMERANGE_KEY));
    } catch (IllegalArgumentException e) {
      LOG.error("Error reading timestamp range data from meta -- " +
          "proceeding without", e);
      this.reader.timeRange = null;
    }
    return this.reader;

    When deciding whether a file needs to be read at all, the Bloom filter is used by the following call inside boolean org.apache.hadoop.hbase.regionserver.StoreFileScanner.shouldUseScanner(Scan scan, SortedSet columns, long oldestUnexpiredTS).

    // Bloom filter based filtering happens here. This method calls bloomFilter.contains,
    // and contains first consults the block index over the Bloom chunks.
    reader.passesBloomFilter(scan, columns)

    Inside, it calls a contains method:

    // Decide which Bloom chunk to read; the blockKeys array searched by
    // rootBlockContainingKey has one entry per chunk.
    // The index itself was read from the Bloom meta, obtained via
    // DataInput bloomMeta = reader.getGeneralBloomFilterMetadata();
    int block = index.rootBlockContainingKey(key, keyOffset, keyLength);
    if (block < 0) {
      result = false; // This key is not in the file.
    } else {
      HFileBlock bloomBlock;
      try {
        // We cache the block and use a positional read.
        // read that chunk's Bloom filter block
        bloomBlock = reader.readBlock(index.getRootBlockOffset(block),
            index.getRootBlockDataSize(block), true, true, false, true,
            BlockType.BLOOM_CHUNK);
      } catch (IOException ex) {
        // The Bloom filter is broken, turn it off.
        throw new IllegalArgumentException(
            "Failed to load Bloom block for key "
            + Bytes.toStringBinary(key, keyOffset, keyLength), ex);
      }
      ByteBuffer bloomBuf = bloomBlock.getBufferReadOnly();
      result = ByteBloomFilter.contains(key, keyOffset, keyLength,
          bloomBuf.array(), bloomBuf.arrayOffset() + bloomBlock.headerSize(),
          bloomBlock.getUncompressedSizeWithoutHeader(), hash, hashCount);
    }

    The block index itself is used for filtering in the following method (my impression is that this happens at seekTo, when a scan specifies a start row key, which would make the logic sensible; reader.passesBloomFilter also checks whether the request is a scan).

    BlockWithScanInfo org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexReader.loadDataBlockWithScanInfo(byte[] key, int keyOffset, int keyLength, HFileBlock currentBlock, boolean cacheBlocks, boolean pread, boolean isCompaction) throws IOException

    The CompoundBloomFilter constructor is what reads the block index data.
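    For completeness, here is a paraphrased sketch of what that constructor does, based on my recollection of the 0.98 code; treat the field names and read order as approximate rather than authoritative.

    DataInput bloomMeta = reader.getGeneralBloomFilterMetadata();
    // scalar Bloom parameters; this ordering follows my recollection of
    // CompoundBloomFilter and may not match the real source exactly
    long totalByteSize = bloomMeta.readLong();
    int hashCount = bloomMeta.readInt();
    int hashType = bloomMeta.readInt();
    long totalKeyCount = bloomMeta.readLong();
    long totalMaxKeys = bloomMeta.readLong();
    int numChunks = bloomMeta.readInt();
    // then load the single-level root index over the Bloom chunks; this is the
    // index that rootBlockContainingKey searches in the contains code above
    HFileBlockIndex.BlockIndexReader index =
        new HFileBlockIndex.BlockIndexReader(comparator, 1);
    index.readRootIndex(bloomMeta, numChunks);   // one (firstKey, offset, onDiskSize) entry per chunk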
