  • Scan operations during a major compaction

    Copyright notice: this is an original article by the blog author. Do not reproduce it without the author's permission.

    https://blog.csdn.net/u014393917/article/details/24419355

    When a major compaction is triggered, execution starts in CompactSplitThread.CompactionRunner.run

    --> region.compact(compaction, store) --> store.compact(compaction) -->

    CompactionContext.compact, which starts the compact operation.

    The CompactionContext instance is created by storeEngine.createCompaction() in HStore.

    The store engine defaults to DefaultStoreEngine and is configured through hbase.hstore.engine.class.

    The default CompactionContext implementation is DefaultCompactionContext.

    DefaultCompactionContext.compact ultimately delegates to DefaultStoreEngine.compactor.

    The compactor implementation is configured through hbase.hstore.defaultengine.compactor.class; the default is DefaultCompactor.

    The call that does the work is DefaultCompactor.compact(request);
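
    The two configuration keys above follow the usual "configured class name, else default" pattern. The following is only a minimal sketch of that lookup using java.util.Properties as a stand-in for the HBase configuration object; the class EngineConfigSketch, its method names, and the example class name com.example.MyCompactor are hypothetical, and the defaults are written as the simple class names used in this article rather than fully qualified names.

```java
import java.util.Properties;

// Hypothetical stand-in for the configuration lookup; not HBase code.
final class EngineConfigSketch {
    static final String ENGINE_KEY = "hbase.hstore.engine.class";
    static final String COMPACTOR_KEY = "hbase.hstore.defaultengine.compactor.class";

    // Returns the configured store engine, or the default named in the article.
    static String engineClass(Properties conf) {
        return conf.getProperty(ENGINE_KEY, "DefaultStoreEngine");
    }

    // Returns the configured compactor, or the default named in the article.
    static String compactorClass(Properties conf) {
        return conf.getProperty(COMPACTOR_KEY, "DefaultCompactor");
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        System.out.println(engineClass(conf));      // nothing configured: default
        conf.setProperty(COMPACTOR_KEY, "com.example.MyCompactor");
        System.out.println(compactorClass(conf));   // explicit override wins
    }
}
```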


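    1. For the store files that are going to be compacted, build the corresponding list of StoreFileScanner instances.

    When each StoreFileScanner instance is created, its ScanQueryMatcher is null.


    2. Create the StoreScanner instance, with ScanType set to ScanType.COMPACT_DROP_DELETES.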


    private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,

            List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,

            long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {

        this(store, false, scan, null, scanInfo.getTtl(),

                scanInfo.getMinVersions());

        if (dropDeletesFromRow == null) {

            Execution takes this branch; the columns argument passed in is null.

            matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType,

                    smallestReadPoint, earliestPutTs, oldestUnexpiredTS);

        } else {

            matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint,

                    earliestPutTs, oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow);

        }


    The ScanQueryMatcher constructor:

    The columns argument passed in is null.

    public ScanQueryMatcher(Scan scan, ScanInfo scanInfo,

            NavigableSet<byte[]> columns, ScanType scanType,

            long readPointToUse, long earliestPutTs, long oldestUnexpiredTS) {

        For tr, the minimum time is 0 and the maximum time is Long.MAX_VALUE.

        this.tr = scan.getTimeRange();

        this.rowComparator = scanInfo.getComparator();

        The delete information held in the deletes field is cleared again every time the scan reaches a new row.

        this.deletes = new ScanDeleteTracker();

        this.stopRow = scan.getStopRow();

        this.startKey = KeyValue.createFirstDeleteFamilyOnRow(scan.getStartRow(),

                scanInfo.getFamily());

        Get the filter instance.

        this.filter = scan.getFilter();

        this.earliestPutTs = earliestPutTs;

        this.maxReadPointToTrackVersions = readPointToUse;

        this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes();

        This value is false here.

        /* how to deal with deletes */

        this.isUserScan = scanType == ScanType.USER_SCAN;

        This value is false here. scanInfo.getKeepDeletedCells() defaults to false

        and can be changed through the KEEP_DELETED_CELLS attribute of the table's column family.

        scan.isRaw() can be enabled by setting the _raw_ attribute on the scan with setAttribute; it defaults to false.

        // keep deleted cells: if compaction or raw scan

        this.keepDeletedCells = (scanInfo.getKeepDeletedCells() && !isUserScan) || scan.isRaw();

        This value is false here: this is a major compaction, so deleted data is not retained.

        As above, scan.isRaw() defaults to false unless the _raw_ attribute is set on the scan.

        // retain deletes: if minor compaction or raw scan

        this.retainDeletesInOutput = scanType == ScanType.COMPACT_RETAIN_DELETES || scan.isRaw();

        This value is false here.

        // seePastDeleteMarker: user initiated scans

        this.seePastDeleteMarkers = scanInfo.getKeepDeletedCells() && isUserScan;

        Get the maximum number of versions for the query; at this point scan.getMaxVersions() and scanInfo.getMaxVersions() hold the same value.

        int maxVersions =

                scan.isRaw() ? scan.getMaxVersions() : Math.min(scan.getMaxVersions(),

                        scanInfo.getMaxVersions());


        The columns field is set to a ScanWildcardColumnTracker instance, and hasNullColumn is set to true.

        // Single branch to deal with two types of reads (columns vs all in family)

        if (columns == null || columns.size() == 0) {

            // there is always a null column in the wildcard column query.

            hasNullColumn = true;

            The index inside the columns field is the position of the column currently being compared; it is reset each time a new row is compared.

            // use a specialized scan for wildcard column tracker.

            this.columns = new ScanWildcardColumnTracker(

                    scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS);

        } else {

            This branch is never executed during a compaction.

            // whether there is null column in the explicit column query

            hasNullColumn = (columns.first().length == 0);


            // We can share the ExplicitColumnTracker, diff is we reset

            // between rows, not between storefiles.

            this.columns = new ExplicitColumnTracker(columns,

                    scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS);

        }

    }
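
    The three delete-related flags set in the constructor can be checked in isolation. The following is a minimal sketch that copies only the three boolean expressions from the constructor; the class DeleteFlagsSketch and its nested ScanType enum are hypothetical stand-ins, not the HBase types, and the two boolean parameters stand for scanInfo.getKeepDeletedCells() and scan.isRaw().

```java
// Hypothetical stand-in that reproduces only the flag expressions shown in the constructor.
final class DeleteFlagsSketch {
    enum ScanType { USER_SCAN, COMPACT_RETAIN_DELETES, COMPACT_DROP_DELETES }

    final boolean keepDeletedCells;
    final boolean retainDeletesInOutput;
    final boolean seePastDeleteMarkers;

    // keepDeletedCellsConfig stands for scanInfo.getKeepDeletedCells(); raw stands for scan.isRaw().
    DeleteFlagsSketch(ScanType scanType, boolean keepDeletedCellsConfig, boolean raw) {
        boolean isUserScan = scanType == ScanType.USER_SCAN;
        this.keepDeletedCells = (keepDeletedCellsConfig && !isUserScan) || raw;
        this.retainDeletesInOutput = scanType == ScanType.COMPACT_RETAIN_DELETES || raw;
        this.seePastDeleteMarkers = keepDeletedCellsConfig && isUserScan;
    }

    public static void main(String[] args) {
        // Major compaction with default settings: all three flags are false.
        DeleteFlagsSketch major =
                new DeleteFlagsSketch(ScanType.COMPACT_DROP_DELETES, false, false);
        System.out.println(major.keepDeletedCells + " "
                + major.retainDeletesInOutput + " " + major.seePastDeleteMarkers);
    }
}
```

    With the defaults discussed above, only a minor compaction (COMPACT_RETAIN_DELETES) or a raw scan turns retainDeletesInOutput on.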


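    Analysis of ScanQueryMatcher.match, which decides whether a kv is included

    When StoreScanner.next(kvlist, limit) collects the kvs of the next row,

    it calls ScanQueryMatcher.match to filter the data.

    For what each value returned by match does, see the following method in StoreScanner:

    public boolean next(List<Cell> outResult, int limit) .....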


    public MatchCode match(KeyValue kv) throws IOException {

        Call the filter's filterAllRemaining method; if it returns true, this scan is finished.

        if (filter != null && filter.filterAllRemaining()) {

            return MatchCode.DONE_SCAN;

        }

        Get the backing byte array of the kv.

        byte[] bytes = kv.getBuffer();

        The start position of the kv within bytes.

        int offset = kv.getOffset();

        Get the key length.

        Layout of a KeyValue (sizes in bytes; ~ means variable length):

        size   4       4     2       ~    1      ~   ~       8          1       ~

        field  keylen  vlen  rowlen  row  cflen  cf  column  timestamp  kvtype  value
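
    The offset arithmetic that match performs on this layout can be tried on a hand-built buffer. The following is a minimal sketch using java.nio.ByteBuffer (big-endian by default); the class KeyValueLayoutSketch and its methods are hypothetical, the type byte used in main is an arbitrary value, and the constant TIMESTAMP_TYPE_SIZE is assumed to be 8 + 1 bytes as the layout above suggests.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical encoder/decoder for the layout above; not the HBase KeyValue class.
final class KeyValueLayoutSketch {
    static final int ROW_OFFSET = 4 + 4;          // keylen + vlen
    static final int TIMESTAMP_TYPE_SIZE = 8 + 1; // timestamp + kvtype

    static byte[] encode(byte[] row, byte[] family, byte[] qualifier,
                         long timestamp, byte type, byte[] value) {
        int keyLen = 2 + row.length + 1 + family.length + qualifier.length + TIMESTAMP_TYPE_SIZE;
        ByteBuffer buf = ByteBuffer.allocate(ROW_OFFSET + keyLen + value.length);
        buf.putInt(keyLen).putInt(value.length);
        buf.putShort((short) row.length).put(row);
        buf.put((byte) family.length).put(family);
        buf.put(qualifier);
        buf.putLong(timestamp).put(type);
        buf.put(value);
        return buf.array();
    }

    // Same arithmetic as match(): walk past rowlen, row, cflen and cf, then subtract.
    static int qualifierLength(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int keyLen = buf.getInt(0);
        int initialOffset = ROW_OFFSET;
        int offset = initialOffset;
        short rowLen = buf.getShort(offset);
        offset += 2 + rowLen;
        byte familyLen = bytes[offset];
        offset += 1 + familyLen;
        return keyLen - (offset - initialOffset) - TIMESTAMP_TYPE_SIZE;
    }

    static long timestamp(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int keyLen = buf.getInt(0);
        return buf.getLong(ROW_OFFSET + keyLen - TIMESTAMP_TYPE_SIZE);
    }

    static byte type(byte[] bytes) {
        int keyLen = ByteBuffer.wrap(bytes).getInt(0);
        return bytes[ROW_OFFSET + keyLen - 1];
    }

    public static void main(String[] args) {
        byte[] kv = encode("r1".getBytes(StandardCharsets.UTF_8),
                "cf".getBytes(StandardCharsets.UTF_8),
                "q".getBytes(StandardCharsets.UTF_8),
                42L, (byte) 4, "v".getBytes(StandardCharsets.UTF_8));
        System.out.println(qualifierLength(kv) + " " + timestamp(kv) + " " + type(kv));
    }
}
```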


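        int keyLength = Bytes.toInt(bytes, offset, Bytes.SIZEOF_INT);

        Move to where the rowkey length is recorded (skipping keylen and vlen).

        offset += KeyValue.ROW_OFFSET;

        Remember the position where the rowkey length is recorded.

        int initialOffset = offset;

        Get the rowkey length.

        short rowLength = Bytes.toShort(bytes, offset, Bytes.SIZEOF_SHORT);

        Move to the start of the rowkey.

        offset += Bytes.SIZEOF_SHORT;

        Compare the rowkey of the kv passed in with the rowkey of the first kv of the current row, that is, check whether both belong to the same row.

        int ret = this.rowComparator.compareRows(row, this.rowOffset, this.rowLength,

                bytes, offset, rowLength);

        If the rowkey of the kv passed in is greater than the rowkey of the current row, the kv belongs to the next row,

        so the current next operation ends. (This ends only this next call, not the scan: every kv of the row being returned has been found.)

        if (ret <= -1) {

            return MatchCode.DONE;

        Otherwise the kv passed in belongs to an earlier row, and the current scanner has to be moved forward one row.

        } else if (ret >= 1) {

            // could optimize this, if necessary?

            // Could also be called SEEK_TO_CURRENT_ROW, but this

            // should be rare/never happens.

            return MatchCode.SEEK_NEXT_ROW;

        }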
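
    The row-boundary decision above depends only on the sign of the row comparison. The following is a minimal sketch that assumes a plain unsigned byte-wise row order (Arrays.compareUnsigned, Java 9 or later) in place of the configured rowComparator; the class RowBoundarySketch and the SAME_ROW value are hypothetical and exist only to make the "fall through" case visible.

```java
import java.util.Arrays;

// Hypothetical illustration of the row check; not the HBase comparator.
final class RowBoundarySketch {
    enum MatchCode { DONE, SEEK_NEXT_ROW, SAME_ROW }

    // currentRow: row the matcher is working on; kvRow: row of the kv passed in.
    static MatchCode checkRow(byte[] currentRow, byte[] kvRow) {
        int ret = Arrays.compareUnsigned(currentRow, kvRow);
        if (ret < 0) {
            return MatchCode.DONE;          // kv is in a later row: this next() call is complete
        } else if (ret > 0) {
            return MatchCode.SEEK_NEXT_ROW; // kv is in an earlier row: move the scanner forward
        }
        return MatchCode.SAME_ROW;          // same row: continue with the remaining checks
    }

    public static void main(String[] args) {
        System.out.println(checkRow(new byte[] {'a'}, new byte[] {'b'}));
        System.out.println(checkRow(new byte[] {'b'}, new byte[] {'a'}));
        System.out.println(checkRow(new byte[] {'a'}, new byte[] {'a'}));
    }
}
```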

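        Optimization: decide whether to skip the rest of the flow and move the current scanner straight to the next row.

        stickyNextRow becomes true when any of the following holds:

        1. ColumnTracker.done returns true.

        2. ColumnTracker.checkColumn returns SEEK_NEXT_ROW.

        3. filter.filterKeyValue(kv) returns NEXT_ROW.

        4. ColumnTracker.checkVersions returns INCLUDE_AND_SEEK_NEXT_ROW.

        The ColumnTracker implementation is ScanWildcardColumnTracker when the scan's columns is null or the scan is a compaction scan;

        otherwise it is ExplicitColumnTracker.


        // optimize case.

        if (this.stickyNextRow)

            return MatchCode.SEEK_NEXT_ROW;

        For ScanWildcardColumnTracker, done() returns false.

        For ExplicitColumnTracker, done() returns whether the current position has reached or passed the total number of columns being looked up.

        if (this.columns.done()) {

            stickyNextRow = true;

            return MatchCode.SEEK_NEXT_ROW;

        }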

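        Move to where the family length is recorded.

        // Passing rowLength

        offset += rowLength;

        Get the family length.

        // Skipping family

        byte familyLength = bytes[offset];

        Move past the family-length byte and the family name, to the start of the column qualifier.

        offset += familyLength + 1;

        Get the length of the column qualifier.

        int qualLength = keyLength -

                (offset - initialOffset) - KeyValue.TIMESTAMP_TYPE_SIZE;

        Get the kv's timestamp.

        long timestamp = Bytes.toLong(bytes, initialOffset + keyLength - KeyValue.TIMESTAMP_TYPE_SIZE);

        Check whether the timestamp is within the allowed range; this mainly checks whether the TTL has expired.

        // check for early out based on timestamp alone

        if (columns.isDone(timestamp)) {

            If the kv's TTL has expired, ScanWildcardColumnTracker (the default during a compaction) returns SEEK_NEXT_COL directly.

            ExplicitColumnTracker checks whether there is a next column: if so it returns SEEK_NEXT_COL, otherwise SEEK_NEXT_ROW.

            return columns.getNextRowOrNextColumn(bytes, offset, qualLength);

        }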
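
    The TTL check above compares the kv's timestamp with the oldest timestamp that is still unexpired. The following is a minimal sketch that assumes oldestUnexpiredTS is computed as "now minus TTL" and that a kv counts as expired when its timestamp is strictly older than that bound; the class TtlCheckSketch and its methods are hypothetical, and the real tracker may apply additional conditions that are not modelled here.

```java
// Hypothetical illustration of a TTL expiry test; not the HBase ColumnTracker.
final class TtlCheckSketch {
    // Assumption: the oldest timestamp that has not yet expired is now - ttl.
    static long oldestUnexpiredTs(long nowMillis, long ttlMillis) {
        return nowMillis - ttlMillis;
    }

    // Assumption: a kv is expired when its timestamp is strictly older than the bound.
    static boolean isExpired(long timestamp, long oldestUnexpiredTs) {
        return timestamp < oldestUnexpiredTs;
    }

    public static void main(String[] args) {
        long oldest = oldestUnexpiredTs(10_000L, 3_000L);
        System.out.println(isExpired(6_999L, oldest)); // older than the bound
        System.out.println(isExpired(7_000L, oldest)); // exactly on the bound
    }
}
```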


        /*

         * The delete logic is pretty complicated now.

         * This is corroborated by the following:

         * 1. The store might be instructed to keep deleted rows around.

         * 2. A scan can optionally see past a delete marker now.

         * 3. If deleted rows are kept, we have to find out when we can

         *    remove the delete markers.

         * 4. Family delete markers are always first (regardless of their TS)

         * 5. Delete markers should not be counted as version

         * 6. Delete markers affect puts of the *same* TS

         * 7. Delete marker need to be version counted together with puts

         *    they affect

         */

        Get the kv's type.

        byte type = bytes[initialOffset + keyLength - 1];

        If the kv is a delete marker:

        if (kv.isDelete()) {

            By default keepDeletedCells is false, so this if branch is entered.

            if (!keepDeletedCells) {

                // first ignore delete markers if the scanner can do so, and the

                // range does not include the marker

                //

                // during flushes and compactions also ignore delete markers newer

                // than the readpoint of any open scanner, this prevents deleted

                // rows that could still be seen by a scanner from being collected

                This value is true here: the scan's tr defaults to allTime = true.

                boolean includeDeleteMarker = seePastDeleteMarkers ?

                        tr.withinTimeRange(timestamp) :

                        tr.withinOrAfterTimeRange(timestamp);

                Add the delete kv to the DeleteTracker; during a compaction the implementation is ScanDeleteTracker.

                if (includeDeleteMarker

                        && kv.getMvccVersion() <= maxReadPointToTrackVersions) {

                    this.deletes.add(bytes, offset, qualLength, timestamp, type);

                }

                // Can't early out now, because DelFam come before any other keys

            }

            This if branch is entered when the scan is a minor compaction (or a raw scan),

            or when this is a compaction scan and the current time minus the kv's timestamp has not yet exceeded the time configured in hbase.hstore.time.to.purge.deletes

            (the default is 0),

            or when the kv's mvcc value is greater than the current maximum read point.

            This walkthrough covers a major compaction scan, so the branch is not entered.

            if (retainDeletesInOutput

                    || (!isUserScan && (EnvironmentEdgeManager.currentTimeMillis() - timestamp) <= timeToPurgeDeletes)

                    || kv.getMvccVersion() > maxReadPointToTrackVersions) {

                // always include or it is not time yet to check whether it is OK

                // to purge deletes or not

                if (!isUserScan) {

                    // if this is not a user scan (compaction), we can filter this delete marker right here

                    // otherwise (i.e. a "raw" scan) we fall through to normal version and timerange checking

                    return MatchCode.INCLUDE;

                }

            The following check is normally not entered.

            } else if (keepDeletedCells) {

                if (timestamp < earliestPutTs) {

                    // keeping delete rows, but there are no puts older than

                    // this delete in the store files.

                    return columns.getNextRowOrNextColumn(bytes, offset, qualLength);

                }

                // else: fall through and do version counting on the

                // delete markers

            If the kv is a delete marker, then once it has been added to the DeleteTracker, SKIP is returned directly.

            } else {

                return MatchCode.SKIP;

            }

            // note the following next else if...

            // delete marker are not subject to other delete markers

        } else if (!this.deletes.isEmpty()) {

            If the kv is not a delete marker, check whether the tracked deletes cover this kv's version.

            a. If a DeleteFamily marker was recorded and the current kv's timestamp is not greater than the marker's timestamp, return FAMILY_DELETED.

            b. If the kv's version was removed by a DeleteFamilyVersion marker (a delete issued with an explicit timestamp), return FAMILY_VERSION_DELETED.

            c. If the qualifier of the delete held in the DeleteTracker equals the qualifier of the kv passed in

            and that delete is a DeleteColumn, return COLUMN_DELETED.

            If instead the qualifiers are equal

            and the timestamp of the kv passed in equals the delete's timestamp, the delete targets that specific version, so return VERSION_DELETED.

            d. Otherwise no delete applies; return NOT_DELETED.

            DeleteResult deleteResult = deletes.isDeleted(bytes, offset, qualLength,

                    timestamp);

            switch (deleteResult) {

                case FAMILY_DELETED:

                case COLUMN_DELETED:

                    return columns.getNextRowOrNextColumn(bytes, offset, qualLength);

                case VERSION_DELETED:

                case FAMILY_VERSION_DELETED:

                    return MatchCode.SKIP;

                case NOT_DELETED:

                    break;

                default:

                    throw new RuntimeException("UNEXPECTED");

            }

        }
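
    The four delete outcomes (cases a to d above) can be illustrated with a much simplified tracker. The following is only a sketch and deliberately not how ScanDeleteTracker is implemented: it keeps the deletes of one row in maps keyed by qualifier instead of relying on the sort order of the kvs, and the class DeleteTrackerSketch with its add* methods is hypothetical. It treats a put with the same timestamp as a family or column delete as deleted, in line with point 6 of the source comment ("Delete markers affect puts of the *same* TS").

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, map-based simplification of a per-row delete tracker.
final class DeleteTrackerSketch {
    enum DeleteResult {
        FAMILY_DELETED, FAMILY_VERSION_DELETED, COLUMN_DELETED, VERSION_DELETED, NOT_DELETED
    }

    private boolean hasFamilyDelete = false;
    private long familyDeleteTs = 0L;
    private final Set<Long> familyVersionTs = new HashSet<>();
    private final Map<String, Long> columnDeleteTs = new HashMap<>();
    private final Map<String, Set<Long>> versionDeleteTs = new HashMap<>();

    void addDeleteFamily(long ts) {
        familyDeleteTs = hasFamilyDelete ? Math.max(familyDeleteTs, ts) : ts;
        hasFamilyDelete = true;
    }

    void addDeleteFamilyVersion(long ts) {
        familyVersionTs.add(ts);
    }

    void addDeleteColumn(String qualifier, long ts) {
        columnDeleteTs.merge(qualifier, ts, Long::max); // keep the newest column delete
    }

    void addDeleteVersion(String qualifier, long ts) {
        versionDeleteTs.computeIfAbsent(qualifier, q -> new HashSet<>()).add(ts);
    }

    // Checks a put against the recorded deletes, in the order a, b, c, d.
    DeleteResult isDeleted(String qualifier, long ts) {
        if (hasFamilyDelete && ts <= familyDeleteTs) {
            return DeleteResult.FAMILY_DELETED;
        }
        if (familyVersionTs.contains(ts)) {
            return DeleteResult.FAMILY_VERSION_DELETED;
        }
        Long columnTs = columnDeleteTs.get(qualifier);
        if (columnTs != null && ts <= columnTs) {
            return DeleteResult.COLUMN_DELETED;
        }
        Set<Long> versions = versionDeleteTs.get(qualifier);
        if (versions != null && versions.contains(ts)) {
            return DeleteResult.VERSION_DELETED;
        }
        return DeleteResult.NOT_DELETED;
    }

    // Called when the scan moves to a new row, mirroring the reset described earlier.
    void reset() {
        hasFamilyDelete = false;
        familyDeleteTs = 0L;
        familyVersionTs.clear();
        columnDeleteTs.clear();
        versionDeleteTs.clear();
    }
}
```

    In match, FAMILY_DELETED and COLUMN_DELETED let the scanner seek ahead because every remaining version of that column is gone, while the two version-specific results only skip the single kv.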

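        Check whether the timestamp of the kv passed in falls within the scan's time range; a default scan includes all times.

        int timestampComparison = tr.compare(timestamp);

        If the kv's timestamp is beyond the maximum time, return SKIP.

        if (timestampComparison >= 1) {

            return MatchCode.SKIP;

        } else if (timestampComparison <= -1) {

            If the kv's timestamp is below the minimum time, return SEEK_NEXT_COL or SEEK_NEXT_ROW.

            return columns.getNextRowOrNextColumn(bytes, offset, qualLength);

        }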
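
    The three-way result used above can be sketched as follows. This assumes a half-open range [minStamp, maxStamp), with the default range running from 0 to Long.MAX_VALUE as noted for tr in the constructor; the class TimeRangeSketch is a hypothetical stand-in, not the HBase TimeRange class.

```java
// Hypothetical stand-in for a time range with a three-way compare.
final class TimeRangeSketch {
    private final long minStamp; // inclusive (assumption)
    private final long maxStamp; // exclusive (assumption)

    // Default range: all times.
    TimeRangeSketch() {
        this(0L, Long.MAX_VALUE);
    }

    TimeRangeSketch(long minStamp, long maxStamp) {
        if (maxStamp < minStamp) {
            throw new IllegalArgumentException("maxStamp is smaller than minStamp");
        }
        this.minStamp = minStamp;
        this.maxStamp = maxStamp;
    }

    // -1: timestamp is below the range, 0: inside the range, 1: at or beyond the upper bound.
    int compare(long timestamp) {
        if (timestamp < minStamp) {
            return -1;
        } else if (timestamp >= maxStamp) {
            return 1;
        }
        return 0;
    }

    public static void main(String[] args) {
        TimeRangeSketch tr = new TimeRangeSketch(100L, 200L);
        System.out.println(tr.compare(50L) + " " + tr.compare(150L) + " " + tr.compare(200L));
    }
}
```

    Because the kvs of a column arrive newest first, a kv that is too new can simply be skipped, while a kv that is too old means every later version of that column is too old as well, which is why match seeks ahead in that case.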

        If the timestamp is within range, call columns.checkColumn; for a compaction scan this method returns INCLUDE.

        For the other cases, see the ExplicitColumnTracker implementation.

        // STEP 1: Check if the column is part of the requested columns

        MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength, type);

        This if branch is entered.

        if (colChecker == MatchCode.INCLUDE) {

            Run the filter and return the value that corresponds to its response. This part is easy to follow and is not explained further.

            ReturnCode filterResponse = ReturnCode.SKIP;

            // STEP 2: Yes, the column is part of the requested columns. Check if filter is present

            if (filter != null) {

                // STEP 3: Filter the key value and return if it filters out

                filterResponse = filter.filterKeyValue(kv);

                switch (filterResponse) {

                    case SKIP:

                        return MatchCode.SKIP;

                    case NEXT_COL:

                        return columns.getNextRowOrNextColumn(bytes, offset, qualLength);

                    case NEXT_ROW:

                        stickyNextRow = true;

                        return MatchCode.SEEK_NEXT_ROW;

                    case SEEK_NEXT_USING_HINT:

                        return MatchCode.SEEK_NEXT_USING_HINT;

                    default:

                        // It means it is either include or include and seek next

                        break;

                }

            }

            /*

             * STEP 4: Reaching this step means the column is part of the requested columns and either

             * the filter is null or the filter has returned INCLUDE or INCLUDE_AND_NEXT_COL response.

             * Now check the number of versions needed. This method call returns SKIP, INCLUDE,

             * INCLUDE_AND_SEEK_NEXT_ROW, INCLUDE_AND_SEEK_NEXT_COL.

             *

             * FilterResponse             ColumnChecker               Desired behavior

             * INCLUDE                    SKIP                        row has already been included, SKIP.

             * INCLUDE                    INCLUDE                     INCLUDE

             * INCLUDE                    INCLUDE_AND_SEEK_NEXT_COL   INCLUDE_AND_SEEK_NEXT_COL

             * INCLUDE                    INCLUDE_AND_SEEK_NEXT_ROW   INCLUDE_AND_SEEK_NEXT_ROW

             * INCLUDE_AND_SEEK_NEXT_COL  SKIP                        row has already been included, SKIP.

             * INCLUDE_AND_SEEK_NEXT_COL  INCLUDE                     INCLUDE_AND_SEEK_NEXT_COL

             * INCLUDE_AND_SEEK_NEXT_COL  INCLUDE_AND_SEEK_NEXT_COL   INCLUDE_AND_SEEK_NEXT_COL

             * INCLUDE_AND_SEEK_NEXT_COL  INCLUDE_AND_SEEK_NEXT_ROW   INCLUDE_AND_SEEK_NEXT_ROW

             *

             * In all the above scenarios, we return the column checker return value except for

             * FilterResponse (INCLUDE_AND_SEEK_NEXT_COL) and ColumnChecker (INCLUDE)

             */

            colChecker =

                    columns.checkVersions(bytes, offset, qualLength, timestamp, type,

                            kv.getMvccVersion() > maxReadPointToTrackVersions);

            // Optimize with stickyNextRow

            stickyNextRow = colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW ? true : stickyNextRow;

            return (filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL &&

                    colChecker == MatchCode.INCLUDE) ? MatchCode.INCLUDE_AND_SEEK_NEXT_COL

                    : colChecker;

        }

        stickyNextRow = (colChecker == MatchCode.SEEK_NEXT_ROW) ? true

                : stickyNextRow;

        return colChecker;

    }
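
    The STEP 4 table inside match reduces to a single rule: return the version checker's result, except that a filter response of INCLUDE_AND_NEXT_COL upgrades a plain INCLUDE. The following is a minimal sketch of just that rule; the class MatchResultSketch and its trimmed-down enums are hypothetical stand-ins for the HBase ReturnCode and MatchCode types.

```java
// Hypothetical illustration of how the filter response and version check are combined.
final class MatchResultSketch {
    enum ReturnCode { INCLUDE, INCLUDE_AND_NEXT_COL }

    enum MatchCode { SKIP, INCLUDE, INCLUDE_AND_SEEK_NEXT_COL, INCLUDE_AND_SEEK_NEXT_ROW }

    static MatchCode combine(ReturnCode filterResponse, MatchCode colChecker) {
        // The only row of the table where the filter changes the outcome.
        if (filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL
                && colChecker == MatchCode.INCLUDE) {
            return MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
        }
        return colChecker;
    }

    public static void main(String[] args) {
        for (ReturnCode rc : ReturnCode.values()) {
            for (MatchCode mc : MatchCode.values()) {
                System.out.println(rc + " + " + mc + " -> " + combine(rc, mc));
            }
        }
    }
}
```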


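    Difference between major and minor compactions when writing the new store file

    For a major compaction, when the writer is closed,

    the file metadata records that the compaction was major: MAJOR_COMPACTION_KEY = true.

    This value is mainly used to decide whether a minor compaction selects this store file.


    if (writer != null) {

        writer.appendMetadata(fd.maxSeqId, request.isMajor());

        writer.close();

        newFiles.add(writer.getPath());

    }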
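
    The effect of appendMetadata can be pictured as writing two entries into the file's metadata block. The following is only a minimal in-memory sketch: the class StoreFileMetaSketch is hypothetical, a HashMap stands in for the file metadata, and the key string used for the sequence id is an assumption (only MAJOR_COMPACTION_KEY is named in this article).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for store file metadata; not the HBase writer.
final class StoreFileMetaSketch {
    static final String MAX_SEQ_ID_KEY = "MAX_SEQ_ID_KEY";           // assumed key name
    static final String MAJOR_COMPACTION_KEY = "MAJOR_COMPACTION_KEY";

    private final Map<String, String> meta = new HashMap<>();

    // Records the highest sequence id and whether the file came from a major compaction.
    void appendMetadata(long maxSeqId, boolean majorCompaction) {
        meta.put(MAX_SEQ_ID_KEY, Long.toString(maxSeqId));
        meta.put(MAJOR_COMPACTION_KEY, Boolean.toString(majorCompaction));
    }

    // A file with no flag recorded is treated as not major-compacted.
    boolean isMajorCompaction() {
        return Boolean.parseBoolean(meta.get(MAJOR_COMPACTION_KEY));
    }

    public static void main(String[] args) {
        StoreFileMetaSketch file = new StoreFileMetaSketch();
        file.appendMetadata(7L, true);
        System.out.println(file.isMajorCompaction());
    }
}
```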




  • Original article: https://www.cnblogs.com/mqxnongmin/p/10794085.html