zoukankan      html  css  js  c++  java
  • Hadoop HDFS源码分析 读取命名空间镜像和编辑日志数据

    读取命名空间镜像和编辑日志数据

    1.读取命名空间镜像

    类 FSImage 是命名空间镜像的 Java 实现,其源码中的英文类注释如下:

    /**
     * FSImage handles checkpointing and logging of the namespace edits.
     *
     */

    方法 FSImage.loadFSImage(FSNamesystem, StartupOption, MetaRecoveryContext) 负责读取命名空间镜像,代码如下:

      1     private boolean loadFSImage(FSNamesystem target, StartupOption startOpt,
      2                                 MetaRecoveryContext recovery)
      3             throws IOException {
      4         final boolean rollingRollback
      5                 = RollingUpgradeStartupOption.ROLLBACK.matches(startOpt);
      6         final EnumSet<NameNodeFile> nnfs;
      7         if (rollingRollback) {
      8             // if it is rollback of rolling upgrade, only load from the rollback image
      9             nnfs = EnumSet.of(NameNodeFile.IMAGE_ROLLBACK);
     10         } else {
     11             // otherwise we can load from both IMAGE and IMAGE_ROLLBACK
     12             nnfs = EnumSet.of(NameNodeFile.IMAGE, NameNodeFile.IMAGE_ROLLBACK);
     13         }
     14         final FSImageStorageInspector inspector = storage
     15                 .readAndInspectDirs(nnfs, startOpt);
     16 
     17         isUpgradeFinalized = inspector.isUpgradeFinalized();
     18         List<FSImageFile> imageFiles = inspector.getLatestImages();
     19 
     20         StartupProgress prog = NameNode.getStartupProgress();
     21         prog.beginPhase(Phase.LOADING_FSIMAGE);
     22         File phaseFile = imageFiles.get(0).getFile();
     23         prog.setFile(Phase.LOADING_FSIMAGE, phaseFile.getAbsolutePath());
     24         prog.setSize(Phase.LOADING_FSIMAGE, phaseFile.length());
     25         boolean needToSave = inspector.needToSave();
     26 
     27         Iterable<EditLogInputStream> editStreams = null;
     28 
     29         initEditLog(startOpt);
     30 
     31         if (NameNodeLayoutVersion.supports(
     32                 LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) {
     33             // If we're open for write, we're either non-HA or we're the active NN, so
     34             // we better be able to load all the edits. If we're the standby NN, it's
     35             // OK to not be able to read all of edits right now.
     36             // In the meanwhile, for HA upgrade, we will still write editlog thus need
     37             // this toAtLeastTxId to be set to the max-seen txid
     38             // For rollback in rolling upgrade, we need to set the toAtLeastTxId to
     39             // the txid right before the upgrade marker.
     40             long toAtLeastTxId = editLog.isOpenForWrite() ? inspector
     41                     .getMaxSeenTxId() : 0;
     42             if (rollingRollback) {
     43                 // note that the first image in imageFiles is the special checkpoint
     44                 // for the rolling upgrade
     45                 toAtLeastTxId = imageFiles.get(0).getCheckpointTxId() + 2;
     46             }
     47             editStreams = editLog.selectInputStreams(
     48                     imageFiles.get(0).getCheckpointTxId() + 1,
     49                     toAtLeastTxId, recovery, false);
     50         } else {
     51             editStreams = FSImagePreTransactionalStorageInspector
     52                     .getEditLogStreams(storage);
     53         }
     54         int maxOpSize = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_KEY,
     55                 DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT);
     56         for (EditLogInputStream elis : editStreams) {
     57             elis.setMaxOpSize(maxOpSize);
     58         }
     59 
     60         for (EditLogInputStream l : editStreams) {
     61             LOG.debug("Planning to load edit log stream: " + l);
     62         }
     63         if (!editStreams.iterator().hasNext()) {
     64             LOG.info("No edit log streams selected.");
     65         }
     66 
     67         FSImageFile imageFile = null;
     68         for (int i = 0; i < imageFiles.size(); i++) {
     69             try {
     70                 imageFile = imageFiles.get(i);
     71                 loadFSImageFile(target, recovery, imageFile, startOpt);
     72                 break;
     73             } catch (IOException ioe) {
     74                 LOG.error("Failed to load image from " + imageFile, ioe);
     75                 target.clear();
     76                 imageFile = null;
     77             }
     78         }
     79         // Failed to load any images, error out
     80         if (imageFile == null) {
     81             FSEditLog.closeAllStreams(editStreams);
     82             throw new IOException("Failed to load an FSImage file!");
     83         }
     84         prog.endPhase(Phase.LOADING_FSIMAGE);
     85 
     86         if (!rollingRollback) {
     87             long txnsAdvanced = loadEdits(editStreams, target, startOpt, recovery);
     88             needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
     89                     txnsAdvanced);
     90             if (RollingUpgradeStartupOption.DOWNGRADE.matches(startOpt)) {
     91                 // rename rollback image if it is downgrade
     92                 renameCheckpoint(NameNodeFile.IMAGE_ROLLBACK, NameNodeFile.IMAGE);
     93             }
     94         } else {
     95             // Trigger the rollback for rolling upgrade. Here lastAppliedTxId equals
     96             // to the last txid in rollback fsimage.
     97             rollingRollback(lastAppliedTxId + 1, imageFiles.get(0).getCheckpointTxId());
     98             needToSave = false;
     99         }
    100         editLog.setNextTxId(lastAppliedTxId + 1);
    101         return needToSave;
    102     }

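    上面的代码中,遍历 imageFiles 的 for 循环负责读入镜像文件:它依次调用 loadFSImageFile 尝试加载每个候选镜像,成功后立即跳出循环;若加载某个文件时抛出 IOException,则记录错误、调用 target.clear() 清空已加载的内容,再尝试下一个文件。如果所有镜像都加载失败,则关闭已选出的编辑日志流并抛出 IOException。镜像加载成功后,非回滚启动会继续调用 loadEdits 读取编辑日志。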

    2.读取编辑日志

    读取命名空间镜像后,名字节点内存中的元数据只反映了镜像保存那一时刻的状态,还需要读取并重放编辑日志中记录的后续操作,才能恢复到最新的命名空间状态。

    目前 HDFS 通过 FSEditLogLoader 类读取编辑日志,相关代码如下:

      1 long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
      2       StartupOption startOpt, MetaRecoveryContext recovery) throws IOException {
      3     StartupProgress prog = NameNode.getStartupProgress();
      4     Step step = createStartupProgressStep(edits);
      5     prog.beginStep(Phase.LOADING_EDITS, step);
      6     fsNamesys.writeLock();
      7     try {
      8       long startTime = now();
      9       FSImage.LOG.info("Start loading edits file " + edits.getName());
     10       long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
     11           startOpt, recovery);
     12       FSImage.LOG.info("Edits file " + edits.getName() 
     13           + " of size " + edits.length() + " edits # " + numEdits 
     14           + " loaded in " + (now()-startTime)/1000 + " seconds");
     15       return numEdits;
     16     } finally {
     17       edits.close();
     18       fsNamesys.writeUnlock();
     19       prog.endStep(Phase.LOADING_EDITS, step);
     20     }
     21   }
     22 
     23   long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
     24       long expectedStartingTxId, StartupOption startOpt,
     25       MetaRecoveryContext recovery) throws IOException {
     26     FSDirectory fsDir = fsNamesys.dir;
     27 
     28     EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
     29       new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);
     30 
     31     if (LOG.isTraceEnabled()) {
     32       LOG.trace("Acquiring write lock to replay edit log");
     33     }
     34 
     35     fsNamesys.writeLock();
     36     fsDir.writeLock();
     37 
     38     long recentOpcodeOffsets[] = new long[4];
     39     Arrays.fill(recentOpcodeOffsets, -1);
     40     
     41     long expectedTxId = expectedStartingTxId;
     42     long numEdits = 0;
     43     long lastTxId = in.getLastTxId();
     44     long numTxns = (lastTxId - expectedStartingTxId) + 1;
     45     StartupProgress prog = NameNode.getStartupProgress();
     46     Step step = createStartupProgressStep(in);
     47     prog.setTotal(Phase.LOADING_EDITS, step, numTxns);
     48     Counter counter = prog.getCounter(Phase.LOADING_EDITS, step);
     49     long lastLogTime = now();
     50     long lastInodeId = fsNamesys.getLastInodeId();
     51     
     52     try {
     53       while (true) {
     54         try {
     55           FSEditLogOp op;
     56           try {
     57             op = in.readOp();
     58             if (op == null) {
     59               break;
     60             }
     61           } catch (Throwable e) {
     62             // Handle a problem with our input
     63             check203UpgradeFailure(in.getVersion(true), e);
     64             String errorMessage =
     65               formatEditLogReplayError(in, recentOpcodeOffsets, expectedTxId);
     66             FSImage.LOG.error(errorMessage, e);
     67             if (recovery == null) {
     68                // We will only try to skip over problematic opcodes when in
     69                // recovery mode.
     70               throw new EditLogInputException(errorMessage, e, numEdits);
     71             }
     72             MetaRecoveryContext.editLogLoaderPrompt(
     73                 "We failed to read txId " + expectedTxId,
     74                 recovery, "skipping the bad section in the log");
     75             in.resync();
     76             continue;
     77           }
     78           recentOpcodeOffsets[(int)(numEdits % recentOpcodeOffsets.length)] =
     79             in.getPosition();
     80           if (op.hasTransactionId()) {
     81             if (op.getTransactionId() > expectedTxId) { 
     82               MetaRecoveryContext.editLogLoaderPrompt("There appears " +
     83                   "to be a gap in the edit log.  We expected txid " +
     84                   expectedTxId + ", but got txid " +
     85                   op.getTransactionId() + ".", recovery, "ignoring missing " +
     86                   " transaction IDs");
     87             } else if (op.getTransactionId() < expectedTxId) { 
     88               MetaRecoveryContext.editLogLoaderPrompt("There appears " +
     89                   "to be an out-of-order edit in the edit log.  We " +
     90                   "expected txid " + expectedTxId + ", but got txid " +
     91                   op.getTransactionId() + ".", recovery,
     92                   "skipping the out-of-order edit");
     93               continue;
     94             }
     95           }
     96           try {
     97             if (LOG.isTraceEnabled()) {
     98               LOG.trace("op=" + op + ", startOpt=" + startOpt
     99                   + ", numEdits=" + numEdits + ", totalEdits=" + totalEdits);
    100             }
    101             long inodeId = applyEditLogOp(op, fsDir, startOpt,
    102                 in.getVersion(true), lastInodeId);
    103             if (lastInodeId < inodeId) {
    104               lastInodeId = inodeId;
    105             }
    106           } catch (RollingUpgradeOp.RollbackException e) {
    107             throw e;
    108           } catch (Throwable e) {
    109             LOG.error("Encountered exception on operation " + op, e);
    110             if (recovery == null) {
    111               throw e instanceof IOException? (IOException)e: new IOException(e);
    112             }
    113 
    114             MetaRecoveryContext.editLogLoaderPrompt("Failed to " +
    115              "apply edit log operation " + op + ": error " +
    116              e.getMessage(), recovery, "applying edits");
    117           }
    118           // Now that the operation has been successfully decoded and
    119           // applied, update our bookkeeping.
    120           incrOpCount(op.opCode, opCounts, step, counter);
    121           if (op.hasTransactionId()) {
    122             lastAppliedTxId = op.getTransactionId();
    123             expectedTxId = lastAppliedTxId + 1;
    124           } else {
    125             expectedTxId = lastAppliedTxId = expectedStartingTxId;
    126           }
    127           // log progress
    128           if (op.hasTransactionId()) {
    129             long now = now();
    130             if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
    131               long deltaTxId = lastAppliedTxId - expectedStartingTxId + 1;
    132               int percent = Math.round((float) deltaTxId / numTxns * 100);
    133               LOG.info("replaying edit log: " + deltaTxId + "/" + numTxns
    134                   + " transactions completed. (" + percent + "%)");
    135               lastLogTime = now;
    136             }
    137           }
    138           numEdits++;
    139           totalEdits++;
    140         } catch (RollingUpgradeOp.RollbackException e) {
    141           LOG.info("Stopped at OP_START_ROLLING_UPGRADE for rollback.");
    142           break;
    143         } catch (MetaRecoveryContext.RequestStopException e) {
    144           MetaRecoveryContext.LOG.warn("Stopped reading edit log at " +
    145               in.getPosition() + "/"  + in.length());
    146           break;
    147         }
    148       }
    149     } finally {
    150       fsNamesys.resetLastInodeId(lastInodeId);
    151       if(closeOnExit) {
    152         in.close();
    153       }
    154       fsDir.writeUnlock();
    155       fsNamesys.writeUnlock();
    156 
    157       if (LOG.isTraceEnabled()) {
    158         LOG.trace("replaying edit log finished");
    159       }
    160 
    161       if (FSImage.LOG.isDebugEnabled()) {
    162         dumpOpCounts(opCounts);
    163       }
    164     }
    165     return numEdits;
    166   }
    167   

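    在上述 loadEditRecords 方法的 while 循环中,每次从 EditLogInputStream 流中读取一个 FSEditLogOp:先检查其事务 ID 是否与期望值连续,再调用 applyEditLogOp 将该操作应用到内存中的命名空间,直到 readOp 返回 null(流已读完)为止。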

  • 相关阅读:
    python笔记第十一天 模块补充
    python笔记第十天 模块
    python笔记第九天 装饰器
    python笔记第八天 迭代器与生成器
    python笔记第七天 文件操作
    python笔记第六天 函数和函数的内置方法
    C语言----指针形参(指向指针的指针形参)
    NEON使用方法
    ARM NEON指令集总结
    三维变换矩阵左乘和右乘分析
  • 原文地址:https://www.cnblogs.com/birdhack/p/4297577.html
Copyright © 2011-2022 走看看