zoukankan      html  css  js  c++  java
  • 11 hbase源码系列(十一)Put、Delete在服务端是如何处理

    hbase源码系列(十一)Put、Delete在服务端是如何处理? 

     

    在讲完之后HFile和HLog之后,今天我想分享是Put在Region Server经历些了什么?相信前面看了《HTable探秘》的朋友都会有印象,没看过的建议回去先看看,Put是通过MultiServerCallable来提交的多个Put,好,我们就先去这个类吧,在call方法里面,我们找到了这句。

    responseProto = getStub().multi(controller, requestProto);

    它调用了Region Server的multi方法。好,我们立即杀到HRegionServer去,搜索找到multi这个方法。

    复制代码
    public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
      throws ServiceException {
        // RpcController是属于后门的,这样返回的数据就不用序列化了
        PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
        CellScanner cellScanner = controller != null? controller.cellScanner(): null;
        if (controller != null) controller.setCellScanner(null);
        List<CellScannable> cellsToReturn = null;
         MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
         //取出来所有的Actionfor (RegionAction regionAction : request.getRegionActionList()) {
           this.requestCount.add(regionAction.getActionCount());
           RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
           HRegion region;
           try {
             //获取对应的HRegion
             region = getRegion(regionAction.getRegion());
           } catch (IOException e) {
             responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
             continue;  // 报告这个action有错       }
    
           if (regionAction.hasAtomic() && regionAction.getAtomic()) {
             try {
              //如果是原子操作,就走原子操作的通道           mutateRows(region, regionAction.getActionList(), cellScanner);
             } catch (IOException e) {
               regionActionResultBuilder.setException(ResponseConverter.buildException(e));
             }
           } else {
             // 非原子性提交,把错误内部处理了
             cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner,
                 regionActionResultBuilder, cellsToReturn);
           }
           responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
         }
         // 如果需要返回数据的话,就new一个createCellScanner扔回去if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
           controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
         }
         return responseBuilder.build();
       }
    复制代码

     这个方法里面还包括了PayloadCarryingRpcController和CellScanner可以看得出来它不只是被Put来用的,但是这些我们不管我们只看Put如何处理就行了。

    1、取出来所有的action(Put),这里主要是put,因为我们调用客户端就是这么调用的,其实别的类型也可以支持,获取他们对应的region。

    2、根据action的原子性来判断走哪个方法,原子性操作走mutateRows,非原子性操作走doNonAtomicRegionMutation方法,我查了一下这个Atomic到底是怎么回事,我搜索了一下代码,发现在调用HTable的mutateRow方法的时候,它设置了Atomic为true,这个是应该是支持一行数据的原子性的,有这个需求的童鞋可以尝试用这个方法,也是可以提交多个,包括Put、Delete操作。

    regionMutationBuilder.setAtomic(true);
    getStub().multi(null, request);

     我们先看doNonAtomicRegionMutation,这是我们常用的方式。

    复制代码
       List<ClientProtos.Action> mutations = null;
         for (ClientProtos.Action action: actions.getActionList()) {
           ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
           try {
             Result r = null;
             if (action.hasGet()) {
               Get get = ProtobufUtil.toGet(action.getGet());
               r = region.get(get);
             } else if (action.hasMutation()) {
               MutationType type = action.getMutation().getMutateType();
               if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
                   !mutations.isEmpty()) {
                 // 如果这个操作不是Put或者Delete的话,就一下子把前面的活都先干了?             doBatchOp(builder, region, mutations, cellScanner);
                 mutations.clear();
               }
               switch (type) {
               case APPEND:
                 r = append(region, action.getMutation(), cellScanner);
                 break;
               case INCREMENT:
                 r = increment(region, action.getMutation(), cellScanner);
                 break;
               case PUT:
               case DELETE:
                 // 前面的那些,我们都用得少,或者是不用,不用管它们,看这里就行if (mutations == null) {
                   mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount());
                }
                 mutations.add(action);
                 break;
               default:
                 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
              }
            } else {
               throw new HBaseIOException("Unexpected Action type");
            }
            if (r != null) {
              ClientProtos.Result pbResult = null;
              if (isClientCellBlockSupport()) {
                 pbResult = ProtobufUtil.toResultNoData(r);
                 //if (cellsToReturn == null) cellsToReturn = new ArrayList<CellScannable>();
                 cellsToReturn.add(r);
              } else {
                pbResult = ProtobufUtil.toResult(r);
              }
              //把result编译成Protobuf码,返回
              resultOrExceptionBuilder =
                ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
            }
          } catch (IOException ie) {
            resultOrExceptionBuilder = ResultOrException.newBuilder().
              setException(ResponseConverter.buildException(ie));
          }
          if (resultOrExceptionBuilder != null) {
            // Propagate index.        resultOrExceptionBuilder.setIndex(action.getIndex());
            builder.addResultOrException(resultOrExceptionBuilder.build());
          }
        }
        //进行批量操作 if (mutations != null && !mutations.isEmpty()) {
          doBatchOp(builder, region, mutations, cellScanner);
        }
        return cellsToReturn;
    复制代码

    这里面代码很多,也适配了很多种类型,是个大而全的方法,但是我们这里用到的只是把Put、Delete等的类型转换添加到mutations的列表里,然后走下面这个批量操作。

    此外get的批量操作也是走的这个方法,里面它走的是HRegion.get的方法返回一个Result。

    doBatchOp(builder, region, mutations, cellScanner);

     doBatchOp里面的代码我就补贴了,老帖代码就没意思了。

    1、还是得把Put、Delete给转换类型,这里的批量操作只支持全是Delete或者全是Put。

    2、用HRegion.batchMutate方法来执行操作,返回OperationStatus数组,记录每个action的状态,是成功,还是失败,或者是别的状态。

    在batchMutate里面首先就是检查是否是只读状态,然后检查是否是Meta Region的,是不执行MemStore检查了,因为MemStore的堆内存超过了阻塞队列的MemStore大小,就会报错误,太恶劣了。。。没catch的哦。

    long addedSize = doMiniBatchMutation(batchOp, isReplay);
    //MemStore的大小到了阀值,就要flush到文件了if (isFlushSize(newSize)) {
       requestFlush();
    }

    doMiniBatchMutation就是我们的终极boss了,是个很长很臭的类,贴代码都不能一下子全贴。

    1、实例化几个重要的类,后面具体会用到

    复制代码
    //日志,isInReplay是否支持重做,这里是false
    WALEdit walEdit = new WALEdit(isInReplay);
    //控制多版本的MemStore flush的结果,每次flush的w都是一样的,就好像同一批号的食品 MultiVersionConsistencyControl.WriteEntry w = null;
    long txid = 0; //日志同步是否成功boolean walSyncSuccessful = false; boolean locked = false;
    复制代码

    2、检查Put和Delete里面的列族是否和Region持有的列族的定义相同,有时候我们在Delete的时候是不填列族的,这里它给这个缺的列族来一个KeyValue.Type.DeleteFamily,删除列族的类型。

    3、给Row加锁,先计算hash值做key,如果该key没上过锁,就上一把锁,然后计算出来要写的action有多少个,记录到numReadyToWrite。

    4、更新时间戳,把该action里面的所有的kv的时间戳更新为最新的时间戳,它这里也会把之前的没运行的也一起更新。

    5、给该region加锁,这个时间点之后,就不允许读了,等待时间需要根据numReadyToWrite的数量来计算。

    复制代码
    //加锁,现在要上锁了,这段时间内不允许读
    lock(this.updatesLock.readLock(), numReadyToWrite); locked = true; //等待时间final long waitTime = Math.min(maxBusyWaitDuration, busyWaitDuration * Math.min(numReadyToWrite, maxBusyWaitMultiplier)); if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) { throw new RegionTooBusyException( "failed to get a lock in " + waitTime + "ms"); }
    复制代码

    6、上锁之后,下面就是重头戏了,也就是Put、Delete等的重点。给这些写入memstore的数据创建一个批次号。

    复制代码
    //为这次添加进MemStore的数据添加一个批次号
    w = mvcc.beginMemstoreInsert();
    
    //这是批次号的计算方式,nextWriteNumber就等于memstore的写的次数+1
    public WriteEntry beginMemstoreInsert() {
        synchronized (writeQueue) {
          long nextWriteNumber = ++memstoreWrite;
          WriteEntry e = new WriteEntry(nextWriteNumber);
          writeQueue.add(e);
          return e;
        }
    }
    复制代码

    7、把kv们写入到memstore当中,然后计算出来一个添加数据之后的新的MemStore的大小addedSize。

    复制代码
    //把kv们写入memstorelong addedSize = 0;
    for (int i = firstIndex; i < lastIndexExclusive; i++) {
        if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
            continue;
        }
        addedSize += applyFamilyMapToMemstore(familyMaps[i], w);
    } 
    复制代码

    这个添加到MemStore里面也没啥神秘的,因为MemStore里面有两个kv的集合,它只是把kv添加到集合里面去,看下面的代码就知道了。

    复制代码
    private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
        MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) {
        long size = 0;try {for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
            byte[] family = e.getKey();
            List<Cell> cells = e.getValue();
            //把kv添加到memstore当中
            Store store = getStore(family);
            for (Cell cell: cells) {
              KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
              kv.setMvccVersion(localizedWriteEntry.getWriteNumber());
              size += store.add(kv);
            }
          }
        } 
    return size; }
    复制代码

    注意这一句话kv.setMvccVersion(localizedWriteEntry.getWriteNumber());  后面会用到的。

    8、把kv添加到日志当中,标志状态为成功,如果是用户设置了不写入日志的,它就不写入日志了。

    复制代码
          Durability durability = Durability.USE_DEFAULT;
          for (int i = firstIndex; i < lastIndexExclusive; i++) {
            // 跳过状态不对的if (batchOp.retCodeDetails[i].getOperationStatusCode()
                != OperationStatusCode.NOT_RUN) {
              continue;
            }
            //标志状态为成功
            batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
    
            Mutation m = batchOp.operations[i];
            //获取自定义的日志同步方式
            Durability tmpDur = getEffectiveDurability(m.getDurability());
            if (tmpDur.ordinal() > durability.ordinal()) {
              durability = tmpDur;
            }
            if (tmpDur == Durability.SKIP_WAL) {
              //记录日志的kv的大小,但是不写入到日志当中          recordMutationWithoutWal(m.getFamilyCellMap());
              continue;
            }
            //把列族里面的kv全部添加到walEdit当中        addFamilyMapToWALEdit(familyMaps[i], walEdit);
          }
    复制代码

    9、先异步添加日志,这里为什么是异步的,因为之前给上锁了,暂时不能读了,如果这里调用的是同步的方法,后果自己想象下。

    Mutation mutation = batchOp.operations[firstIndex];
    if (walEdit.size() > 0) {
       //异步添加日志
       txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
               walEdit, mutation.getClusterIds(), now, this.htableDescriptor);
    }

    10、释放之前创建的锁。

    //释放相关的锁if (locked) {
        this.updatesLock.readLock().unlock();
        locked = false;
    }
    releaseRowLocks(acquiredRowLocks);

    11、同步日志。

    if (walEdit.size() > 0) {
        syncOrDefer(txid, durability);
    }
    walSyncSuccessful = true;

    12、结束该批次的操作。

    if (w != null) {
       mvcc.completeMemstoreInsert(w);
       w = null;
    }

    到这里其实就是结束了。但是如果添加到了MemStore里面了,但是日志没有同步成功呢?

    复制代码
    finally {
      if (!walSyncSuccessful) {
         //如果日志没有成功,     rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive);
      }
      ......  
    }
    复制代码

    一路跟踪代码下去,跟踪到代码在MemStore的rollback方法里面。

    复制代码
    KeyValue found = this.snapshot.get(kv);
    if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
      this.snapshot.remove(kv);
    }
    // 比较一下mvcc,相同就删除.
    found = this.kvset.get(kv);
    if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
       removeFromKVSet(kv);
       long s = heapSizeChange(kv, true);
       this.size.addAndGet(-s);
    }
    复制代码

    比较了MvccVersion,发现是同一批次的,就干掉了。

     

    过程写得比较凌乱了,把之前的总结一下吧:

    1、做准备工作,实例化变量

    2、检查Put和Delete里面的列族是否和Region持有的列族的定义相同。

    3、给Row加锁,先计算hash值做key,如果该key没上过锁,就上一把锁,然后计算出来要写的action有多少个,记录到numReadyToWrite。

    4、更新时间戳,把该action里面的所有的kv的时间戳更新为最新的时间戳,它这里也会把之前的没运行的也一起更新。

    5、给该region加锁,这个时间点之后,就不允许读了,等待时间需要根据numReadyToWrite的数量来计算。

    6、上锁之后,下面就是重头戏了,也就是Put、Delete等的重点。给这些写入memstore的数据创建一个批次号。

    7、把kv们写入到memstore当中,然后计算出来一个添加数据之后的新的MemStore的大小addedSize。

    8、把kv添加到日志当中,标志状态为成功,如果是用户设置了不写入日志的,它就不写入日志了。

    9、先异步添加日志。

    10、释放之前创建的锁。

    11、同步日志。

    12、结束该批次的操作。

    Final、同步日志没成功的,最后根据批次回滚MemStore中的操作。

     

    上面的过程适用于Put和Delete的批量操作,但是这里总感觉很好奇,就这样结束了,Put和Delete操作就没区别吗,那它怎么删除数据的?

    返回在第4步更新时间戳的时候,发现了一些猫腻,Delete的情况执行了prepareDeleteTimestamps方法,看看吧。

    复制代码
    void prepareDeleteTimestamps(Map<byte[], List<Cell>> familyMap, byte[] byteNow)
          throws IOException {
        for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
          byte[] family = e.getKey();
          List<Cell> cells = e.getValue();
          //列和count的映射
          Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    
          for (Cell cell: cells) {
            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
            // 如果是时间戳是最新的话就执行下面这些操作
    if (kv.isLatestTimestamp() && kv.isDeleteType()) { //new一个Get从Store里面去搜索
    } else { kv.updateLatestStamp(byteNow); } } } }
    复制代码

    看来一下代码,这里是上来先判断是否是最新的时间戳,我就回去看来一下Delete的构造函数,尼玛。。。

    复制代码
    public Delete(byte [] row) {
        this(row, HConstants.LATEST_TIMESTAMP);
    }
    
    public Delete(byte [] row, long timestamp) {
        this(row, 0, row.length, timestamp);
    }
    复制代码

    只传了rowkey进去的,它就是最新的。。然后看了一下注释,凡是在这个时间点之前的所有版本的所有列,我们都要删除。

    好吧,我们很无奈的宣布,我们只能走kv.isLatestTimestamp() && kv.isDeleteType(),下面是没放出来的代码。

    复制代码
    byte[] qual = kv.getQualifier();
              if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
              //想到相同列的每次+1
              Integer count = kvCount.get(qual);
              if (count == null) {
                kvCount.put(qual, 1);
              } else {
                kvCount.put(qual, count + 1);
              }
              //更新之后把最新的count数量
              count = kvCount.get(qual);
    
              Get get = new Get(kv.getRow());
              get.setMaxVersions(count);
              get.addColumn(family, qual);
              //从store当中取出相应的result来
              List<Cell> result = get(get, false);
    
              if (result.size() < count) {
                // Nothing to delete 数量不够。。 更新最新的时间戳为现在的时间            kv.updateLatestStamp(byteNow);
                continue;
              }
              //数量超过了也不行if (result.size() > count) {
                throw new RuntimeException("Unexpected size: " + result.size());
              }
              //取最后一个的时间戳
              KeyValue getkv = KeyValueUtil.ensureKeyValue(result.get(count - 1));
              //更新kv的时间戳为getkv的时间戳          Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
                  getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
    复制代码

     

    这里又干了一个Get操作,把列族的多个版本的内容取出来,如果数量不符合预期也会有问题,但是这后面操作的中心思想就是:

    (a)按照预期来说,取出来的少了,就设置删除的时间戳为现在;

    (b)取出来的多了,就报错;

    (c)刚好的,就把Delete的时间戳设置为最大的那个的时间戳,但即便是这样也没有删除数据。

    回到这里我又想起来,只有在Compaction之后,hbase的文件才会变小,难道是在那个时候删除的?那在删除之前,我们进行Get或者Scan操作的时候,会不会读到这些没有被删除的数据呢?

    好,让我们拭目以待。

     


     





    God has given me a gift. Only one. I am the most complete fighter in the world. My whole life, I have trained. I must prove I am worthy of someting. rocky_24
  • 相关阅读:
    有关多线程的一些技术问题
    Remoting VS WCF 传输效率对比
    中英文术语对照表
    WCF配置文件全攻略
    架构设计之分布式文件系统
    Rails性能优化简明指南 (转载)
    不要活在别人的生活里(摘自开复网)
    find 命令 使用 (转载)
    turbo C BGI 基本图形接口的 例子
    如何编写Ruby控制台程序(一)
  • 原文地址:https://www.cnblogs.com/rocky24/p/e6c2bcd8b0dd0d44dc76007a7b6e10d6.html
Copyright © 2011-2022 走看看