
Compaction flow analysis

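Like split, compaction is triggered either by the client or by a check performed after a flush.

In addition, a CompactionChecker thread, created when the RegionServer starts, periodically checks whether a compaction is needed. The interval between runs of this thread is configured by hbase.server.thread.wakefrequency, default 10*1000 ms.

Main responsibilities of the CompactionChecker thread:

It runs every hbase.server.thread.wakefrequency (10*1000 ms) and checks whether each region needs a compaction. If one is needed, it issues the compaction request to the compaction threads from here.

The priority of the major compactions it requests comes from hbase.regionserver.compactionChecker.majorCompactPriority (default Integer.MAX_VALUE). If that value is left at its default, or is larger than the region's current compaction priority, the request uses the default priority. Otherwise the configured value is used.

Each Store uses hbase.server.compactchecker.interval.multiplier (default 1000) to decide how often it is actually checked. The effective check period is therefore wakefrequency*multiplier ms. With the defaults, a store is checked once every 1000 runs of the thread, which is 10,000 s or about 2.8 hours.

a. The check requests a compaction when the number of files in a store, minus the number already being compacted (or already requested), is greater than or equal to hbase.hstore.compaction.min. Older versions use hbase.hstore.compactionThreshold. The default is 3.

b. Major compaction check

hbase.hregion.majorcompaction sets the major compaction period. Here it is 1000*60*60*24 ms, or one day. This is the default in older releases; newer releases default to 7 days with a jitter of 0.5.

hbase.hregion.majorcompaction.jitter sets how far the major compaction time may drift. Here it is 0.2, so the major compaction time varies by +/- 4.8 hours.

b2. If (current time - major period) > the time of the oldest file in the store, a major compaction may be needed:

b2.1> If the store has only one file and that file has reached the major time, check whether its TTL has been reached (Integer.MAX_VALUE means no TTL is configured). A major compaction is done only if the TTL has been reached.

b2.2> If there is more than one file and the major time has been reached, a major compaction is needed.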
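The sketch below is a rough illustration of the timing rules above. It computes the per-store check period and a jittered major compaction period. It is a hypothetical helper, not HBase code, and the class and method names are invented.

The jitter formula is our reading of how RatioBasedCompactionPolicy computes the next major compaction time: the period plus or minus period × jitter, drawn from a seeded Random. As far as we know, HBase derives that seed from the store's files. Here the seed is simply a parameter.

```java
import java.util.Random;

// Hypothetical helper modelling the CompactionChecker timing rules; not HBase code.
final class CompactionTiming {
    private CompactionTiming() {}

    /** Effective period between checks of one store: wakefrequency * multiplier (ms). */
    static long storeCheckPeriodMs(long wakeFrequencyMs, long multiplier) {
        return wakeFrequencyMs * multiplier;
    }

    /**
     * Major compaction period with jitter applied: the result lies within
     * periodMs +/- round(periodMs * jitterPct). The seed is an assumption standing in
     * for whatever per-store value the real implementation uses.
     */
    static long jitteredMajorPeriodMs(long periodMs, double jitterPct, long seed) {
        if (periodMs <= 0 || jitterPct <= 0) {
            return periodMs; // no jitter configured
        }
        long jitter = Math.round(periodMs * jitterPct);
        double rnd = new Random(seed).nextDouble(); // uniform in [0.0, 1.0)
        return periodMs + jitter - Math.round(2L * jitter * rnd);
    }

    /** True once the oldest file is older than the (jittered) major period. */
    static boolean majorDue(long nowMs, long oldestFileTimeMs, long majorPeriodMs) {
        return majorPeriodMs > 0 && oldestFileTimeMs > 0
                && oldestFileTimeMs < nowMs - majorPeriodMs;
    }

    public static void main(String[] args) {
        long day = 1000L * 60 * 60 * 24;
        System.out.println("store check period (ms): " + storeCheckPeriodMs(10_000L, 1000L));
        System.out.println("jittered major period (ms): " + jitteredMajorPeriodMs(day, 0.2, 42L));
    }
}
```

With the defaults, storeCheckPeriodMs(10_000, 1000) gives 10,000,000 ms. A one-day period with jitter 0.2 always lands within 4.8 hours of one day, and the same seed always gives the same value.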


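Compaction requested by the client (compactRegion)

The client calls HBaseAdmin.compact, which opens an RPC connection to the RegionServer and calls HRegionServer.compactRegion.

If a table name rather than a region name is passed, it iterates over all regions of that table and calls HRegionServer.compactRegion for each of them.

The client-initiated request is handled by HRegionServer.compactRegion: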

public CompactRegionResponse compactRegion(final RpcController controller,
    final CompactRegionRequest request) throws ServiceException {
  try {
    checkOpen();
    requestCount.increment();
    // Look up the HRegion named in the request among onlineRegions
    HRegion region = getRegion(request.getRegion());
    region.startRegionOperation(Operation.COMPACT_REGION);
    LOG.info("Compacting " + region.getRegionNameAsString());
    boolean major = false;
    byte[] family = null;
    Store store = null;
    // If the client request carries a column family, get the HStore of that family
    if (request.hasFamily()) {
      family = request.getFamily().toByteArray();
      store = region.getStore(family);
      if (store == null) {
        throw new ServiceException(new IOException("column family " + Bytes.toString(family)
            + " does not exist in region " + region.getRegionNameAsString()));
      }
    }
    // Check whether this is a major compaction request
    if (request.hasMajor()) {
      major = request.getMajor();
    }
    // For a major compaction, set the force-major flag on the store or on the whole region
    if (major) {
      if (family != null) {
        store.triggerMajorCompaction();
      } else {
        region.triggerMajorCompaction();
      }
    }

    String familyLogMsg = (family != null) ? " for column family: " + Bytes.toString(family) : "";
    LOG.trace("User-triggered compaction requested for region "
        + region.getRegionNameAsString() + familyLogMsg);
    String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
    // Major or normal, the request is then issued through compactSplitThread.requestCompaction
    if (family != null) {
      compactSplitThread.requestCompaction(region, store, log,
          Store.PRIORITY_USER, null);
    } else {
      compactSplitThread.requestCompaction(region, log,
          Store.PRIORITY_USER, null);
    }
    return CompactRegionResponse.newBuilder().build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}
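The dispatch in compactRegion comes down to three steps:

1. Validate the optional column family.
2. Set a force-major flag on one store, or on every store of the region.
3. Queue a request with user priority.

The sketch below models that with hypothetical MiniRegion and MiniStore classes, which are not HBase types. PRIORITY_USER is given the value 1 to match Store.PRIORITY_USER as stated later in this article. An IllegalArgumentException stands in for the ServiceException.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the dispatch in HRegionServer.compactRegion; not HBase code.
final class CompactRegionSketch {
    static final int PRIORITY_USER = 1; // value of Store.PRIORITY_USER

    static final class MiniStore {
        boolean forceMajor; // what triggerMajorCompaction() sets
    }

    static final class MiniRegion {
        final Map<String, MiniStore> stores = new LinkedHashMap<>();
        final List<String> queuedRequests = new ArrayList<>();
    }

    private CompactRegionSketch() {}

    /** family == null means "all column families of the region". */
    static void compactRegion(MiniRegion region, String family, boolean major) {
        MiniStore store = null;
        if (family != null) {
            store = region.stores.get(family);
            if (store == null) {
                // The real method wraps an IOException in a ServiceException here.
                throw new IllegalArgumentException(
                        "column family " + family + " does not exist in region");
            }
        }
        if (major) {
            if (store != null) {
                store.forceMajor = true;            // store.triggerMajorCompaction()
            } else {
                for (MiniStore s : region.stores.values()) {
                    s.forceMajor = true;            // region.triggerMajorCompaction()
                }
            }
        }
        // Major or not, the request is queued the same way, with user priority.
        String target = (family != null) ? family : "<all stores>";
        region.queuedRequests.add(target + "@priority=" + PRIORITY_USER);
    }
}
```

The point the model makes is that "major" never selects a different code path here. It only leaves a flag behind for the later file selection to read.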


Non-major compaction flow

requestCompaction can be called with a store or with a region. When a region is passed, all stores of the region are obtained and a compaction request is issued for each store in turn.

All non-major compaction requests ultimately issue their compaction request through the following method:

private synchronized CompactionRequest requestCompactionInternal(final HRegion r,
    final Store s, final String why, int priority, CompactionRequest request, boolean selectNow)


Handling of a compaction request for a store

To disable compaction for an HBase table, set the COMPACTION_ENABLED attribute when creating the table. requestCompactionInternal then returns immediately:

private synchronized CompactionRequest requestCompactionInternal(final HRegion r, final Store s,
    final String why, int priority, CompactionRequest request, boolean selectNow)
    throws IOException {
  if (this.server.isStopped()
      || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
    return null;
  }

  CompactionContext compaction = null;

  // For user-initiated calls selectNow is true here. For system calls it is false, and file
  // selection is deferred until the CompactSplitThread.CompactionRunner thread runs.
  // So for system calls the CompactionContext handed on is null, while for user calls
  // the compaction instance is obtained right here.
  if (selectNow) {
    // HStore.requestCompaction returns a CompactionContext and computes the store files to
    // compact. For a client request, request.priority is set to Store.PRIORITY_USER.
    // For a system request (e.g. after a flush), request.priority is set to
    // hbase.hstore.blockingStoreFiles minus the number of store files; if that difference
    // equals PRIORITY_USER, PRIORITY_USER + 1 is used instead.
    // See HStore.requestCompaction below.
    compaction = selectCompaction(r, s, priority, request);
    if (compaction == null) return null; // message logged inside
  }

  // We assume that most compactions are small. So, put system compactions into small
  // pool; we will do selection there, and move to large pool if necessary.
  long size = selectNow ? compaction.getRequest().getSize() : 0;

  // It looks as if largeCompactions can never be chosen here. (For system requests, the
  // size check happens later, in the CompactionRunner thread.)
  // When selectNow == false, size is 0, which can never exceed
  // hbase.regionserver.thread.compaction.throttle
  // (default: hbase.hstore.compaction.max * 2 * memstore flush size).
  // When selectNow == true, the !selectNow part of the condition is already false.
  ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size))
      ? largeCompactions : smallCompactions;

  // Create a CompactionRunner on the chosen pool (smallCompactions in practice) and run it;
  // see CompactionRunner.run() below.
  pool.execute(new CompactionRunner(s, r, compaction, pool));
  if (LOG.isDebugEnabled()) {
    String type = (pool == smallCompactions) ? "Small " : "Large ";
    LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
        + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
  }
  return selectNow ? compaction.getRequest() : null;
}
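The following sketch reproduces the pool choice quoted above, so the author's observation can be checked. With the condition (!selectNow && throttle(size)), a request always goes to the small pool at this point. The real size check only happens once CompactionRunner has selected files.

The enum and method names are hypothetical. Two further assumptions:

- throttleCompaction is a plain "size > throttle point" comparison.
- The default throttle point follows the formula given above: 2 × hbase.hstore.compaction.max × memstore flush size.

```java
// Hypothetical sketch of the small/large pool choice; names are illustrative, not HBase API.
final class PoolChoiceSketch {
    enum Pool { SMALL, LARGE }

    private PoolChoiceSketch() {}

    /** Default throttle point: 2 * hbase.hstore.compaction.max * memstore flush size. */
    static long defaultThrottlePoint(int maxFilesToCompact, long memstoreFlushSize) {
        return 2L * maxFilesToCompact * memstoreFlushSize;
    }

    /** The condition quoted from requestCompactionInternal: (!selectNow && throttle(size)). */
    static Pool choosePoolAtRequest(boolean selectNow, long selectedSize, long throttlePoint) {
        long size = selectNow ? selectedSize : 0;      // system requests have no selection yet
        boolean throttle = size > throttlePoint;       // assumed behaviour of throttleCompaction
        return (!selectNow && throttle) ? Pool.LARGE : Pool.SMALL;
    }

    /** The re-check in CompactionRunner.run(), once files have been selected. */
    static Pool choosePoolInRunner(long selectedSize, long throttlePoint) {
        return selectedSize > throttlePoint ? Pool.LARGE : Pool.SMALL;
    }
}
```

Assuming a 128 MB flush size, defaultThrottlePoint(10, 128L * 1024 * 1024) is 2,684,354,560 bytes (2.5 GB). choosePoolAtRequest returns SMALL for any size, while choosePoolInRunner switches to LARGE above that point.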


Creating the CompactionRequest

HStore.requestCompaction determines the store files to compact and creates a CompactionContext:

public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
    throws IOException {
  // don't even select for compaction if writes are disabled
  if (!this.areWritesEnabled()) {
    return null;
  }

  // Creates a DefaultStoreEngine.DefaultCompactionContext (with the default store engine)
  CompactionContext compaction = storeEngine.createCompaction();
  this.lock.readLock().lock();
  try {
    synchronized (filesCompacting) {
      // First, see if coprocessor would want to override selection.
      if (this.getCoprocessorHost() != null) {
        List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
        boolean override = this.getCoprocessorHost().preCompactSelection(
            this, candidatesForCoproc, baseRequest);
        if (override) {
          // Coprocessor is overriding normal file selection.
          compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
        }
      }

      // Normal case - coprocessor is not overriding file selection.
      if (!compaction.hasSelection()) {
        // true for a compaction requested by the client, false for one triggered by a flush
        boolean isUserCompaction = priority == Store.PRIORITY_USER;


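        // About offPeakHours:
        // 1. hbase.offpeak.start.hour sets the start hour of the off-peak window, e.g. 1.
        // 2. hbase.offpeak.end.hour sets the end hour of the window, e.g. 2.
        // The window affects the size-ratio check on the files to compact. Each file is compared
        // with the total size of the files that follow it, at most
        // hbase.hstore.compaction.max (default 10) minus 1 of them.
        // E.g. with 10 candidate files, the first file (i = 0) is compared with the files at
        // i+1 .. i+max-1 = 9, i.e. the following 9 files. If it is larger than their total
        // multiplied by the ratio, it is left out of the compaction.
        // If 1 and 2 are set (not -1), start < end, and the current time is inside the window,
        // the ratio is hbase.hstore.compaction.ratio.offpeak (default 5.0f).
        // Otherwise it is hbase.hstore.compaction.ratio (default 1.2f).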


        boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
            offPeakCompactionTracker.compareAndSet(false, true);
        try {
          // Call select() on the DefaultStoreEngine.DefaultCompactionContext instance.
          // For a detailed walkthrough of compaction.select, see the major compaction flow below.
          // It returns true if there is a compaction request and false otherwise.
          // It eventually calls RatioBasedCompactionPolicy.selectCompaction, which builds a
          // CompactionRequest and stores it in the request field of DefaultCompactionContext.
          // The selected store files are then added to HStore.filesCompacting.
          // The forceMajor argument (the last parameter) is true only when a major compaction
          // has been requested and filesCompacting is empty; otherwise it is false.
          //
          // a. compaction.select gets all store files of this store and passes them to
          //    compactionPolicy.selectCompaction.
          //
          // RatioBasedCompactionPolicy.selectCompaction works as follows:
          // 1. Check whether (all store files - files being compacted) reaches
          //    hbase.hstore.blockingStoreFiles (default 7):
          //    a. if filesCompacting (the files currently being compacted) is not empty:
          //       store files - files being compacted + 1 >= blockingStoreFiles
          //    b. if filesCompacting is empty:
          //       store files - files being compacted >= blockingStoreFiles
          // 2. Remove the files in filesCompacting from the store file list, leaving the
          //    eligible candidates that are not yet being compacted.
          // 3. If MIN_VERSIONS of the column family is 0 (not set), use its TTL. The TTL is
          //    configured in seconds; Integer.MAX_VALUE, -1 or unset means no TTL.
          //    In all other cases the TTL is Long.MAX_VALUE.
          // 4. Unless a major compaction is forced: if hbase.store.delete.expired.storefile is
          //    true (default true) and a TTL is set, collect the files from step 2 whose data
          //    has all expired.
          //    4.1 If there are expired files, build a CompactionRequest for just those and
          //        return it.
          //    4.2 Otherwise (to keep large files out of minor compactions), remove files
          //        larger than hbase.hstore.compaction.max.size (default Long.MAX_VALUE).
          // 5. Check whether a major compaction is due:
          //    5.1 hbase.hregion.majorcompaction, default 1000*60*60*24*7 (7 days)
          //    5.2 hbase.hregion.majorcompaction.jitter, default 0.5f
          //    5.3 Check whether the oldest store file is older than the interval from 5.1 and
          //        5.2. By default that interval falls between 3.5 and 10.5 days. With a 24-hour
          //        interval and jitter 0.2 it varies by +/- 4.8 hours.
          //    5.4 If that time has not passed, no major compaction is needed.
          //    5.5 If it has passed, a major compaction is needed when:
          //        a. there is only one store file and its oldest data has passed the TTL, or
          //        b. there is more than one store file.
          // 6. A further condition, given 5: the number of candidate files is below
          //    hbase.hstore.compaction.max (default 10).
          // 7. If 5 and 6 both hold, or the candidates include reference files (left by a
          //    split), the compaction is promoted to major.
          // 8. If it is not promoted to major, bulk-loaded files are removed from the list.
          // 9. Find the largest files, i.e. files bigger than the ratio times the total size of
          //    the files after them, and leave them out (see the offPeakHours notes above).
          // 10. If candidates remain, check that their number reaches
          //     hbase.hstore.compaction.min (default 3; older versions:
          //     hbase.hstore.compactionThreshold). If not, nothing is compacted.
          // 11. Unless this is a user-requested major compaction, files beyond
          //     hbase.hstore.compaction.max (default 10) are removed from the list.
          // 12. Build and return a CompactionRequest. If it is not major and the current time
          //     is within the off-peak hours, its isOffPeak is set to true; otherwise false.
          compaction.select(this.filesCompacting, isUserCompaction,
              mayUseOffPeak, forceMajor && filesCompacting.isEmpty());

        } catch (IOException e) {
          if (mayUseOffPeak) {
            offPeakCompactionTracker.set(false);
          }
          throw e;
        }
        assert compaction.hasSelection();
        if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
          // Compaction policy doesn't want to take advantage of off-peak.
          offPeakCompactionTracker.set(false);
        }
      }
      if (this.getCoprocessorHost() != null) {
        this.getCoprocessorHost().postCompactSelection(
            this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
      }

      // Selected files; see if we have a compaction with some custom base request.
      if (baseRequest != null) {
        // Update the request with what the system thinks the request should be;
        // it's up to the request if it wants to listen.
        compaction.forceSelect(
            baseRequest.combineWith(compaction.getRequest()));
      }

      // Finally, we have the resulting files list. Check if we have any files at all.
      final Collection<StoreFile> selectedFiles = compaction.getRequest().getFiles();
      if (selectedFiles.isEmpty()) {
        return null;
      }

      // Update filesCompacting (check that we do not try to compact the same StoreFile twice).
      if (!Collections.disjoint(filesCompacting, selectedFiles)) {
        Preconditions.checkArgument(false, "%s overlaps with %s",
            selectedFiles, filesCompacting);
      }
      // Add the store files selected for this compaction to HStore.filesCompacting
      filesCompacting.addAll(selectedFiles);
      // Keep filesCompacting sorted by store file sequence id, ascending
      Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);

      // If we're enqueuing a major, clear the force flag.
      // If the selection covers every store file in this store, the compaction is promoted
      // to major.
      boolean isMajor = selectedFiles.size() == this.getStorefilesCount();
      this.forceMajor = this.forceMajor && !isMajor;

      // Set common request properties.
      // Set priority, either override value supplied by caller or from store.
      compaction.getRequest().setPriority(
          (priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
      compaction.getRequest().setIsMajor(isMajor);
      compaction.getRequest().setDescription(
          getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
    }
  } finally {
    this.lock.readLock().unlock();
  }

  LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() + ": Initiating "
      + (compaction.getRequest().isMajor() ? "major" : "minor") + " compaction");
  this.region.reportCompactionRequestStart(compaction.getRequest().isMajor());
  return compaction;
}
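The priority and major-promotion rules applied at the end of requestCompaction can be condensed into a few pure functions. This is a hypothetical sketch.

The constants mirror Store.PRIORITY_USER, which is 1 as stated in this article, and Store.NO_PRIORITY, which we assume to be Integer.MIN_VALUE. Smaller values are more urgent in the compaction queue, which is why CompactionRunner re-queues itself when the value grows.

```java
// Hypothetical sketch of how a compaction request's priority and major flag are derived.
final class PrioritySketch {
    static final int PRIORITY_USER = 1;                // Store.PRIORITY_USER
    static final int NO_PRIORITY = Integer.MIN_VALUE;  // assumed value of Store.NO_PRIORITY

    private PrioritySketch() {}

    /** blockingStoreFiles - storeFileCount, bumped to 2 if it would equal PRIORITY_USER. */
    static int storeCompactPriority(int blockingStoreFiles, int storeFileCount) {
        int priority = blockingStoreFiles - storeFileCount;
        return (priority == PRIORITY_USER) ? priority + 1 : priority;
    }

    /** A caller-supplied priority wins unless it is NO_PRIORITY. */
    static int requestPriority(int callerPriority, int blockingStoreFiles, int storeFileCount) {
        return (callerPriority != NO_PRIORITY)
                ? callerPriority
                : storeCompactPriority(blockingStoreFiles, storeFileCount);
    }

    /** A selection covering every store file is promoted to a major compaction. */
    static boolean isMajor(int selectedFiles, int storeFileCount) {
        return selectedFiles == storeFileCount;
    }
}
```

Consider blockingStoreFiles = 7. A store with 3 files gets priority 4. A store with 6 files would get 1, which is reserved for user requests, so it gets 2 instead. A store with 9 files gets -2, which puts it ahead of everything else.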


Running the compaction

A compaction runs as a CompactSplitThread.CompactionRunner, which is created on and executed by the chosen thread pool.

Its run() method works as follows:

public void run() {
  Preconditions.checkNotNull(server);
  if (server.isStopped()
      || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
    return;
  }
  // Common case - system compaction without a file selection. Select now.
  // compaction == null means a system (not user-initiated) compaction,
  // so a CompactionContext has to be obtained here.
  if (this.compaction == null) {
    // queuedPriority was set when this runner was created: hbase.hstore.blockingStoreFiles
    // minus the number of store files, or 2 if that difference is 1.
    int oldPriority = this.queuedPriority;
    // Recompute the same value now.
    this.queuedPriority = this.store.getCompactPriority();
    // A larger value than before means store files have been removed
    // (usually because another compaction finished).
    if (this.queuedPriority > oldPriority) {
      // Store priority decreased while we were in queue (due to some other compaction?),
      // requeue with new priority to avoid blocking potential higher priorities.
      // End this run and resubmit this runner with the latest priority.
      this.parent.execute(this);
      return;
    }
    try {
      // HStore.requestCompaction returns a CompactionContext and computes the store files to
      // compact. Here request.priority is hbase.hstore.blockingStoreFiles minus the number of
      // store files (a system request); if that difference equals PRIORITY_USER,
      // PRIORITY_USER + 1 is used. For a client request it would be Store.PRIORITY_USER.
      // See HStore.requestCompaction above.
      this.compaction = selectCompaction(this.region, this.store, queuedPriority, null);
    } catch (IOException ex) {
      LOG.error("Compaction selection failed " + this, ex);
      server.checkFileSystem();
      return;
    }
    if (this.compaction == null) return; // nothing to do
    // Now see if we are in correct pool for the size; if not, go to the correct one.
    // We might end up waiting for a while, so cancel the selection.
    assert this.compaction.hasSelection();
    // This is where the pool check that had no effect in requestCompactionInternal happens.
    // getSize() is the total size of the store files selected for this compaction.
    // If it exceeds hbase.regionserver.thread.compaction.throttle (default
    // hbase.hstore.compaction.max * 2 * memstore flush size), largeCompactions is used;
    // otherwise smallCompactions.
    ThreadPoolExecutor pool = store.throttleCompaction(
        compaction.getRequest().getSize()) ? largeCompactions : smallCompactions;
    // If this differs from the pool the runner is queued on, cancel the selection,
    // resubmit this runner to the new pool and end the current run.
    if (this.parent != pool) {
      this.store.cancelRequestedCompaction(this.compaction);
      this.compaction = null;
      this.parent = pool;
      this.parent.execute(this);
      return;
    }
  }

  // Finally we can compact something.
  assert this.compaction != null;

  this.compaction.getRequest().beforeExecute();
  try {
    // Note: please don't put single-compaction logic here;
    // put it into region/store/etc. This is CST logic.
    long start = EnvironmentEdgeManager.currentTimeMillis();
    // HRegion.compact calls HStore.compact with the CompactionContext and returns
    // true if the compaction completed, false otherwise.
    boolean completed = region.compact(compaction, store);
    long now = EnvironmentEdgeManager.currentTimeMillis();
    LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
        this + "; duration=" + StringUtils.formatTimeDiff(now, start));
    if (completed) {
      // Check whether the store still has too many files. getCompactPriority() is
      // hbase.hstore.blockingStoreFiles (default 7) minus the store file count, so a value
      // <= 0 means the count is still at or above the blocking limit. In that case another
      // compaction request is issued.
      // degenerate case: blocked regions require recursive enqueues
      if (store.getCompactPriority() <= 0) {
        requestSystemCompaction(region, store, "Recursive enqueue");
      } else {
        // The store file count is within limits after the compaction, so no further
        // compaction is needed. Check whether the region should be split instead.
        // Split conditions:
        // a. hbase.regionserver.regionSplitLimit (default Integer.MAX_VALUE) is larger than
        //    the number of online regions on this RegionServer;
        // b. in addition, hbase.hstore.blockingStoreFiles minus the store file count is
        //    >= Store.PRIORITY_USER (1);
        // c. the table is not hbase:meta or the namespace table. For the remaining
        //    conditions see the split analysis.
        // see if the compaction has caused us to exceed max region size
        requestSplit(region);
      }
    }
  } catch (IOException ex) {
    IOException remoteEx = RemoteExceptionHandler.checkIOException(ex);
    LOG.error("Compaction failed " + this, remoteEx);
    if (remoteEx != ex) {
      LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
    }
    server.checkFileSystem();
  } catch (Exception ex) {
    LOG.error("Compaction failed " + this, ex);
    server.checkFileSystem();
  } finally {
    LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
  }
  this.compaction.getRequest().afterExecute();
}
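The control flow of run() is easier to follow as two decision functions:

- Before the compaction: re-queue, skip, switch pool, or run.
- After it: enqueue again while the store is still at or over the blocking count, otherwise consider a split.

The sketch below is a hypothetical model of that flow, not HBase code. The enums and parameters are invented. The pool and selection outcomes are passed in as booleans instead of being computed.

```java
// Hypothetical model of the decisions in CompactionRunner.run(); not HBase code.
final class RunnerDecisionSketch {
    enum BeforeRun { REQUEUE, SKIP, SWITCH_POOL, RUN }
    enum AfterRun { NOTHING, RECURSIVE_ENQUEUE, CONSIDER_SPLIT }

    private RunnerDecisionSketch() {}

    /**
     * Decision for a system compaction that had no file selection when queued.
     * @param oldPriority        queuedPriority when the runner was created
     * @param newPriority        store.getCompactPriority() recomputed now
     * @param filesSelected      whether selectCompaction(...) returned a selection
     * @param queuedOnLargePool  whether this runner currently sits in the large pool
     * @param sizeNeedsLargePool whether the selected size exceeds the throttle point
     */
    static BeforeRun beforeRun(int oldPriority, int newPriority, boolean filesSelected,
                               boolean queuedOnLargePool, boolean sizeNeedsLargePool) {
        if (newPriority > oldPriority) {
            return BeforeRun.REQUEUE;       // less urgent now: requeue so others go first
        }
        if (!filesSelected) {
            return BeforeRun.SKIP;          // nothing to do
        }
        if (queuedOnLargePool != sizeNeedsLargePool) {
            return BeforeRun.SWITCH_POOL;   // cancel the selection, resubmit to the other pool
        }
        return BeforeRun.RUN;
    }

    /** Decision after region.compact() has returned. */
    static AfterRun afterRun(boolean completed, int storeCompactPriority) {
        if (!completed) {
            return AfterRun.NOTHING;
        }
        // priority <= 0: the store is still at or above hbase.hstore.blockingStoreFiles
        return storeCompactPriority <= 0 ? AfterRun.RECURSIVE_ENQUEUE : AfterRun.CONSIDER_SPLIT;
    }
}
```

Keeping the decisions free of side effects makes the ordering visible. The priority re-check comes before selection, and the pool check only makes sense once a selection (and therefore a size) exists.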


HStore.compact:

public List<StoreFile> compact(CompactionContext compaction) throws IOException {
  assert compaction != null && compaction.hasSelection();
  CompactionRequest cr = compaction.getRequest();
  // The store files to compact
  Collection<StoreFile> filesToCompact = cr.getFiles();
  assert !filesToCompact.isEmpty();
  synchronized (filesCompacting) {
    // sanity check: we're compacting files that this store knows about
    // TODO: change this to LOG.error() after more debugging
    Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
  }

  // Ready to go. Have list of files to compact.
  LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
      + this + " of " + this.getRegionInfo().getRegionNameAsString()
      + " into tmpdir=" + fs.getTempDir() + ", totalSize="
      + StringUtils.humanReadableInt(cr.getSize()));

  long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
  List<StoreFile> sfs = null;
  try {
    // Run the compaction: the selected store files are merged into new store file(s) in the
    // store's .tmp directory. DefaultCompactor.compact opens a StoreFileScanner on each input
    // file and combines them in a StoreScanner. If the compaction has been promoted to major,
    // ScanType is COMPACT_DROP_DELETES, otherwise COMPACT_RETAIN_DELETES.
    // The scan itself is covered in a later analysis of the scan flow.
    // Commence the compaction.
    List<Path> newFiles = compaction.compact();

    // If hbase.hstore.compaction.complete is false, only check that the new store files can
    // be opened, and return them without swapping them in.
    // TODO: get rid of this!
    if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
      LOG.warn("hbase.hstore.compaction.complete is set to false");
      sfs = new ArrayList<StoreFile>();
      for (Path newFile : newFiles) {
        // Create storefile around what we wrote with a reader on it.
        StoreFile sf = createStoreFileAndReader(newFile);
        sf.closeReader(true);
        sfs.add(sf);
      }
      return sfs;
    }
    // Move the new store files into the column family directory and return them,
    // with their readers already opened.
    // Do the steps necessary to complete the compaction.
    sfs = moveCompatedFilesIntoPlace(cr, newFiles);

    // Write a marker describing this compaction to the WAL.
    writeCompactionWalRecord(filesToCompact, sfs);

    // Remove the old files from the store's storefiles list, add the new ones and re-sort
    // the list by sequence id. storefiles holds the readers that scans on this store use.
    // The compacted files are also removed from HStore.filesCompacting.
    replaceStoreFiles(filesToCompact, sfs);

    // Archive the compacted store files of this store in HDFS.
    // At this point the store will use new files for all new scanners.
    completeCompaction(filesToCompact); // Archive old files & update store size.
  } finally {
    // Remove this request's files from HStore.filesCompacting. After a successful compaction
    // there is nothing left to remove here. If the compaction failed, the files that were
    // not compacted are released now.
    finishCompactionRequest(cr);
  }
  logCompactionEndMessage(cr, sfs, compactionStartTime);
  return sfs;
}
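The bookkeeping around filesCompacting follows four steps: claim the selected files, refuse overlaps, swap in the result, and always release the claim in a finally step. Plain lists are enough to model it.

In this hypothetical sketch, store files are identified only by a sequence id. The output file is assumed to inherit the largest input sequence id. Real StoreFile handling (readers, archiving, the WAL marker) is left out.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the filesCompacting bookkeeping around HStore.compact(); not HBase code.
// Store files are represented by their sequence ids only.
final class FilesCompactingSketch {
    final List<Long> storeFiles = new ArrayList<>();       // "storefiles": what scanners read
    final List<Long> filesCompacting = new ArrayList<>();  // files claimed by running compactions

    /** Claim files for a compaction; refuses to compact the same file twice. */
    synchronized void claim(Collection<Long> selected) {
        if (!Collections.disjoint(filesCompacting, selected)) {
            throw new IllegalArgumentException(selected + " overlaps with " + filesCompacting);
        }
        filesCompacting.addAll(selected);
        Collections.sort(filesCompacting);                 // ascending sequence id
    }

    /** Success path: swap the inputs for the output file and release the claim. */
    synchronized void replace(Collection<Long> compacted, long newFileSeqId) {
        storeFiles.removeAll(compacted);
        storeFiles.add(newFileSeqId);                      // assumption: max input seq id
        Collections.sort(storeFiles);
        filesCompacting.removeAll(compacted);
    }

    /** finally-block equivalent: always release whatever is still claimed. */
    synchronized void finish(Collection<Long> requested) {
        filesCompacting.removeAll(requested);
    }
}
```

Calling finish() unconditionally is what keeps a failed compaction from leaving files permanently marked as "compacting". On the success path it is a no-op because replace() has already released them.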



Major compaction flow

A major compaction can be requested for a store or for a region. When a region is passed, all stores of the region are obtained and triggerMajorCompaction is called on each store.

HStore.triggerMajorCompaction only sets the store's forceMajor flag to true:

public void triggerMajorCompaction() {
  this.forceMajor = true;
}


Once forceMajor has been set, requestCompaction is called directly, exactly as for a normal request:

if (family != null) {
  compactSplitThread.requestCompaction(region, store, log,
      Store.PRIORITY_USER, null);
} else {
  compactSplitThread.requestCompaction(region, log,
      Store.PRIORITY_USER, null);
}
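Because triggerMajorCompaction only sets a flag, what actually happens depends on how requestCompaction consumes it. The hypothetical sketch below models the two steps shown in this article:

- The flag is passed to select() only when no other compaction is running on the store.
- The flag is cleared once a selection covering all store files (i.e. a major compaction) has been made.

The class is invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how the forceMajor flag set by triggerMajorCompaction() is consumed.
final class ForceMajorSketch {
    boolean forceMajor;
    int storeFileCount;
    final List<Integer> filesCompacting = new ArrayList<>();

    void triggerMajorCompaction() {
        forceMajor = true;                        // only sets a flag
    }

    /** Value handed to compaction.select(...) as its last argument. */
    boolean forceMajorForSelect() {
        return forceMajor && filesCompacting.isEmpty();
    }

    /** After selection: promotion to major clears the flag; otherwise it stays pending. */
    boolean afterSelection(int selectedFiles) {
        boolean isMajor = selectedFiles == storeFileCount;
        forceMajor = forceMajor && !isMajor;
        return isMajor;
    }
}
```

A major compaction requested while another compaction is running is therefore not lost. The flag stays set and is honoured by a later request once filesCompacting is empty.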

From there, requestCompaction proceeds essentially as in the non-major compaction flow:

CompactSplitThread.requestCompaction --> requestCompactionInternal --> selectCompaction
--> HStore.requestCompaction(priority, request), which returns the CompactionContext.

The relevant code:

// Whether the compaction was requested by a user
boolean isUserCompaction = priority == Store.PRIORITY_USER;

// isOffPeakHour() returns true when:
// a. hbase.offpeak.start.hour is not -1 (a value in 0-23),
// b. hbase.offpeak.end.hour is not -1 (a value in 0-23) and is greater than the value in a,
// c. the hour of the current time lies between a and b.
// Otherwise it returns false.
boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
    offPeakCompactionTracker.compareAndSet(false, true);
try {
  // Here the last argument is true: a major compaction was requested and no other
  // compaction is running on this store.
  compaction.select(this.filesCompacting, isUserCompaction,
      mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
} catch (IOException e) {
  if (mayUseOffPeak) {
    offPeakCompactionTracker.set(false);
  }
  throw e;
}
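Here is a minimal sketch of the off-peak logic: an hour-window check plus an AtomicBoolean. Like offPeakCompactionTracker, the flag lets only one caller at a time use the off-peak ratio until it is released.

The class is hypothetical, and it makes two assumptions:

- Conditions a to c above describe a window with start < end. The sketch additionally treats start > end as a window that wraps past midnight.
- The end hour is exclusive.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the off-peak window check and a shared off-peak tracker.
final class OffPeakSketch {
    // One flag shared by every instance in this JVM, modelled on offPeakCompactionTracker.
    static final AtomicBoolean OFF_PEAK_IN_USE = new AtomicBoolean(false);

    private final int startHour; // hbase.offpeak.start.hour, -1 = not configured
    private final int endHour;   // hbase.offpeak.end.hour,   -1 = not configured

    OffPeakSketch(int startHour, int endHour) {
        this.startHour = startHour;
        this.endHour = endHour;
    }

    private static boolean validHour(int h) {
        return h >= 0 && h <= 23;
    }

    /** Conditions a to c; the end hour is treated as exclusive (assumption). */
    boolean isOffPeakHour(int hourOfDay) {
        if (!validHour(startHour) || !validHour(endHour) || startHour == endHour) {
            return false;
        }
        if (startHour < endHour) {
            return startHour <= hourOfDay && hourOfDay < endHour;
        }
        // Assumption: start > end means the window wraps past midnight, e.g. 22 -> 3.
        return hourOfDay >= startHour || hourOfDay < endHour;
    }

    /** mayUseOffPeak = isOffPeakHour() && tracker.compareAndSet(false, true) */
    boolean tryUseOffPeak(int hourOfDay) {
        return isOffPeakHour(hourOfDay) && OFF_PEAK_IN_USE.compareAndSet(false, true);
    }

    /** Release the flag, e.g. when selection fails or the request is not off-peak. */
    static void release() {
        OFF_PEAK_IN_USE.set(false);
    }
}
```

The && short-circuits exactly as in the quoted code: outside the window the flag is never touched. Inside it, compareAndSet lets only the first caller through until release() is called.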


By default, compaction.select in the code above is DefaultStoreEngine.DefaultCompactionContext.select:

public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
    boolean mayUseOffPeak, boolean forceMajor) throws IOException {
  // Call RatioBasedCompactionPolicy.selectCompaction to get a CompactionRequest and
  // store it in the request field of this compaction context.
  request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(),
      filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor);
  return request != null;
}


How RatioBasedCompactionPolicy.selectCompaction works:

public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
    final List<StoreFile> filesCompacting, final boolean isUserCompaction,
    final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
  // Preliminary compaction subject to filters
  ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
  // Stuck and not compacting enough (estimate). It is not guaranteed that we will be
  // able to compact more if stuck and compacting, because ratio policy excludes some
  // non-compacting files from consideration during compaction (see getCurrentEligibleFiles).
  int futureFiles = filesCompacting.isEmpty() ? 0 : 1;

  // Whether (all store files - files being compacted, + 1 if a compaction is running)
  // reaches the blocking file count, hbase.hstore.blockingStoreFiles (default 7)
  boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
      >= storeConfigInfo.getBlockingFileCount();

  // Eligible store files: all store files except those currently being compacted
  candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
  LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
      filesCompacting.size() + " compacting, " + candidateSelection.size() +
      " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");

  // TTL of the store files, from the TTL attribute of the column family (in seconds).
  // Integer.MAX_VALUE, -1 or no value means no TTL. The TTL only applies when
  // MIN_VERSIONS is not set. If these checks pass, the configured TTL is returned
  // (in ms); otherwise Long.MAX_VALUE.
  long cfTtl = this.storeConfigInfo.getStoreFileTtl();

  // Conditions: this is not a forced major compaction, hbase.store.delete.expired.storefile
  // is true (default true), and a TTL is configured. If all three hold, collect the
  // not-yet-compacting store files whose data has all expired. If there are any, a
  // CompactionRequest for just those files is returned and selection ends here.
  if (!forceMajor) {
    // If there are expired files, only select them so that compaction deletes them
    if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
      ArrayList<StoreFile> expiredSelection = selectExpiredStoreFiles(
          candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
      if (expiredSelection != null) {
        return new CompactionRequest(expiredSelection);
      }
    }

    // For a non-major compaction, remove non-reference store files (a split leaves
    // reference files behind) that are larger than hbase.hstore.compaction.max.size.
    candidateSelection = skipLargeFiles(candidateSelection);
  }


  // Force a major compaction if this is a user-requested major compaction,
  // or if we do not have too many files to compact and this was requested
  // as a major compaction.
  // Or, if there are any references among the candidates.

  // The major-compaction condition combines the following checks:

  // (forceMajor && isUserCompaction)
  // a. The user requested the compaction, it is a major compaction, and no other files of
  //    this store are being compacted (forceMajor already includes filesCompacting.isEmpty()).

  // ((forceMajor || isMajorCompaction(candidateSelection))
  //     && (candidateSelection.size() < comConf.getMaxFilesToCompact()))
  // b. Three conditions: the first (b1) or the second (b2) must hold, and the third (b3)
  //    must hold.

  // forceMajor
  // b1. A major compaction was requested and no other files of this store are being
  //     compacted.

  // isMajorCompaction(candidateSelection)
  // b2. Or the following checks pass:
  // b2.1. The oldest modification time among the candidate files has reached the major
  //       compaction interval.
  // b2.2. If there is only one candidate file, the oldest cell in it has reached the TTL,
  //       and the file has also reached the major compaction interval.
  // b2.3. If there are several candidate files, the oldest one has reached the major
  //       compaction interval.
  // b2.4. Conversely, the oldest file may have reached the major interval and still be
  //       skipped. This happens when it is the only candidate, has already been
  //       major-compacted (StoreFile.majorCompaction == true), and either no TTL is set or
  //       the TTL has not expired. Such a file is not major-compacted again.
  // hbase.hregion.majorcompaction sets the major compaction interval, and
  // hbase.hregion.majorcompaction.jitter sets the allowed deviation around it. E.g. with a
  // 24-hour interval and jitter 0.2f, the deviation is 20% = +/- 4.8 hrs.

  // (candidateSelection.size() < comConf.getMaxFilesToCompact())
  // b3. The number of candidate files is below the maximum files per compaction,
  //     hbase.hstore.compaction.max (default 10).

  // StoreUtils.hasReferences(candidateSelection)
  // c. The candidates include reference files (created by a split).
  boolean majorCompaction = (
      (forceMajor && isUserCompaction)
      || ((forceMajor || isMajorCompaction(candidateSelection))
          && (candidateSelection.size() < comConf.getMaxFilesToCompact()))
      || StoreUtils.hasReferences(candidateSelection)
  );

  // Minor (non-major) compaction
  if (!majorCompaction) {
    // we're doing a minor compaction, let's see what files are applicable
    // Remove bulk-loaded store files from the candidates
    candidateSelection = filterBulk(candidateSelection);

    // Drop large store files from the front (oldest end) of the candidate list while at
    // least hbase.hstore.compaction.min candidates remain. A file counts as "large" when:
    // a. it is larger than the total size of the files after it (at most
    //    hbase.hstore.compaction.max - 1 of them; max defaults to 10) multiplied by a ratio.
    //    E.g. with 10 candidates, the first file (i = 0) is compared with files
    //    i+1 .. i+max-1 = 9, i.e. the 9 files after it. The ratio is
    //    hbase.hstore.compaction.ratio.offpeak (default 5.0f) under three conditions:
    //    hbase.offpeak.start.hour and hbase.offpeak.end.hour are set (not -1), start < end,
    //    and the current time falls between them. Otherwise the ratio is
    //    hbase.hstore.compaction.ratio (default 1.2f).
    // b. it is also larger than hbase.hstore.compaction.min.size (default: the memstore
    //    flush size); smaller files are never dropped by this rule.
    // c. Special case: (all store files - files being compacted) has reached
    //    hbase.hstore.blockingStoreFiles (default 7), i.e. mayBeStuck, and the ratio rule
    //    leaves no file selected. Then the older files at the front of the list are dropped,
    //    and only the last hbase.hstore.compaction.min files are kept (default 3, never less
    //    than 2; older versions: hbase.hstore.compactionThreshold).
    candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);

    // Check whether the number of eligible files reaches the minimum; if not, clear the
    // candidate list.
    candidateSelection = checkMinFilesCriteria(candidateSelection);
  }
  // Unless this is a user-requested major compaction, drop candidates beyond the
  // hbase.hstore.compaction.max limit.
  candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
  // Create the CompactionRequest
  CompactionRequest result = new CompactionRequest(candidateSelection);
  // Mark the request as off-peak when it is not a major compaction, off-peak hours are
  // configured, and the current time is within them.
  result.setOffPeak(!candidateSelection.isEmpty() && !majorCompaction && mayUseOffPeak);
  return result;
}
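The minor-compaction part of the selection can be sketched over an array of file sizes ordered oldest first. That part covers the ratio rule, the minimum-file check and the maximum-file cap. This is a hypothetical re-implementation written from the description above, not the HBase source.

- Candidates are plain sizes.
- The parameters stand in for hbase.hstore.compaction.min, hbase.hstore.compaction.max, hbase.hstore.compaction.min.size and the (off-peak) ratio.
- The mayBeStuck fallback (item c) is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical re-implementation of the minor-compaction selection rules; not HBase code.
// sizes[] holds candidate file sizes ordered oldest first (ascending sequence id).
final class RatioSelectionSketch {
    private RatioSelectionSketch() {}

    static List<Long> selectMinor(long[] sizes, int minFilesConf, int maxFiles,
                                  long minCompactSize, double ratio) {
        int minFiles = Math.max(2, minFilesConf);   // hbase.hstore.compaction.min, never below 2
        int n = sizes.length;
        List<Long> selected = new ArrayList<>();
        if (n == 0) {
            return selected;
        }
        // sum[i] = total size of files i .. i + maxFiles - 2 (at most maxFiles - 1 files)
        long[] sum = new long[n];
        for (int i = n - 1; i >= 0; --i) {
            int tooFar = i + maxFiles - 1;
            sum[i] = sizes[i]
                    + ((i + 1 < n) ? sum[i + 1] : 0)
                    - ((tooFar < n) ? sizes[tooFar] : 0);
        }
        // Skip the oldest file while enough files remain and it is "too big": larger than
        // minCompactSize and larger than ratio * (total of the next maxFiles - 1 files).
        int start = 0;
        while (n - start >= minFiles
                && sizes[start] > Math.max(minCompactSize, (long) (sum[start + 1] * ratio))) {
            ++start;
        }
        for (int i = start; i < n; i++) {
            selected.add(sizes[i]);
        }
        // checkMinFilesCriteria: too few files left means no compaction at all.
        if (selected.size() < minFiles) {
            selected.clear();
        }
        // removeExcessFiles (minor case): keep at most maxFiles, the oldest ones.
        if (selected.size() > maxFiles) {
            selected.subList(maxFiles, selected.size()).clear();
        }
        return selected;
    }
}
```

Take sizes {200, 50, 40, 30}, with min 3, max 10, no minimum size and ratio 1.2. The 200 file is skipped because it exceeds 1.2 × 120 = 144, and the three newer files are selected. With the off-peak ratio 5.0, all four files are selected. This is why off-peak hours let larger, older files be pulled into minor compactions.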


For how the compaction is then executed, see CompactionRunner.run() in the non-major compaction flow above.


Compaction after a flush

Compactions triggered by a flush are issued from MemStoreFlusher.FlushHandler.run. Once flushRegion has completed, a compaction is requested:

CompactSplitThread.requestSystemCompaction --> requestCompactionInternal(region)

public synchronized void requestSystemCompaction(
    final HRegion r, final String why) throws IOException {
  requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false);
}


CompactSplitThread.requestCompactionInternal(region) --> requestCompactionInternal(store)

private List<CompactionRequest> requestCompactionInternal(final HRegion r, final String why,
    int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow) throws IOException {
  // not a special compaction request, so make our own list
  List<CompactionRequest> ret = null;
  if (requests == null) {
    ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
    for (Store s : r.getStores().values()) {
      // Issue a compaction request for each store, with priority = Store.NO_PRIORITY;
      // see the non-major compaction flow.
      CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow);
      if (selectNow) ret.add(cr);
    }
  } else {
    Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
    ret = new ArrayList<CompactionRequest>(requests.size());
    for (Pair<CompactionRequest, Store> pair : requests) {
      ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst()));
    }
  }
  return ret;
}



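Compaction triggered by the periodic checker thread

The periodic check is run by HRegionServer.CompactionChecker. Its configuration and trigger conditions are the ones described at the start of this article.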



    protected void chore() {
      for (HRegion r : this.instance.onlineRegions.values()) {
        if (r == null)
          continue;
        for (Store s : r.getStores().values()) {
          try {
            long multiplier = s.getCompactionCheckMultiplier();
            assert multiplier > 0;
            if (iteration % multiplier != 0) continue;
            // Check whether a system compaction is needed: the number of store files minus the
            // number currently being compacted is >= hbase.hstore.compaction.min
            if (s.needsCompaction()) {
              // Queue a compaction. Will recognize if major is needed.
              // Issue a system compaction request (the same kind of request a flush triggers)
              this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
                  + " requests compaction");


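              // b2. Otherwise, a major compaction is requested if the following checks pass:
              // b2.1 The oldest store file (by modification time) among the candidates has
              //      reached the major compaction interval.
              // b2.2 If there is only one candidate store file, the oldest cell in it has
              //      reached the TTL, and the file has reached the major compaction interval.
              // b2.3 If there are several candidate store files and the oldest one has
              //      reached the major compaction interval.
              // b2.4 In other words: if the oldest store file has reached the major interval,
              //      but it is the only candidate, it has already been major-compacted
              //      (StoreFile.majorCompaction == true), and the TTL is either not configured
              //      or not yet expired, then this store file is not major-compacted.
              // hbase.hregion.majorcompaction configures the major compaction interval.
              // hbase.hregion.majorcompaction.jitter configures the +/- spread of the interval.
              // E.g. with an interval of 24 hours and a jitter of 0.2f: 20% = +/- 4.8 hrs.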


            } else if (s.isMajorCompaction()) {
              if (majorCompactPriority == DEFAULT_PRIORITY
                  || majorCompactPriority > r.getCompactPriority()) {
                // Issue requestCompaction; see Note A below
                this.instance.compactSplitThread.requestCompaction(r, s, getName()
                    + " requests major compaction; use default priority", null);
              } else {
                // Issue requestCompaction; see Note B below
                this.instance.compactSplitThread.requestCompaction(r, s, getName()
                    + " requests major compaction; use configured priority",
                    this.majorCompactPriority, null);
              }
            }
          } catch (IOException e) {
            LOG.warn("Failed major compaction check on " + r, e);
          }
        }
      }
      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
    }
    }
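The priority choice in the major-compaction branch of chore() can be isolated as below. This is a hedged sketch: it assumes DEFAULT_PRIORITY is Integer.MAX_VALUE and that, as in HBase's compaction queue, a smaller number means a more urgent request; the class and method names are hypothetical. An empty result corresponds to the "use default priority" call, a present value to the "use configured priority" call.

```java
import java.util.OptionalInt;

// Hypothetical sketch of the major-compaction priority decision in chore().
public class MajorPrioritySketch {

    // Stand-in for CompactionChecker.DEFAULT_PRIORITY (assumed Integer.MAX_VALUE).
    static final int DEFAULT_PRIORITY = Integer.MAX_VALUE;

    // Empty  -> requestCompaction(r, s, why, null)           (default priority)
    // Present -> requestCompaction(r, s, why, value, null)   (configured priority)
    static OptionalInt majorCompactionPriority(int majorCompactPriority, int regionCompactPriority) {
        if (majorCompactPriority == DEFAULT_PRIORITY
                || majorCompactPriority > regionCompactPriority) {
            return OptionalInt.empty();
        }
        return OptionalInt.of(majorCompactPriority);
    }

    public static void main(String[] args) {
        System.out.println(majorCompactionPriority(DEFAULT_PRIORITY, 5)); // OptionalInt.empty
        System.out.println(majorCompactionPriority(3, 5));                // OptionalInt[3]
    }
}
```

The configured value only wins when it is not larger (not less urgent) than what the region would get anyway.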


    Note A:

    CompactSplitThread.requestCompaction -->

    requestCompaction(r, s, why, Store.NO_PRIORITY, request);

    --> requestCompactionInternal(r, s, why, priority, request, true); here selectNow is set to true


    Note B:

    CompactSplitThread.requestCompaction -->

    requestCompactionInternal(r, s, why, priority, request, true); here selectNow is set to true


    -------------------------------------------------------------

    Processing flow of requestCompactionInternal:


    private synchronized CompactionRequest requestCompactionInternal(final HRegion r,
        final Store s, final String why, int priority, CompactionRequest request, boolean selectNow)


    Processing flow of a compaction request for a single store.

    To disable compaction for an HBase table, set the COMPACTION_ENABLED attribute when creating the table.

    private synchronized CompactionRequest requestCompactionInternal(final HRegion r, final Store s,
        final String why, int priority, CompactionRequest request, boolean selectNow)
        throws IOException {
      // Do nothing if the server is stopping or compaction is disabled for this table
      if (this.server.isStopped()
          || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
        return null;
      }

      CompactionContext compaction = null;

      // For the calls above selectNow is true (for a system request it is false).
      // A system request hands a null CompactionContext to the CompactSplitThread.CompactionRunner
      // thread; a user-initiated request obtains its compaction context here.


      if (selectNow) {
        // HStore.requestCompaction returns a CompactionContext and selects the store files
        // to compact. For a user-initiated request, request.priority is set to
        // Store.PRIORITY_USER. For a compaction triggered by a flush (a system request),
        // request.priority is set to hbase.hstore.blockingStoreFiles minus the current number
        // of store files; if that difference equals PRIORITY_USER, the priority becomes
        // PRIORITY_USER + 1. See "Creating the CompactionRequest instance".
        compaction = selectCompaction(r, s, priority, request);
        if (compaction == null) return null; // message logged inside
      }


      // We assume that most compactions are small. So, put system compactions into small
      // pool; we will do selection there, and move to large pool if necessary.
      long size = selectNow ? compaction.getRequest().getSize() : 0;

      // At this call site the large pool is never chosen: when selectNow is true, !selectNow
      // is false; when selectNow is false, size is 0, which cannot exceed
      // hbase.regionserver.thread.compaction.throttle
      // (default: hbase.hstore.compaction.max * 2 * memstore flush size).
      ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size))
          ? largeCompactions : smallCompactions;

      // Create a CompactionRunner and execute it in the chosen pool (smallCompactions here);
      // see "The thread that runs the compaction".
      pool.execute(new CompactionRunner(s, r, compaction, pool));
      if (LOG.isDebugEnabled()) {
        String type = (pool == smallCompactions) ? "Small " : "Large ";
        LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
            + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
      }
      return selectNow ? compaction.getRequest() : null;
    }
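Two decisions made around requestCompactionInternal can be sketched in Java under stated assumptions: the system-request priority derived from hbase.hstore.blockingStoreFiles, and the small/large pool choice at submission time. Store.PRIORITY_USER is assumed to be 1, the class and method names are hypothetical, and throttlePoint follows the default formula given in the text (hbase.hstore.compaction.max * 2 * memstore flush size), with throttling assumed to mean "size strictly greater than the throttle point".

```java
// Hypothetical sketch of priority computation and pool selection; not the HBase API.
public class CompactionSubmitSketch {

    static final int PRIORITY_USER = 1; // assumed value of Store.PRIORITY_USER

    // System request: blockingStoreFiles - current file count, bumped if it equals PRIORITY_USER.
    static int systemPriority(int blockingStoreFiles, int storeFileCount) {
        int priority = blockingStoreFiles - storeFileCount;
        return (priority == PRIORITY_USER) ? PRIORITY_USER + 1 : priority;
    }

    // Default throttle point: compaction.max * 2 * memstore flush size.
    static long throttlePoint(int compactionMax, long memstoreFlushSize) {
        return 2L * compactionMax * memstoreFlushSize;
    }

    // Mirrors (!selectNow && s.throttleCompaction(size)) ? largeCompactions : smallCompactions
    static boolean useLargePool(boolean selectNow, long selectedSize, long throttlePoint) {
        long size = selectNow ? selectedSize : 0; // nothing selected yet -> size 0
        return !selectNow && size > throttlePoint;
    }

    public static void main(String[] args) {
        long throttle = throttlePoint(10, 128L * 1024 * 1024); // example values
        System.out.println("throttle point: " + throttle + " bytes");
        System.out.println("priority (7 blocking, 6 files): " + systemPriority(7, 6));
        System.out.println("large pool, selectNow=true:  " + useLargePool(true, Long.MAX_VALUE, throttle));
        System.out.println("large pool, selectNow=false: " + useLargePool(false, Long.MAX_VALUE, throttle));
    }
}
```

With blockingStoreFiles = 7 and 6 store files the difference is 1, which collides with PRIORITY_USER, so the sketch returns 2; both pool checks print false, matching the observation that this call site always uses the small pool.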



  • Original article: https://www.cnblogs.com/lytwajue/p/7100055.html