
Analysis of the UserScan Processing Flow

Preliminaries

A user scan is the scanner operation initiated from the client side.

The caching attribute on a Scan controls how many rows are returned to the client on each next() call.

The batch attribute controls how many KVs may be read per next on the region server (for KVs within the same row).

When building the Scan instance it is best to set both the family and the columns; this keeps the query as efficient as possible.
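As a concrete illustration of these attributes, here is a minimal client-side sketch. The table name, row keys, family "cf" and qualifier "q1" are made up for illustration; it uses org.apache.hadoop.hbase.client.{HTable, Scan, ResultScanner} and org.apache.hadoop.hbase.util.Bytes from the 0.96-era client API discussed in this article.

private static ResultScanner openUserScan(HTable table) throws IOException {
  Scan scan = new Scan(Bytes.toBytes("row-0000"), Bytes.toBytes("row-9999")); // [startKey, stopKey)
  scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1")); // name family and column explicitly
  scan.setCaching(100); // rows returned to the client per scan RPC
  scan.setBatch(10);    // max KVs of a single row returned per next on the region server
  return table.getScanner(scan); // a ClientScanner under the hood
}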

The client builds a Scan instance and obtains a ClientScanner through the following HTable method:

public ResultScanner getScanner(final Scan scan)

In the resulting ClientScanner, the callable field holds a newly created ScannerCallable instance.

callable.prepare(tries != 0) then looks up, in the meta table, the region location that contains the scan's start key.

From that location the region's HRegionInfo is obtained,

and ClientScanner.currentRegion is set to that region, i.e. the region containing the start key.


ClientScanner.next issues the RPC call to the region server,

which invokes HRegionServer.scan:

public ScanResponse scan(final RpcController controller, final ScanRequest request)



On ClientScanner.next, an openScanner operation is issued first to obtain a scannerId.

In ScannerCallable.call:

if (scannerId == -1L) {
  this.scannerId = openScanner();
} else {

The openScanner method issues a scan request to HRegionServer.scan via RPC:

ScanRequest request =
    RequestConverter.buildScanRequest(
        getLocation().getRegionInfo().getRegionName(),
        this.scan, 0, false);
try {
  ScanResponse response = getStub().scan(null, request);
  long id = response.getScannerId();
  if (logScannerActivity) {
    LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
        + " on region " + getLocation().toString());
  }
  return id;

How HRegionServer.scan handles the openScanner case:

public ScanResponse scan(final RpcController controller, final ScanRequest request)
    throws ServiceException {
  Leases.Lease lease = null;
  String scannerName = null;
  ........ (much code not shown)
  requestCount.increment();

  int ttl = 0;
  HRegion region = null;
  RegionScanner scanner = null;
  RegionScannerHolder rsh = null;
  boolean moreResults = true;
  boolean closeScanner = false;
  ScanResponse.Builder builder = ScanResponse.newBuilder();
  if (request.hasCloseScanner()) {
    closeScanner = request.getCloseScanner();
  }
  int rows = 1;
  if (request.hasNumberOfRows()) {
    rows = request.getNumberOfRows();
  }

  if (request.hasScannerId()) {
    ........ (much code not shown)
  } else {

Get the requested HRegion instance, i.e. the region containing the start key:

    region = getRegion(request.getRegion());
    ClientProtos.Scan protoScan = request.getScan();
    boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
    Scan scan = ProtobufUtil.toScan(protoScan);
    // if the request doesn't set this, get the default region setting.
    if (!isLoadingCfsOnDemandSet) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);

If the scan sets no family, all families of the region are used as the scan's families:

    region.prepareScanner(scan);
    if (region.getCoprocessorHost() != null) {
      scanner = region.getCoprocessorHost().preScannerOpen(scan);
    }
    if (scanner == null) {

Run HRegion.getScanner, which creates an HRegion.RegionScannerImpl instance:

      scanner = region.getScanner(scan);
    }
    if (region.getCoprocessorHost() != null) {
      scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
    }

The new RegionScanner is added to the scanners map and assigned a scannerId (a random value);
scannerName is the String form of the scannerId. A lease is registered for expiration monitoring:
the timeout comes from hbase.client.scanner.timeout.period (default 60000 ms;
older versions used hbase.regionserver.lease.period).
Expiration checking is done by Leases, and an expired scanner is handled by an
HRegionServer.ScannerListener.leaseExpired instance.

    scannerId = addScanner(scanner, region);
    scannerName = String.valueOf(scannerId);
    ttl = this.scannerLeaseTimeoutPeriod;
  }
  ........ (much code not shown)
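As a hedged aside, the scanner lease timeout mentioned above can be raised through the client Configuration when scans are expected to pause for long stretches between next() calls. The value below is only an example; the table name is the same hypothetical one used earlier.

Configuration conf = HBaseConfiguration.create();
// Scanner lease / client scanner timeout, default 60000 ms (older releases: hbase.regionserver.lease.period).
conf.setInt("hbase.client.scanner.timeout.period", 120000);
HTable table = new HTable(conf, "demo_table");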


How HRegion.getScanner creates the RegionScanner instance

public RegionScanner getScanner(Scan scan) throws IOException {
  return getScanner(scan, null);
}


This delegates one level down; the list of additional KeyValueScanners passed in here is null.

protected RegionScanner getScanner(Scan scan,
    List<KeyValueScanner> additionalScanners) throws IOException {
  startRegionOperation(Operation.SCAN);
  try {
    // Verify families are all valid
    prepareScanner(scan);
    if (scan.hasFamilies()) {
      for (byte[] family : scan.getFamilyMap().keySet()) {
        checkFamily(family);
      }
    }
    return instantiateRegionScanner(scan, additionalScanners);
  } finally {
    closeRegionOperation();
  }
}


Ultimately an HRegion.RegionScannerImpl instance is created:

protected RegionScanner instantiateRegionScanner(Scan scan,
    List<KeyValueScanner> additionalScanners) throws IOException {
  return new RegionScannerImpl(scan, additionalScanners, this);
}


The RegionScannerImpl constructor:

RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
    throws IOException {

  this.region = region;
  this.maxResultSize = scan.getMaxResultSize();
  if (scan.hasFilter()) {
    this.filter = new FilterWrapper(scan.getFilter());
  } else {
    this.filter = null;
  }

  this.batch = scan.getBatch();
  if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
    this.stopRow = null;
  } else {
    this.stopRow = scan.getStopRow();
  }
  // If we are doing a get, we want to be [startRow,endRow] normally
  // it is [startRow,endRow) and if startRow=endRow we get nothing.
  this.isScan = scan.isGetScan() ? -1 : 0;

  // synchronize on scannerReadPoints so that nobody calculates
  // getSmallestReadPoint, before scannerReadPoints is updated.
  IsolationLevel isolationLevel = scan.getIsolationLevel();
  synchronized (scannerReadPoints) {
    if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
      // This scan can read even uncommitted transactions
      this.readPt = Long.MAX_VALUE;
      MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
    } else {
      this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
    }
    scannerReadPoints.put(this, this.readPt);
  }

  // Here we separate all scanners into two lists - scanners that provide data required
  // by the filter to operate (scanners list) and all others (joinedScanners list).
  List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
  List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
  if (additionalScanners != null) {
    scanners.addAll(additionalScanners);
  }

Iterate over every store the scan needs and create a concrete StoreScanner for each.
In the common case joinedHeap ends up null.

  for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
      scan.getFamilyMap().entrySet()) {
    Store store = stores.get(entry.getKey());

The StoreScanner instance is created via HStore.getScanner(scan, columns):

    KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
    if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
        || this.filter.isFamilyEssential(entry.getKey())) {
      scanners.add(scanner);
    } else {
      joinedScanners.add(scanner);
    }
  }

A KeyValueHeap instance is then built: every StoreScanner has already been moved to the start key, and the heap yields the top StoreScanner.

  this.storeHeap = new KeyValueHeap(scanners, comparator);
  if (!joinedScanners.isEmpty()) {
    this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
  }
}
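The split into scanners and joinedScanners only takes effect when on-demand column family loading is enabled on the scan; whether a family counts as essential is then decided by the filter's isFamilyEssential. A minimal client-side sketch, with made-up family names:

Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("essential_cf")); // hypothetical family the filter needs
scan.addFamily(Bytes.toBytes("bulky_cf"));     // hypothetical family loaded only for matching rows
scan.setLoadColumnFamiliesOnDemand(true);      // lets non-essential families go to the joinedHeap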


HStore.getScanner(scan, columns), which returns the StoreScanner instance:

public KeyValueScanner getScanner(Scan scan,
    final NavigableSet<byte[]> targetCols) throws IOException {
  lock.readLock().lock();
  try {
    KeyValueScanner scanner = null;
    if (this.getCoprocessorHost() != null) {
      scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
    }
    if (scanner == null) {
      scanner = new StoreScanner(this, getScanInfo(), scan, targetCols);
    }
    return scanner;
  } finally {
    lock.readLock().unlock();
  }
}

The StoreScanner constructor:

public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns)
    throws IOException {
  this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
      scanInfo.getMinVersions());

If the raw attribute is set on the scan, columns must be null:

  if (columns != null && scan.isRaw()) {
    throw new DoNotRetryIOException(
        "Cannot specify any column for a raw scan");
  }
  matcher = new ScanQueryMatcher(scan, scanInfo, columns,
      ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
      oldestUnexpiredTS);

Next the StoreFileScanners are obtained. A StoreFileScanner references a StoreFile.Reader, which references an HFileReaderV2;
if the HFileReaderV2 instance already exists in the StoreFile.Reader it is not re-created, which speeds up scanner creation.

  // Pass columns to try to filter out unnecessary StoreFiles.
  List<KeyValueScanner> scanners = getScannersNoCompaction();

  // Seek all scanners to the start of the Row (or if the exact matching row
  // key does not exist, then to the start of the next matching Row).
  // Always check bloom filter to optimize the top row seek for delete
  // family marker.
  if (explicitColumnQuery && lazySeekEnabledGlobally) {
    for (KeyValueScanner scanner : scanners) {
      scanner.requestSeek(matcher.getStartKey(), false, true);
    }
  } else {
    if (!isParallelSeekEnabled) {
      for (KeyValueScanner scanner : scanners) {
        scanner.seek(matcher.getStartKey());
      }
    } else {
      parallelSeek(scanners, matcher.getStartKey());
    }
  }

  // set storeLimit
  this.storeLimit = scan.getMaxResultsPerColumnFamily();

  // set rowOffset
  this.storeOffset = scan.getRowOffsetPerColumnFamily();

  // Combine all seeked scanners with a heap
  heap = new KeyValueHeap(scanners, store.getComparator());

Finally the scanner registers with the store so that when store files change, the updated store files are added to this StoreScanner:

  this.store.addChangedReaderObserver(this);
}
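The raw-scan restriction above has a direct client-side counterpart: a raw scan may add families but must not name individual columns. A minimal sketch (the family name is made up):

Scan rawScan = new Scan();
rawScan.setRaw(true);                   // also return delete markers and deleted cells
rawScan.setMaxVersions();               // raw scans are usually combined with all versions
rawScan.addFamily(Bytes.toBytes("cf")); // allowed: family only
// rawScan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1")); // would trigger "Cannot specify any column for a raw scan"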


Issuing the scan RPC

After the client has performed openScanner and obtained a scannerId, it issues the actual scan.

This happens in ScannerCallable.call when scannerId is not equal to -1:

Result[] rrs = null;
ScanRequest request = null;
try {
  incRPCcallsMetrics();
  request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
  ScanResponse response = null;
  PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
  try {
    controller.setPriority(getTableName());
    response = getStub().scan(controller, request);
    ........ (some code omitted)
    nextCallSeq++;
    long timestamp = System.currentTimeMillis();
    // Results are returned via controller
    CellScanner cellScanner = controller.cellScanner();
    rrs = ResponseConverter.getResults(cellScanner, response);



How HRegionServer.scan handles the scan (next) case:

First the caching value is read from the request; it determines how many rows are returned to the client.
A row counts as one entry even if it contains multiple KVs.

int rows = 1;
if (request.hasNumberOfRows()) {
  rows = request.getNumberOfRows();
}

If the client passed a scannerId (i.e. it is not -1), this is not an openScanner request, so check whether the scannerId has expired:

if (request.hasScannerId()) {
  rsh = scanners.get(scannerName);
  if (rsh == null) {
    LOG.info("Client tried to access missing scanner " + scannerName);
    throw new UnknownScannerException(
        "Name: " + scannerName + ", already closed?");
  }

Here it is also checked whether the region has been split in the meantime; if so, a NotServingRegionException is thrown.

  scanner = rsh.s;
  HRegionInfo hri = scanner.getRegionInfo();
  region = getRegion(hri.getRegionName());
  if (region != rsh.r) { // Yes, should be the same instance
    throw new NotServingRegionException("Region was re-opened after the scanner "
        + scannerName + " was created: " + hri.getRegionNameAsString());
  }
} else {
  ........ (the code that creates the RegionScanner is omitted here)
}

rows > 0 means caching is in effect; for a scan the default is 1, and if the scan sets caching, that value is used.

if (rows > 0) {
  // if nextCallSeq does not match throw Exception straight away. This needs to be
  // performed even before checking of Lease.
  // See HBASE-5974

Check the nextCallSeq value: it is 0 on the first call and is incremented by one on every call, on both the client and the server.

  if (request.hasNextCallSeq()) {
    if (rsh == null) {
      rsh = scanners.get(scannerName);
    }
    if (rsh != null) {
      if (request.getNextCallSeq() != rsh.nextCallSeq) {
        throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
            + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
            "; request=" + TextFormat.shortDebugString(request));
      }
      // Increment the nextCallSeq value which is the next expected from client.
      rsh.nextCallSeq++;
    }
  }

  try {

First remove the lease from the lease manager, so that a lookup that takes longer than the lease timeout does not cause an expiration:

    // Remove lease while its being processed in server; protects against case
    // where processing of request takes > lease expiration time.
    lease = leases.removeLease(scannerName);

Create the list that will hold the rows to return (scan.caching entries):

    List<Result> results = new ArrayList<Result>(rows);
    long currentScanResultSize = 0;

    boolean done = false;

Call the coprocessor preScannerNext hook; if it returns true, the scan itself is bypassed.

    // Call coprocessor. Get region info from scanner.
    if (region != null && region.getCoprocessorHost() != null) {
      Boolean bypass = region.getCoprocessorHost().preScannerNext(
          scanner, results, rows);
      if (!results.isEmpty()) {
        for (Result r : results) {
          if (maxScannerResultSize < Long.MAX_VALUE) {
            for (Cell kv : r.rawCells()) {
              // TODO
              currentScanResultSize += KeyValueUtil.ensureKeyValue(kv).heapSize();
            }
          }
        }
      }
      if (bypass != null && bypass.booleanValue()) {
        done = true;
      }
    }

Now the scan itself is executed (when the coprocessor preScannerNext hook returned false, or no RegionObserver coprocessor is configured).
The maximum size returned to the client is configured by hbase.client.scanner.max.result.size, defaulting to Long.MAX_VALUE;
if the scan also sets maxResultSize, the scan's value is used.

    if (!done) {
      long maxResultSize = scanner.getMaxResultSize();
      if (maxResultSize <= 0) {
        maxResultSize = maxScannerResultSize;
      }
      List<Cell> values = new ArrayList<Cell>();
      MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
      region.startRegionOperation(Operation.SCAN);
      try {
        int i = 0;
        synchronized (scanner) {

Here the iteration starts: the RegionScanner (HRegion.RegionScannerImpl.nextRaw(List)) is called repeatedly,
up to the caching value set on the scan; the loop also stops once RegionScanner.nextRaw(List) returns false.

          for (; i < rows
              && currentScanResultSize < maxResultSize; i++) {

A return value of true means there is more data and scanning can continue; false means this region has no more matching data.

            // Collect values to be returned here
            boolean moreRows = scanner.nextRaw(values);
            if (!values.isEmpty()) {
              if (maxScannerResultSize < Long.MAX_VALUE) {
                for (Cell kv : values) {
                  currentScanResultSize += KeyValueUtil.ensureKeyValue(kv).heapSize();
                }
              }
              results.add(Result.create(values));
            }
            if (!moreRows) {
              break;
            }
            values.clear();
          }
        }
        region.readRequestsCount.add(i);
      } finally {
        region.closeRegionOperation();
      }

      // coprocessor postNext hook
      if (region != null && region.getCoprocessorHost() != null) {
        region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
      }
    }

If there is no more data to return, moreResults is set to false in the response.

    // If the scanner's filter - if any - is done with the scan
    // and wants to tell the client to stop the scan. This is done by passing
    // a null result, and setting moreResults to false.
    if (scanner.isFilterDone() && results.isEmpty()) {
      moreResults = false;
      results = null;
    } else {

Add the results to the response. If hbase.client.rpc.codec configures a codec (by default the value of
hbase.client.default.rpc.codec, which is KeyValueCodec), the results are exposed as an iterator wrapped in an
anonymous CellScanner implementation and set on the controller that was passed into scan; this improves the read
performance of the returned data. If no codec is configured, the results list is put directly into the response,
which can make the response rather large.

      addResults(builder, results, controller);
    }
  } finally {

Put the lease back into the lease manager; the lease tracks how long the client has gone without issuing a scan call.

    // We're done. On way out re-add the above removed lease.
    // Adding resets expiration time on lease.
    if (scanners.containsKey(scannerName)) {
      if (lease != null) leases.addLease(lease);
      ttl = this.scannerLeaseTimeoutPeriod;
    }
  }
}
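The per-call size cap used in the loop above can also be steered from the client; a hedged sketch (the 2 MB value is only an example, and scan and conf refer to the objects from the earlier sketches):

// Per-RPC cap on returned data; the scan-level setting takes precedence over the config default.
scan.setMaxResultSize(2L * 1024 * 1024);
// Or globally, via the client configuration:
conf.setLong("hbase.client.scanner.max.result.size", 2L * 1024 * 1024);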


On the client side the response data is extracted in ScannerCallable.call:

rrs = ResponseConverter.getResults(cellScanner, response);

The implementation of ResponseConverter.getResults:

public static Result[] getResults(CellScanner cellScanner, ScanResponse response)
    throws IOException {
  if (response == null) return null;
  // If cellscanner, then the number of Results to return is the count of elements in the
  // cellsPerResult list. Otherwise, it is how many results are embedded inside the response.
  int noOfResults = cellScanner != null ?
      response.getCellsPerResultCount() : response.getResultsCount();
  Result[] results = new Result[noOfResults];
  for (int i = 0; i < noOfResults; i++) {

cellScanner is non-null when a codec is configured; the region server then responds through the anonymous CellScanner implementation.

    if (cellScanner != null) {
      ......................................
      int noOfCells = response.getCellsPerResult(i);
      List<Cell> cells = new ArrayList<Cell>(noOfCells);
      for (int j = 0; j < noOfCells; j++) {
        try {
          if (cellScanner.advance() == false) {
            .....................................
            String msg = "Results sent from server=" + noOfResults + ". But only got " + i
                + " results completely at client. Resetting the scanner to scan again.";
            LOG.error(msg);
            throw new DoNotRetryIOException(msg);
          }
        } catch (IOException ioe) {
          ...........................................
          LOG.error("Exception while reading cells from result."
              + " Resetting the scanner to scan again.", ioe);
          throw new DoNotRetryIOException("Resetting the scanner.", ioe);
        }
        cells.add(cellScanner.current());
      }
      results[i] = Result.create(cells);
    } else {

Otherwise no codec is configured and the data is read directly out of the response:

      // Result is pure pb.
      results[i] = ProtobufUtil.toResult(response.getResults(i));
    }
  }
  return results;
}
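Which of the two branches above is taken depends on the client RPC codec. A hedged configuration sketch; KeyValueCodec is already the default, so this is normally only needed to swap or disable the codec:

Configuration conf = HBaseConfiguration.create();
// Explicitly pick the cell codec used for scan responses (this matches the default).
conf.set("hbase.client.rpc.codec", "org.apache.hadoop.hbase.codec.KeyValueCodec");
// Setting an empty value is assumed to fall back to pure-protobuf results (the else branch above); verify for your version.
// conf.set("hbase.client.rpc.codec", "");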


In ClientScanner.next, as long as the scan's caching value has not been reached (default 1), i.e. countdown is not yet 0
(countdown is decremented for every Result received), nextScanner is used to move on to the next region and a new
connection is opened to keep scanning.

do {
  ......................... (some code omitted)
  if (values != null && values.length > 0) {
    for (Result rs : values) {
      cache.add(rs);
      for (Cell kv : rs.rawCells()) {
        // TODO make method in Cell or CellUtil
        remainingResultSize -= KeyValueUtil.ensureKeyValue(kv).heapSize();
      }
      countdown--;
      this.lastResult = rs;
    }
  }
} while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));

For this kind of query you can keep the ClientScanner open (without calling close) and periodically pull a batch of data
from the region server before its lease times out. The rows are buffered in the ClientScanner's cache, and each next()
simply takes the next row from that cache.
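Putting the client side together, a minimal usage sketch of the behaviour just described: each next() is served from the ClientScanner cache, and a new scan RPC is only made when the cache runs dry. openUserScan is the hypothetical helper from the sketch at the beginning of this article; CellUtil and Bytes come from the HBase client API.

ResultScanner rs = openUserScan(table);
try {
  for (Result r : rs) {                 // each iteration = ClientScanner.next(), usually served from the cache
    for (Cell cell : r.rawCells()) {
      System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + "/"
          + Bytes.toString(CellUtil.cloneQualifier(cell)) + " = "
          + Bytes.toString(CellUtil.cloneValue(cell)));
    }
  }
} finally {
  rs.close();                           // releases the server-side scanner and its lease
}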


Analysis of HRegion.RegionScannerImpl.nextRaw(List)

RegionScannerImpl is the implementation of the RegionScanner interface.

During a scan the region server fetches data through regionScanner.nextRaw(list)

and uses regionScanner.isFilterDone to check whether the scan of this region has finished.

nextRaw delegates to an overload; batch is the maximum number of KVs of a single row that may be read per call, as set on the scan.

public boolean nextRaw(List<Cell> outResults)
    throws IOException {
  return nextRaw(outResults, batch);
}


public boolean nextRaw(List<Cell> outResults, int limit) throws IOException {
  boolean returnResult;

Call nextInternal:

  if (outResults.isEmpty()) {
    // Usually outResults is empty. This is true when next is called
    // to handle scan or get operation.
    returnResult = nextInternal(outResults, limit);
  } else {
    List<Cell> tmpList = new ArrayList<Cell>();
    returnResult = nextInternal(tmpList, limit);
    outResults.addAll(tmpList);
  }

Call filter.reset via resetFilters() to clear the filter state for the current row:

  resetFilters();

If filter.filterAllRemaining() returns true, the scan conditions for this region are exhausted and no further lookup is
possible, i.e. there are no more rows to find.

  if (isFilterDone()) {
    return false;
  }
  ................................ (some code omitted)
  return returnResult;
}


The nextInternal processing flow:

private boolean nextInternal(List<Cell> results, int limit)
    throws IOException {
  if (!results.isEmpty()) {
    throw new IllegalArgumentException("First parameter should be an empty list");
  }
  RpcCallContext rpcCall = RpcServer.getCurrentCall();
  // The loop here is used only when at some point during the next we determine
  // that due to effects of filters or otherwise, we have an empty row in the result.
  // Then we loop and try again. Otherwise, we must get out on the first iteration via return,
  // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
  // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow).
  while (true) {
    if (rpcCall != null) {
      // If a user specifies a too-restrictive or too-slow scanner, the
      // client might time out and disconnect while the server side
      // is still processing the request. We should abort aggressively
      // in that case.
      long afterTime = rpcCall.disconnectSince();
      if (afterTime >= 0) {
        throw new CallerDisconnectedException(
            "Aborting on region " + getRegionNameAsString() + ", call " +
            this + " after " + afterTime + " ms, since " +
            "caller disconnected");
      }
    }

Peek at the smallest KV currently in the heap (after the seek to the start key):

    // Let's see what we have in the storeHeap.
    KeyValue current = this.storeHeap.peek();

    byte[] currentRow = null;
    int offset = 0;
    short length = 0;
    if (current != null) {
      currentRow = current.getBuffer();
      offset = current.getRowOffset();
      length = current.getRowLength();
    }

Check whether the stop key has been reached (if so, false is returned). joinedContinuationRow belongs to the joined scan
across several column families and can be ignored here.

    boolean stopRow = isStopRow(currentRow, offset, length);
    // Check if we were getting data from the joinedHeap and hit the limit.
    // If not, then it's main path - getting results from storeHeap.
    if (joinedContinuationRow == null) {
      // First, check if we are at a stop row. If so, there are no more results.
      if (stopRow) {

If this is the stop row and filter.hasFilterRow returns true, filterRowCells may inspect, and also modify, the list of KVs to be returned.

        if (filter != null && filter.hasFilterRow()) {
          filter.filterRowCells(results);
        }
        return false;
      }

filterRowKey checks whether the row key should be excluded: it returns true if the row is to be excluded, false otherwise.

      // Check if rowkey filter wants to exclude this row. If so, loop to next.
      // Technically, if we hit limits before on this row, we don't need this call.
      if (filterRowKey(currentRow, offset, length)) {

If the row key is excluded, check whether there is a next row. If there is none, return false: the scan of this region is
finished. Otherwise clear results and start over with the next row.

        boolean moreRows = nextRow(currentRow, offset, length);
        if (!moreRows) return false;
        results.clear();
        continue;
      }

Now next is executed on the StoreScanners of all stores this scan touches in the region, and the matching KVs are collected
into the results list. If a row contains multiple KVs and the number of KVs found reaches the limit passed in, the special
empty KV_LIMIT marker is returned (the number of KVs found has reached limit, i.e. batch, the maximum number of KVs to scan
per row). Otherwise the limit was not reached but all qualifying KVs of the current row have been found, and the last KV
read is returned; in that case the returned KV is either null or the first KV of the next row.

      KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
          length);

If the limit was hit, filter.hasFilterRow must be false, otherwise an IncompatibleFilterException is thrown.
If the limit was hit, true is returned so that scanning of the current row can continue on the next call.
Tip: if a single row can contain many KVs, set the batch attribute on the scan, otherwise all KVs of the row are read in one go.

      // Ok, we are good, let's try to get some results from the main heap.
      if (nextKv == KV_LIMIT) {
        if (this.filter != null && filter.hasFilterRow()) {
          throw new IncompatibleFilterException(
              "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
        }
        return true; // We hit the limit.
      }

Check whether this is the stop row. This line also shows that the stop row is exclusive, because nextKv is the first KV of the next row.

      stopRow = nextKv == null ||
          isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());

      // save that the row was empty before filters applied to it.
      final boolean isEmptyRow = results.isEmpty();

If filter.hasFilterRow returns true, filterRowCells may inspect, and also modify, the list of KVs to be returned.

      // We have the part of the row necessary for filtering (all of it, usually).
      // First filter with the filterRow(List).
      if (filter != null && filter.hasFilterRow()) {
        filter.filterRowCells(results);
      }

If no valid KV was found for the current row, i.e. the results list is empty, check whether there is a next row; if so,
start the lookup again, otherwise this region has been scanned to its end and false is returned.

      if (isEmptyRow) {
        boolean moreRows = nextRow(currentRow, offset, length);
        if (!moreRows) return false;
        results.clear();
        // This row was totally filtered out, if this is NOT the last row,
        // we should continue on. Otherwise, nothing else to do.
        if (!stopRow) continue;
        return false;
      }


      // Ok, we are done with storeHeap for this row.
      // Now we may need to fetch additional, non-essential data into row.
      // These values are not needed for filter to work, so we postpone their
      // fetch to (possibly) reduce amount of data loads from disk.
      if (this.joinedHeap != null) {
        .................................. (the joined-scan code is not shown or analyzed here)
      }
    } else {

The joined lookup across several stores is not analyzed here; it normally does not occur.

      // Populating from the joined heap was stopped by limits, populate some more.
      populateFromJoinedHeap(results, limit);
    }

    // We may have just called populateFromJoinedMap and hit the limits. If that is
    // the case, we need to call it again on the next next() invocation.
    if (joinedContinuationRow != null) {
      return true;
    }

If this pass produced no results, check whether there is a next row; if so, search again, otherwise return false: the scan
of this region is finished.

    // Finally, we are done with both joinedHeap and storeHeap.
    // Double check to prevent empty rows from appearing in result. It could be
    // the case when SingleColumnValueExcludeFilter is used.
    if (results.isEmpty()) {
      boolean moreRows = nextRow(currentRow, offset, length);
      if (!moreRows) return false;
      if (!stopRow) continue;
    }

If this is not the stop row there may be more rows, i.e. next can be called again; otherwise the scan of this region is finished.

    // We are done. Return the result.
    return !stopRow;
  }
}
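The isFilterDone()/filterAllRemaining() checks above are what allow a filter to end a region's scan early. A small sketch with the stock org.apache.hadoop.hbase.filter.PageFilter (the page size is arbitrary):

Scan scan = new Scan();
scan.setFilter(new PageFilter(10)); // filterAllRemaining() becomes true after 10 rows,
                                    // so nextInternal()/isFilterDone() stop this region's scan early

Note that PageFilter limits rows per region; the client may still receive more rows in total when the scan spans several regions.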


ScanQueryMatcher.match handling during a user scan

For a user scan the ScanQueryMatcher is created when new RegionScannerImpl(scan, additionalScanners, this) runs;

the matcher instance is built in the StoreScanner constructor by the following code:

matcher = new ScanQueryMatcher(scan, scanInfo, columns,
    ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
    oldestUnexpiredTS);

In this case matcher.isUserScan is true.


public MatchCode match(KeyValue kv) throws IOException {

Check whether the scan of this region has finished; PageFilter, for example, ends a scan by controlling this method of the filter.

  if (filter != null && filter.filterAllRemaining()) {
    return MatchCode.DONE_SCAN;
  }

  byte[] bytes = kv.getBuffer();
  int offset = kv.getOffset();

  int keyLength = Bytes.toInt(bytes, offset, Bytes.SIZEOF_INT);
  offset += KeyValue.ROW_OFFSET;

  int initialOffset = offset;

  short rowLength = Bytes.toShort(bytes, offset, Bytes.SIZEOF_SHORT);
  offset += Bytes.SIZEOF_SHORT;

Check whether the passed-in KV still belongs to the current row, i.e. whether the row keys are equal. If the current row key
is smaller than the KV's row key, the scan has already moved to the next row, and DONE is returned: the current row is finished.

  int ret = this.rowComparator.compareRows(row, this.rowOffset, this.rowLength,
      bytes, offset, rowLength);

  if (ret <= -1) {
    return MatchCode.DONE;
  } else if (ret >= 1) {

If the current row key is greater than the KV's row key, the KV just read is smaller than the current row, so a seek to the next row is requested.

    // could optimize this, if necessary?
    // Could also be called SEEK_TO_CURRENT_ROW, but this
    // should be rare/never happens.
    return MatchCode.SEEK_NEXT_ROW;
  }

Should the remaining KVs of this row be skipped? This is an optimization.

  // optimize case.
  if (this.stickyNextRow)
    return MatchCode.SEEK_NEXT_ROW;

If all columns the scan asks for have already been found in this row, the remaining KVs of the row are not compared at all and a seek to the next row is requested.

  if (this.columns.done()) {
    stickyNextRow = true;
    return MatchCode.SEEK_NEXT_ROW;
  }


  // Passing rowLength
  offset += rowLength;

  // Skipping family
  byte familyLength = bytes[offset];
  offset += familyLength + 1;

  int qualLength = keyLength -
      (offset - initialOffset) - KeyValue.TIMESTAMP_TYPE_SIZE;

Check whether the KV's TTL has expired. If it has: when the scan still has a next column, SEEK_NEXT_COL is returned, otherwise SEEK_NEXT_ROW.

  long timestamp = Bytes.toLong(bytes, initialOffset + keyLength - KeyValue.TIMESTAMP_TYPE_SIZE);
  // check for early out based on timestamp alone
  if (columns.isDone(timestamp)) {
    return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
  }


  /*
   * The delete logic is pretty complicated now.
   * This is corroborated by the following:
   * 1. The store might be instructed to keep deleted rows around.
   * 2. A scan can optionally see past a delete marker now.
   * 3. If deleted rows are kept, we have to find out when we can
   *    remove the delete markers.
   * 4. Family delete markers are always first (regardless of their TS)
   * 5. Delete markers should not be counted as version
   * 6. Delete markers affect puts of the *same* TS
   * 7. Delete marker need to be version counted together with puts
   *    they affect
   */
  byte type = bytes[initialOffset + keyLength - 1];

If the current KV is a delete marker:

  if (kv.isDelete()) {

This branch is entered, and the delete marker is added to the DeleteTracker (by default a ScanDeleteTracker).

    if (!keepDeletedCells) {
      // first ignore delete markers if the scanner can do so, and the
      // range does not include the marker
      //
      // during flushes and compactions also ignore delete markers newer
      // than the readpoint of any open scanner, this prevents deleted
      // rows that could still be seen by a scanner from being collected
      boolean includeDeleteMarker = seePastDeleteMarkers ?
          tr.withinTimeRange(timestamp) :
          tr.withinOrAfterTimeRange(timestamp);
      if (includeDeleteMarker
          && kv.getMvccVersion() <= maxReadPointToTrackVersions) {
        this.deletes.add(bytes, offset, qualLength, timestamp, type);
      }
      // Can't early out now, because DelFam come before any other keys
    }

The following check is not entered for a user scan, because a user scan does not retain deleted data.

    if (retainDeletesInOutput
        || (!isUserScan && (EnvironmentEdgeManager.currentTimeMillis() - timestamp) <= timeToPurgeDeletes)
        || kv.getMvccVersion() > maxReadPointToTrackVersions) {
      // always include or it is not time yet to check whether it is OK
      // to purge deletes or not
      if (!isUserScan) {
        // if this is not a user scan (compaction), we can filter this deletemarker right here
        // otherwise (i.e. a "raw" scan) we fall through to normal version and timerange checking
        return MatchCode.INCLUDE;
      }
    } else if (keepDeletedCells) {
      if (timestamp < earliestPutTs) {
        // keeping delete rows, but there are no puts older than
        // this delete in the store files.
        return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
      }
      // else: fall through and do version counting on the
      // delete markers
    } else {
      return MatchCode.SKIP;
    }

    // note the following next else if...
    // delete marker are not subject to other delete markers
  } else if (!this.deletes.isEmpty()) {

If the DeleteTracker is not empty, i.e. the current row has deleted KVs, check whether the current KV is one of them.
Note: a delete marker compares smaller than the corresponding normal KV, so during next the delete KV is read first.
If the KV is deleted: for a deleted version SKIP is returned; otherwise, if the scan still has a next column to scan,
SEEK_NEXT_COL is returned, and if not, nothing more of this row needs to be examined and SEEK_NEXT_ROW is returned.

    DeleteResult deleteResult = deletes.isDeleted(bytes, offset, qualLength,
        timestamp);
    switch (deleteResult) {
      case FAMILY_DELETED:
      case COLUMN_DELETED:
        return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
      case VERSION_DELETED:
      case FAMILY_VERSION_DELETED:
        return MatchCode.SKIP;
      case NOT_DELETED:
        break;
      default:
        throw new RuntimeException("UNEXPECTED");
    }
  }

Check whether the KV's timestamp lies within the time range requested by the scan.

  int timestampComparison = tr.compare(timestamp);

If it is greater than the scan's maximum timestamp, SKIP is returned.

  if (timestampComparison >= 1) {
    return MatchCode.SKIP;
  } else if (timestampComparison <= -1) {

If it is smaller than the scan's minimum timestamp: when the scan still has a next column, SEEK_NEXT_COL is returned,
otherwise nothing more of this row needs to be examined and SEEK_NEXT_ROW is returned.

    return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
  }

Check whether the KV's column is one of the columns requested by the scan: if so, INCLUDE; otherwise, if the scan still has
a next column, SEEK_NEXT_COL is returned, and if not, nothing more of this row needs to be examined and SEEK_NEXT_ROW is returned.

  // STEP 1: Check if the column is part of the requested columns
  MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength, type);

If the column is one of the columns the scan asks for:

  if (colChecker == MatchCode.INCLUDE) {
    ReturnCode filterResponse = ReturnCode.SKIP;
    // STEP 2: Yes, the column is part of the requested columns. Check if filter is present
    if (filter != null) {

Run filter.filterKeyValue and act on the filter's decision:

      // STEP 3: Filter the key value and return if it filters out
      filterResponse = filter.filterKeyValue(kv);
      switch (filterResponse) {
        case SKIP:
          return MatchCode.SKIP;
        case NEXT_COL:

If the scan still has a next column, SEEK_NEXT_COL is returned, otherwise nothing more of this row needs to be examined and SEEK_NEXT_ROW is returned.

          return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
        case NEXT_ROW:
          stickyNextRow = true;
          return MatchCode.SEEK_NEXT_ROW;
        case SEEK_NEXT_USING_HINT:
          return MatchCode.SEEK_NEXT_USING_HINT;
        default:
          // It means it is either include or include and seek next
          break;
      }
    }

    /*
     * STEP 4: Reaching this step means the column is part of the requested columns and either
     * the filter is null or the filter has returned INCLUDE or INCLUDE_AND_NEXT_COL response.
     * Now check the number of versions needed. This method call returns SKIP, INCLUDE,
     * INCLUDE_AND_SEEK_NEXT_ROW, INCLUDE_AND_SEEK_NEXT_COL.
     *
     * FilterResponse             ColumnChecker               Desired behavior
     * INCLUDE                    SKIP                        row has already been included, SKIP.
     * INCLUDE                    INCLUDE                     INCLUDE
     * INCLUDE                    INCLUDE_AND_SEEK_NEXT_COL   INCLUDE_AND_SEEK_NEXT_COL
     * INCLUDE                    INCLUDE_AND_SEEK_NEXT_ROW   INCLUDE_AND_SEEK_NEXT_ROW
     * INCLUDE_AND_SEEK_NEXT_COL  SKIP                        row has already been included, SKIP.
     * INCLUDE_AND_SEEK_NEXT_COL  INCLUDE                     INCLUDE_AND_SEEK_NEXT_COL
     * INCLUDE_AND_SEEK_NEXT_COL  INCLUDE_AND_SEEK_NEXT_COL   INCLUDE_AND_SEEK_NEXT_COL
     * INCLUDE_AND_SEEK_NEXT_COL  INCLUDE_AND_SEEK_NEXT_ROW   INCLUDE_AND_SEEK_NEXT_ROW
     *
     * In all the above scenarios, we return the column checker return value except for
     * FilterResponse (INCLUDE_AND_SEEK_NEXT_COL) and ColumnChecker (INCLUDE)
     */


This mainly checks whether the KV falls within the maximum number of versions the scan asks for; at this point, unless the
KV exceeds the requested number of versions or its TTL has expired, it will be included.

    colChecker =
        columns.checkVersions(bytes, offset, qualLength, timestamp, type,
            kv.getMvccVersion() > maxReadPointToTrackVersions);
    // Optimize with stickyNextRow
    stickyNextRow = colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW ? true : stickyNextRow;
    return (filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL &&
        colChecker == MatchCode.INCLUDE) ? MatchCode.INCLUDE_AND_SEEK_NEXT_COL
        : colChecker;
  }
  stickyNextRow = (colChecker == MatchCode.SEEK_NEXT_ROW) ?
      true
      : stickyNextRow;
  return colChecker;
}
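The timestamp and version checks in match() correspond directly to scan-level settings; a minimal client sketch (the time bounds and version count are arbitrary):

private static Scan timeBoundedScan() throws IOException {
  Scan scan = new Scan();
  scan.setTimeRange(1400000000000L, 1500000000000L); // [minStamp, maxStamp): KVs outside this range are skipped via tr.compare(...)
  scan.setMaxVersions(2);                            // extra versions are rejected by columns.checkVersions(...)
  return scan;
}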


• Original article: https://www.cnblogs.com/liguangsunls/p/6779956.html