zoukankan      html  css  js  c++  java
  • hbase0.96 put流程 源码分析

    无意间多瞄了一眼 HBase 0.98 的代码,想复习一下 put 流程,却发现 HTable 里面已经找不到 processBatchOfPuts() 了,很奇怪。看了半天,原来变化还真大,其实 0.96 就已经没有这个方法了,于是又找了一份 0.96 的代码来看。

    之前有一篇文章可以用来对照差异,请移步:http://blog.csdn.net/luyee2010/article/details/8435739 ,只是排版太乱,将就看吧。
    HTable.java

      /**
       * Hands a single {@code Put} to {@code doPut} and, when {@code autoFlush} is set, follows
       * up with {@code flushCommits()}.
       *
       * @param put the put to hand to {@code doPut}
       * @throws InterruptedIOException declared for the buffering and flush calls
       * @throws RetriesExhaustedWithDetailsException declared for the buffering and flush calls
       */
      public void put(final Put put)
          throws InterruptedIOException, RetriesExhaustedWithDetailsException {
        doPut(put);
        if (!autoFlush) {
          // autoFlush is off: nothing more to do in this call.
          return;
        }
        flushCommits();
      }
    //批量
      /**
       * Batch form of put: hands every element of {@code puts} to {@code doPut} in iteration
       * order and, when {@code autoFlush} is set, calls {@code flushCommits()} once after the
       * last element.
       *
       * @param puts the puts to hand to {@code doPut}, in order
       * @throws InterruptedIOException declared for the buffering and flush calls
       * @throws RetriesExhaustedWithDetailsException declared for the buffering and flush calls
       */
      @Override
      public void put(final List<Put> puts)
          throws InterruptedIOException, RetriesExhaustedWithDetailsException {
        for (final Put single : puts) {
          doPut(single);
        }
        if (!autoFlush) {
          // autoFlush is off: nothing more to do in this call.
          return;
        }
        flushCommits();
      }
    

    这里 writeAsyncBuffer 已经替换了原来的 writeBuffer,其实只是名字不同。

      /**
       * Adds one put to the write buffer and flushes in the background while the buffer is over
       * its configured size.
       *
       * <p>Order of operations: if the async process {@code ap} already reports an error, a
       * synchronous {@code backgroundFlushCommits(true)} runs first; the put is validated; its
       * heap size is added to {@code currentWriteBufferSize} and the put is appended to
       * {@code writeAsyncBuffer}; finally {@code backgroundFlushCommits(false)} is called
       * repeatedly for as long as {@code currentWriteBufferSize} exceeds {@code writeBufferSize}.
       *
       * @param put the put to validate and buffer
       * @throws InterruptedIOException declared for the flush calls
       * @throws RetriesExhaustedWithDetailsException declared for the flush calls
       */
      private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
        final boolean earlierFailure = ap.hasError();
        if (earlierFailure) {
          // An operation already in progress has failed: flush synchronously before buffering more.
          backgroundFlushCommits(true);
        }

        validatePut(put);

        // Track the size first, then store the put.
        currentWriteBufferSize += put.heapSize();
        writeAsyncBuffer.add(put);

        // Keep flushing (without waiting) until the tracked size is back within the limit.
        while (writeBufferSize < currentWriteBufferSize) {
          backgroundFlushCommits(false);
        }
      }
    

    这里的 backgroundFlushCommits 应该与原来的 flushCommits() 差不多,但跟踪进去一看,卧槽!都是哪跟哪了,差异有点大。之前只有下面这一行:
    connection.processBatchOfPuts(writeBuffer, tableName, pool);
    1. 如果当前 writeAsyncBuffer 不为空,并且之前的操作没有出错,就提交 writeAsyncBuffer

      /**
       * Submits the buffered operations to the async process {@code ap}, optionally waits for the
       * operations in progress, and rethrows any errors {@code ap} has recorded.
       *
       * <p>Whatever the outcome, {@code currentWriteBufferSize} is recomputed from the current
       * content of {@code writeAsyncBuffer} before the method exits.
       *
       * @param synchronous when true, always wait via {@code ap.waitUntilDone()}; when false, wait
       *     only if {@code ap} reports an error
       * @throws InterruptedIOException declared by this method; not thrown directly in this body
       * @throws RetriesExhaustedWithDetailsException the value of {@code ap.getErrors()} when
       *     {@code ap} reports an error after the submit/wait steps
       */
      private void backgroundFlushCommits(boolean synchronous) throws
          InterruptedIOException, RetriesExhaustedWithDetailsException {
    
        try {
          // If there is an error on the operations in progress, we don't add new operations.
          if (writeAsyncBuffer.size() > 0 && !ap.hasError()) {
            ap.submit(writeAsyncBuffer, true);
          }
    
          // Wait when the caller asked for it, or when something has already failed.
          if (synchronous || ap.hasError()) {
            if (ap.hasError() && LOG.isDebugEnabled()) {
              LOG.debug(tableName + ": One or more of the operations have failed -" +
                  " waiting for all operation in progress to finish (successfully or not)");
            }
            ap.waitUntilDone();
          }
    
          // Re-check after the (possible) wait: report the failures to the caller.
          if (ap.hasError()) {
            if (!clearBufferOnFail) {
              // if clearBufferOnFailed is not set, we're supposed to keep the failed operation in the
              //  write buffer. This is a questionable feature kept here for backward compatibility
              writeAsyncBuffer.addAll(ap.getFailedOperations());
            }
            // Fetch the errors before clearing them, then throw what was fetched.
            RetriesExhaustedWithDetailsException e = ap.getErrors();
            ap.clearErrors();
            throw e;
          }
        } finally {
          // Runs on both normal return and throw: rebuild the tracked buffer size from what is
          // still in writeAsyncBuffer. Only rows that are Mutation instances contribute.
          currentWriteBufferSize = 0;
          for (Row mut : writeAsyncBuffer) {
            if (mut instanceof Mutation) {
              currentWriteBufferSize += ((Mutation) mut).heapSize();
            }
          }
        }
      }
    

    这个 backgroundFlushCommits 看了好久也没看出个所以然来,只好继续跟进 ap.submit(writeAsyncBuffer, true);
    看到这个

    // Excerpt from submit(): maps each destination HRegionLocation to the MultiAction that
    // collects the actions bound for it.
    Map<HRegionLocation, MultiAction<Row>> actionsByServer =new HashMap<HRegionLocation, MultiAction<Row>>();
    

    的时候感觉一下子有希望了,这个应该跟之前的像吧!

      /**
       * Picks the rows that can be taken right now, groups them into one {@code MultiAction} per
       * destination and passes them to {@code sendMultiAction}.
       *
       * <p>Every accepted row is removed from {@code rows}. Rows for which no location is found,
       * or which {@code canTakeOperation} turns down, stay in the list.
       *
       * @param rows the operations to submit; accepted ones are removed from this list
       * @param atLeastOne when true, the selection pass is repeated until at least one row has
       *     been accepted or {@code hasError()} returns true
       * @throws InterruptedIOException declared by this method; not thrown directly in this body
       */
      public void submit(List<? extends Row> rows, boolean atLeastOne) throws InterruptedIOException {
        if (rows.isEmpty()) {
          return;
        }

        // This looks like we are keying by region but HRegionLocation has a comparator that compares
        // on the server portion only (hostname + port) so this Map collects regions by server.
        Map<HRegionLocation, MultiAction<Row>> perServer =
          new HashMap<HRegionLocation, MultiAction<Row>>();
        List<Action<Row>> accepted = new ArrayList<Action<Row>>(rows.size());

        boolean tryAgain;
        do {
          // Block until at least one task slot is free.
          waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);

          // Per-pass memory of the accept/reject decisions already made for regions and servers.
          Map<String, Boolean> regionIncluded = new HashMap<String, Boolean>();
          Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();

          int posInList = -1;
          for (Iterator<? extends Row> cursor = rows.iterator(); cursor.hasNext();) {
            Row row = cursor.next();
            HRegionLocation destination = findDestLocation(row, 1, posInList);

            if (destination == null) {
              // No location, e.g. because of an error such as meta not being available.
              continue;
            }
            if (!canTakeOperation(destination, regionIncluded, serverIncluded)) {
              continue;
            }

            posInList++;
            Action<Row> action = new Action<Row>(row, posInList);
            accepted.add(action);
            addAction(destination, action, perServer);
            cursor.remove();
          }

          tryAgain = accepted.isEmpty() && atLeastOne && !hasError();
        } while (tryAgain);

        HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
        sendMultiAction(accepted, perServer, 1, errorsByServer);
      }
    

    定位row找到loc(HRegionLocation)

    // Excerpt from submit(): look up the destination location for row r. The literal 1 is
    // presumably the attempt number (submit passes 1 as numAttempt to sendMultiAction) - TODO confirm.
    HRegionLocation loc = findDestLocation(r, 1, posInList);
    

    按 region server 聚合 action(同一个 server 内再按 region 分组):addAction(loc, action, actionsByServer);

    /**
     * Group the actions per region server.
     *
     * <p>Fetches the {@code MultiAction} stored under {@code loc} in {@code actionsByServer},
     * creating and registering a new one when there is none yet, and adds {@code action} to it
     * under the name of the region that {@code loc} refers to.
     *
     * @param loc location the action is bound for; also the key into {@code actionsByServer}
     * @param action the action to file
     * @param actionsByServer accumulator updated in place
     */
    private void addAction(HRegionLocation loc, Action<Row> action, Map<HRegionLocation,
      MultiAction<Row>> actionsByServer) {
      final byte[] regionKey = loc.getRegionInfo().getRegionName();

      MultiAction<Row> serverActions = actionsByServer.get(loc);
      if (serverActions == null) {
        serverActions = new MultiAction<Row>();
        actionsByServer.put(loc, serverActions);
      }

      serverActions.add(regionKey, action);
    }
    

    然后是sendMultiAction()

    /**
     * Submits one task to {@code pool} for every entry of {@code actionsByServer}; each task
     * executes that entry's {@code MultiAction} once, without retries inside the call itself.
     *
     * <p>A task that fails with an {@code IOException}, or that the pool rejects, is passed to
     * {@code resubmitAll} with the attempt number increased by one. A successful response is
     * passed to {@code receiveMultiAction}.
     *
     * @param initialActions the actions this send was built from; forwarded to the
     *     resubmit/receive handlers
     * @param actionsByServer the multi actions to send, keyed by location
     * @param numAttempt number of the current attempt
     * @param errorsByServer error tracker forwarded to the resubmit/receive handlers
     */
    public void sendMultiAction(final List<Action<Row>> initialActions,
                                  Map<HRegionLocation, MultiAction<Row>> actionsByServer,
                                  final int numAttempt,
                                  final HConnectionManager.ServerErrorTracker errorsByServer) {
      // Send the queries and add them to the inProgress list
      // This iteration is by server (the HRegionLocation comparator is by server portion only).
      for (Map.Entry<HRegionLocation, MultiAction<Row>> entry : actionsByServer.entrySet()) {
        final HRegionLocation loc = entry.getKey();
        final MultiAction<Row> multiAction = entry.getValue();

        // Count the task before handing it over; the task's finally block and the rejection
        // handler below each call decTaskCounters.
        incTaskCounters(multiAction.getRegions(), loc.getServerName());

        final Runnable call = new Runnable() {
          @Override
          public void run() {
            try {
              final MultiServerCallable<Row> callable = createCallable(loc, multiAction);
              final MultiResponse response;
              try {
                response = createCaller(callable).callWithoutRetries(callable);
              } catch (IOException callFailure) {
                LOG.warn("Call to " + loc.getServerName() + " failed numAttempt=" + numAttempt +
                  ", resubmitting all since not sure where we are at", callFailure);
                resubmitAll(initialActions, multiAction, loc, numAttempt + 1, callFailure,
                    errorsByServer);
                return;
              }

              receiveMultiAction(initialActions, multiAction, loc, response, numAttempt,
                  errorsByServer);
            } finally {
              decTaskCounters(multiAction.getRegions(), loc.getServerName());
            }
          }
        };
        final Runnable traced = Trace.wrap("AsyncProcess.sendMultiAction", call);

        try {
          this.pool.submit(traced);
        } catch (RejectedExecutionException rejected) {
          // This should never happen. But as the pool is provided by the end user, let's secure
          //  this a little.
          decTaskCounters(multiAction.getRegions(), loc.getServerName());
          LOG.warn("The task was rejected by the pool. This is unexpected." +
              " Server is " + loc.getServerName(), rejected);
          // We're likely to fail again, but this will increment the attempt counter, so it will
          //  finish.
          resubmitAll(initialActions, multiAction, loc, numAttempt + 1, rejected, errorsByServer);
        }
      }
    }
    

    这里有几个方法还不太懂:
    1. resubmitAll
    2. receiveMultiAction
    3. createCaller

    直到后面在 callable 的 call() 方法里看到了 responseProto = getStub().multi(controller, requestProto);,这不就是 HRegionServer.multi() 吗?
    先看 callable 吧,其它的慢慢再看。它是在 runnable 里创建并调用的,而 runnable 随后通过 this.pool.submit(runnable) 提交执行。
    callable创建

      /**
       * Creates the {@code MultiServerCallable} for one location and its multi action, using
       * this instance's {@code hConnection} and {@code tableName}.
       *
       * @param location the location passed to the callable
       * @param multi the multi action passed to the callable
       * @return a new callable
       */
      protected MultiServerCallable<Row> createCallable(final HRegionLocation location,
          final MultiAction<Row> multi) {
        final MultiServerCallable<Row> callable =
            new MultiServerCallable<Row>(hConnection, tableName, location, multi);
        return callable;
      }
    

    call()方法

      /**
       * Builds a {@code MultiRequest} from {@code multiAction} with one region action per region,
       * sends it through {@code getStub().multi(...)} and converts the reply.
       *
       * @return the converted response; if the stub call throws a {@code ServiceException}, the
       *     result of {@code createAllFailedResponse} for the request instead
       * @throws IOException a {@code DoNotRetryIOException} when {@code multiAction} holds no
       *     actions; otherwise whatever the conversion helpers throw
       */
      public MultiResponse call() throws IOException {
        final int countOfActions = this.multiAction.size();
        if (countOfActions <= 0) {
          throw new DoNotRetryIOException("No Actions");
        }

        final MultiRequest.Builder requestBuilder = MultiRequest.newBuilder();
        List<CellScannable> cells = null;
        // The multi object is a list of Actions by region.  Iterate by region.
        for (Map.Entry<byte[], List<Action<R>>> perRegion :
            this.multiAction.actions.entrySet()) {
          final byte[] regionName = perRegion.getKey();
          final List<Action<R>> regionActions = perRegion.getValue();
          final RegionAction.Builder regionActionBuilder;
          if (!this.cellBlock) {
            regionActionBuilder = RequestConverter.buildRegionAction(regionName, regionActions);
          } else {
            // Presize.  Presume at least a KV per Action.  There are likely more.
            if (cells == null) {
              cells = new ArrayList<CellScannable>(countOfActions);
            }
            // Send data in cellblocks; the builder call also receives the cells list.
            regionActionBuilder =
                RequestConverter.buildNoDataRegionAction(regionName, regionActions, cells);
          }
          requestBuilder.addRegionAction(regionActionBuilder.build());
        }

        // Controller optionally carries cell data over the proxy/service boundary and also
        // optionally ferries cell response data back out again.
        final PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells);
        controller.setPriority(getTableName());

        final ClientProtos.MultiRequest requestProto = requestBuilder.build();
        final ClientProtos.MultiResponse responseProto;
        try {
          responseProto = getStub().multi(controller, requestProto);
        } catch (ServiceException serviceFailure) {
          return createAllFailedResponse(requestProto,
              ProtobufUtil.getRemoteException(serviceFailure));
        }
        return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
      }
    

    这里主要看HRegionServer.multi()

      public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
      throws ServiceException {
        // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
        // It is also the conduit via which we pass back data.
        PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
        CellScanner cellScanner = controller != null? controller.cellScanner(): null;
        if (controller != null) controller.setCellScanner(null);
        List<CellScannable> cellsToReturn = null;
         MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
    
         for (RegionAction regionAction : request.getRegionActionList()) {
           this.requestCount.add(regionAction.getActionCount());
           RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
           HRegion region;
           try {
             region = getRegion(regionAction.getRegion());
           } catch (IOException e) {
             regionActionResultBuilder.setException(ResponseConverter.buildException(e));
             responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
             continue;  // For this region it's a failure.
           }
    
           if (regionAction.hasAtomic() && regionAction.getAtomic()) {
             // How does this call happen?

    It may need some work to play well w/ the surroundings. // Need to return an item per Action along w/ Action index. TODO. try { mutateRows(region, regionAction.getActionList(), cellScanner); } catch (IOException e) { // As it's atomic, we may expect it's a global failure. regionActionResultBuilder.setException(ResponseConverter.buildException(e)); } } else { // doNonAtomicRegionMutation manages the exception internally cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner, regionActionResultBuilder, cellsToReturn); } responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); } // Load the controller with the Cells to return. if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) { controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn)); } return responseBuilder.build(); }

    主要代码如下,其余部分都是在构建 protobuf(PB)消息:

     // Excerpts from multi(): first resolve the region the action targets.
     region = getRegion(regionAction.getRegion());
     // Atomic region actions are applied through mutateRows.
     mutateRows(region, regionAction.getActionList(), cellScanner);
     // Every other region action goes through doNonAtomicRegionMutation.
      cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner,
                 regionActionResultBuilder, cellsToReturn);
    

    其中 mutateRows() 处理的是原子的 PUT 和 DELETE 操作:

      /**
       * Collects the given put/delete actions into a single {@code RowMutations} and applies it
       * to {@code region} with {@code region.mutateRow}.
       *
       * <p>The {@code RowMutations} is created from the row of the first action. Unless the
       * region belongs to the meta table, {@code cacheFlusher.reclaimMemStoreMemory()} is called
       * first.
       *
       * @param region the region to mutate
       * @param actions the actions to apply; each must carry a PUT or DELETE mutation
       * @param cellScanner passed to the protobuf-to-mutation converters
       * @throws IOException a {@code DoNotRetryIOException} when an action has a Get or a
       *     mutation type other than PUT/DELETE; otherwise whatever the callees throw
       */
      protected void mutateRows(final HRegion region, final List<ClientProtos.Action> actions,
          final CellScanner cellScanner)
      throws IOException {
        final boolean metaRegion = region.getRegionInfo().isMetaTable();
        if (!metaRegion) {
          cacheFlusher.reclaimMemStoreMemory();
        }

        RowMutations rowMutations = null;
        for (ClientProtos.Action action : actions) {
          if (action.hasGet()) {
            throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
              action.getGet());
          }

          MutationType mutateType = action.getMutation().getMutateType();
          if (rowMutations == null) {
            // First action seen: its row is used for the whole batch.
            rowMutations = new RowMutations(action.getMutation().getRow().toByteArray());
          }

          switch (mutateType) {
            case PUT:
              rowMutations.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
              break;
            case DELETE:
              rowMutations.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
              break;
            default:
              throw new DoNotRetryIOException(
                  "Atomic put and/or delete only, not " + mutateType.name());
          }
        }

        // NOTE(review): when actions is empty, rowMutations is still null at this point.
        region.mutateRow(rowMutations);
      }
  • 相关阅读:
    Using Subversion and ViewCVS on Solaris 10
    Solaris开放源代码了!
    小笨霖英语笔记本(0)
    How to start CDE/JDS with xinit command
    英译汉练习:Solaris 10进入Linux领地
    UNIX/LINUX 平台可执行文件格式分析
    小笨霖英语笔记本(2)
    小笨霖英语笔记本(3)
    小笨霖英语笔记本(1)
    魔鬼城雅丹地貌
  • 原文地址:https://www.cnblogs.com/mengfanrong/p/5205434.html
Copyright © 2011-2022 走看看