本文讨论0.98版本的hbase里v2版本。其实对于HFile能有一个大体的较深入理解是在我去查看”到底是不是一条记录不能垮block“的时候突然意识到的。
首先说一个对HFile很直观的感觉,我觉得HFile的整个设计中很重要的一点是为减少内容占用。首先写时候可以把一个个block按顺序写入,满足一个chunk写入一个元数据(包括bloomfilter),最后是一些HFile的元数据。对于HFile,我个人觉得主要把握好几个问题。
- block的组织
- bf和block的关系
- index和block的关系
- 写入顺序和一些基本的元数据信息结构
- 记录能不能跨block
明白这四个问题感觉基本可以大致的描绘出HFile了。
HFileWriterV2
首先,我们知道会引起下HFile的操作有flush和compaction。在此,我们就选择从flush这个入口跟进去看。
在StoreFile中,以下方法主要是为了Store书写到一个HFile中。
1
| long org.apache.hadoop.hbase.regionserver.StoreFlusher.performFlush(InternalScanner scanner, CellSink sink, long smallestReadPoint) throws IOException
|
在此方法会调用如下方法
1 2 3 4 5 6 7 8 9
| public void (final KeyValue kv) throws IOException { appendGeneralBloomfilter(kv); appendDeleteFamilyBloomFilter(kv); writer.append(kv); trackTimestamps(kv); }
|
以下分解append方法
1 2 3 4 5 6 7 8 9 10
| boolean dupKey = checkKey(key, koffset, klength); checkValue(value, voffset, vlength); if (!dupKey) { checkBlockBoundary(); }
|
上面注释中说的那个代码如下
1 2
| byte[] indexKey = comparator.calcIndexKey(lastKeyOfPreviousBlock, firstKeyInBlock); dataBlockIndexWriter.addEntry(indexKey, lastDataBlockOffset, onDiskSize);
|
append下面是一些很正常的数据写入(都是对stream的添加操作),元数据记录(firstKeyInBlock)等。
回到appendGeneralBloomfilter(kv)方法,此方法里面有一个判断是值得注意的。
1 2 3 4 5
| enqueueReadyChunk(false); ... 这种是处理chunk被写出的时候的操作。重置一些值 ... chunk.add(bloomKey, keyOffset, keyLength);
|
在enqueueReadyChunk(false)中有
1 2 3 4 5
| ReadyChunk readyChunk = new ReadyChunk(); readyChunk.chunkId = numChunks - 1; readyChunk.chunk = chunk; readyChunk.firstKey = firstKeyInChunk; readyChunks.add(readyChunk);
|
然后时间很快就到了close环节。
1 2
| long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
|
block组织也分两类,一个chunk里组织block(他们共生存啊,用了一个bf),另外是root index和intermedia index的组织,实际这个更多感觉是组织chunk。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| private void writeInlineBlocks(boolean closing) throws IOException { for (InlineBlockWriter ibw : inlineBlockWriters) { while (ibw.shouldWriteBlock(closing)) { long offset = outputStream.getPos(); boolean cacheThisBlock = ibw.getCacheOnWrite(); ibw.writeInlineBlock(fsBlockWriter.startWriting( ibw.getInlineBlockType())); fsBlockWriter.writeHeaderAndData(outputStream); ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(), fsBlockWriter.getUncompressedSizeWithoutHeader()); totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); if (cacheThisBlock) { doCacheOnWrite(offset); } } } }
|
ibw.shouldWriteBlock(closing)方法的判断如下,实际是判断是否有chunk
1 2 3 4 5
| public boolean shouldWriteBlock(boolean closing) { enqueueReadyChunk(closing); return !readyChunks.isEmpty(); }
|
下面是写入bloom meta index,感觉就是chunk的那些。
1
| bloomBlockIndexWriter.writeSingleLevelIndex(out, "Bloom filter");
|
其实还有部分元数据(各种offset和树的生成)没有分析。以后在说吧。
HFileReaderV2
由上述的代码分析来看,其实读取的时候最主要要解决的是是否读此block。决定了读此block之后已经没有太多需要在此文章中分析了,因为那是检索流程的事情(组织memstore和storefile)
- 读block index和bloom filter信息
- 使用这两种索引过滤block
HFileReader主要涉及到的几个方法,包括获取和open。发生在在检索获取scanner和过滤scanner时。
在List HStore.getScanners(boolean cacheBlocks, boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt)中如下代码,获取此store中的file对应的scanner。
1
| List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt);
|
此方法调用了如下方法。
1 2
| StoreFile.Reader r = file.createReader(canUseDrop);
|
接着调用open方法,方法如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| if (this.reader != null) { throw new IllegalAccessError("Already open"); } this.reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind); metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo()); byte [] b = metadataMap.get(MAX_SEQ_ID_KEY); if (b != null) { this.sequenceid = Bytes.toLong(b); if (fileInfo.isTopReference()) { this.sequenceid += 1; } } if (isBulkLoadResult()){ String fileName = this.getPath().getName(); int startPos = fileName.lastIndexOf("SeqId_"); if (startPos != -1) { this.sequenceid = Long.parseLong(fileName.substring(startPos + 6, fileName.indexOf('_', startPos + 6))); if (fileInfo.isTopReference()) { this.sequenceid += 1; } } this.reader.setBulkLoaded(true); } this.reader.setSequenceID(this.sequenceid); b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY); if (b != null) { this.maxMemstoreTS = Bytes.toLong(b); } b = metadataMap.get(MAJOR_COMPACTION_KEY); if (b != null) { boolean mc = Bytes.toBoolean(b); if (this.majorCompaction == null) { this.majorCompaction = new AtomicBoolean(mc); } else { this.majorCompaction.set(mc); } } else { this.majorCompaction = new AtomicBoolean(false); } b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY); this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b)); BloomType hfileBloomType = reader.getBloomFilterType(); if (cfBloomType != BloomType.NONE) { reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META); if (hfileBloomType != cfBloomType) { LOG.info("HFile Bloom filter type for " + reader.getHFileReader().getName() + ": " + hfileBloomType + ", but " + cfBloomType + " specified in column family " + "configuration"); } } else if (hfileBloomType != BloomType.NONE) { LOG.info("Bloom filter turned off by CF config for " + reader.getHFileReader().getName()); } reader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META); try { this.reader.timeRange = TimeRangeTracker.getTimeRange(metadataMap.get(TIMERANGE_KEY)); } catch (IllegalArgumentException e) { LOG.error("Error reading timestamp range data from meta -- " + "proceeding without", e); this.reader.timeRange = null; } return this.reader;
|
判断的一个文件是否需要读取时,在伟大的 boolean org.apache.hadoop.hbase.regionserver.StoreFileScanner.shouldUseScanner(Scan scan, SortedSet columns, long oldestUnexpiredTS) 方法中的如下方法使用了bloomfilter。
1 2
| reader.passesBloomFilter(scan, columns)
|
里面会调用一个contains
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| int block = index.rootBlockContainingKey(key, keyOffset, keyLength); if (block < 0) { result = false; } else { HFileBlock bloomBlock; try { bloomBlock = reader.readBlock(index.getRootBlockOffset(block), index.getRootBlockDataSize(block), true, true, false, true, BlockType.BLOOM_CHUNK); } catch (IOException ex) { throw new IllegalArgumentException( "Failed to load Bloom block for key " + Bytes.toStringBinary(key, keyOffset, keyLength), ex); } ByteBuffer bloomBuf = bloomBlock.getBufferReadOnly(); result = ByteBloomFilter.contains(key, keyOffset, keyLength, bloomBuf.array(), bloomBuf.arrayOffset() + bloomBlock.headerSize(), bloomBlock.getUncompressedSizeWithoutHeader(), hash, hashCount); }
|
在如下方法(感觉时seekTO时,用于scan时指定了开始的rowkey,这样解释就合理了。在reader.passesBloomFilter中有判断是否时scan)中使用block index过滤了。
1
| BlockWithScanInfo org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexReader.loadDataBlockWithScanInfo(byte[] key, int keyOffset, int keyLength, HFileBlock currentBlock, boolean cacheBlocks, boolean pread, boolean isCompaction) throws IOException
|
CompoundBloomFilter构造方法中读取Block index的数据。