zoukankan      html  css  js  c++  java
  • HDFS源码分析EditLog之读取操作符

     在《HDFS源码分析EditLog之获取编辑日志输入流》一文中,我们详细了解了如何获取编辑日志输入流EditLogInputStream。在我们得到编辑日志输入流后,是不是就该从输入流中获取数据来处理呢?答案是显而易见的!在《HDFS源码分析之EditLogTailer》一文中,我们在讲编辑日志追踪同步时,也讲到了如下两个连续的处理流程:

            4、从编辑日志editLog中获取编辑日志输入流集合streams,获取的输入流为最新事务ID加1之后的数据
            5、调用文件系统镜像FSImage实例image的loadEdits(),利用编辑日志输入流集合streams,加载编辑日志至目标namesystem中的文件系统镜像FSImage,并获得编辑日志加载的大小editsLoaded;

            可见,我们在获得编辑日志输入流EditLogInputStream的集合streams后,就需要调用FSImage的loadEdits()方法,利用编辑日志输入流集合streams,加载编辑日志至目标namesystem中的文件系统镜像FSImage。而HDFS是如何从编辑日志输入流中读取数据的呢?本文,我们将进行详细的探究!

            首先,在加载编辑日志的主要类FSEditLogLoader中,其核心方法loadEditRecords()中有如下一段代码:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. while (true) {  
    2.   try {  
    3.     FSEditLogOp op;  
    4.     try {  
    5.         
    6.     // 从编辑日志输入流in中读取操作符op  
    7.       op = in.readOp();  
    8.         
    9.       // 如果操作符op为空,直接跳出循环,并返回  
    10.       if (op == null) {  
    11.         break;  
    12.       }  
    13.     } catch (Throwable e) {  
    14.       // ...省略部分代码  
    15.     }  
    16.       
    17.     // ...省略部分代码  
    18.   
    19.     try {  
    20.       // ...省略部分代码  
    21.       long inodeId = applyEditLogOp(op, fsDir, startOpt,  
    22.           in.getVersion(true), lastInodeId);  
    23.       if (lastInodeId < inodeId) {  
    24.         lastInodeId = inodeId;  
    25.       }  
    26.     } catch (RollingUpgradeOp.RollbackException e) {  
    27.       // ...省略部分代码  
    28.     } catch (Throwable e) {  
    29.       // ...省略部分代码  
    30.     }  
    31.     // ...省略部分代码  
    32.   } catch (RollingUpgradeOp.RollbackException e) {  
    33.     // ...省略部分代码  
    34.   } catch (MetaRecoveryContext.RequestStopException e) {  
    35.     // ...省略部分代码  
    36.   }  
    37. }  

            它会从编辑日志输入流in中读取一个操作符op,然后调用applyEditLogOp()方法,将操作符作用于内存元数据FSNamesystem。那么问题来了,这个操作符如何从数据流中被读取并解析的呢?

            接下来,我们就看下如何从编辑日志输出流EditLogInputStream中读取一个操作符,我们先看其readOp()方法,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /**  
    2.  * Read an operation from the stream 
    3.  * @return an operation from the stream or null if at end of stream 
    4.  * @throws IOException if there is an error reading from the stream 
    5.  */  
    6. public FSEditLogOp readOp() throws IOException {  
    7.   FSEditLogOp ret;  
    8.     
    9.   // 如果缓存的cachedOp不为null,返回缓存的cachedOp,并将其清空  
    10.   if (cachedOp != null) {  
    11.     ret = cachedOp;  
    12.     cachedOp = null;  
    13.     return ret;  
    14.   }  
    15.     
    16.   // 如果缓存的cachedOp为null,调用nextOp()进行处理  
    17.   return nextOp();  
    18. }  

            很简单,如果缓存的cachedOp不为null,返回缓存的cachedOp,并将其清空,如果缓存的cachedOp为null,则调用nextOp()进行处理。而EditLogInputStream中nextOp()是一个抽象方法,我们需要看其子类的实现方法,下面就以EditLogFileInputStream为例,看下其nextOp()方法:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. @Override  
    2. protected FSEditLogOp nextOp() throws IOException {  
    3.   return nextOpImpl(false);  
    4. }  

            继续追踪nextOpImpl()方法,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. private FSEditLogOp nextOpImpl(boolean skipBrokenEdits) throws IOException {  
    2.   FSEditLogOp op = null;  
    3.     
    4.   // 根据编辑日志文件输入流的状态判断:  
    5.   switch (state) {  
    6.   case UNINIT:// 如果为未初始化状态UNINIT  
    7.     try {  
    8.     // 调用init()方法进行初始化  
    9.       init(true);  
    10.     } catch (Throwable e) {  
    11.       LOG.error("caught exception initializing " + this, e);  
    12.       if (skipBrokenEdits) {  
    13.         return null;  
    14.       }  
    15.       Throwables.propagateIfPossible(e, IOException.class);  
    16.     }  
    17.       
    18.     // 检测编辑日志文件输入流状态,此时不应为UNINIT  
    19.     Preconditions.checkState(state != State.UNINIT);  
    20.       
    21.     // 再次调用nextOpImpl()方法  
    22.     return nextOpImpl(skipBrokenEdits);  
    23.   case OPEN:// 如果为打开OPEN状态  
    24.       
    25.     // 调用FSEditLogOp.Reader的readOp()方法,读取操作符  
    26.     op = reader.readOp(skipBrokenEdits);  
    27.     if ((op != null) && (op.hasTransactionId())) {  
    28.       long txId = op.getTransactionId();  
    29.       if ((txId >= lastTxId) &&  
    30.           (lastTxId != HdfsConstants.INVALID_TXID)) {  
    31.         //  
    32.         // Sometimes, the NameNode crashes while it's writing to the  
    33.         // edit log.  In that case, you can end up with an unfinalized edit log  
    34.         // which has some garbage at the end.  
    35.         // JournalManager#recoverUnfinalizedSegments will finalize these  
    36.         // unfinished edit logs, giving them a defined final transaction   
    37.         // ID.  Then they will be renamed, so that any subsequent  
    38.         // readers will have this information.  
    39.         //  
    40.         // Since there may be garbage at the end of these "cleaned up"  
    41.         // logs, we want to be sure to skip it here if we've read everything  
    42.         // we were supposed to read out of the stream.  
    43.         // So we force an EOF on all subsequent reads.  
    44.         //  
    45.         long skipAmt = log.length() - tracker.getPos();  
    46.         if (skipAmt > 0) {  
    47.           if (LOG.isDebugEnabled()) {  
    48.               LOG.debug("skipping " + skipAmt + " bytes at the end " +  
    49.                 "of edit log  '" + getName() + "': reached txid " + txId +  
    50.                 " out of " + lastTxId);  
    51.           }  
    52.           tracker.clearLimit();  
    53.           IOUtils.skipFully(tracker, skipAmt);  
    54.         }  
    55.       }  
    56.     }  
    57.     break;  
    58.     case CLOSED: // 如果为关闭CLOSED状态,直接返回null  
    59.       break; // return null  
    60.   }  
    61.   return op;  
    62. }  

            nextOpImpl()方法的大体处理逻辑如下:

            根据编辑日志文件输入流的状态判断:

            1、如果为未初始化状态UNINIT,调用init()方法进行初始化,然后检测编辑日志文件输入流状态,此时不应为UNINIT,最后再次调用nextOpImpl()方法;

            2、如果为打开OPEN状态,调用FSEditLogOp.Reader的readOp()方法,读取操作符op;

            3、如果为关闭CLOSED状态,直接返回null。

            我们重点关注下FSEditLogOp.Reader的readOp()方法,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.  * Read an operation from the input stream. 
    3.  *  
    4.  * Note that the objects returned from this method may be re-used by future 
    5.  * calls to the same method. 
    6.  *  
    7.  * @param skipBrokenEdits    If true, attempt to skip over damaged parts of 
    8.  * the input stream, rather than throwing an IOException 
    9.  * @return the operation read from the stream, or null at the end of the  
    10.  *         file 
    11.  * @throws IOException on error.  This function should only throw an 
    12.  *         exception when skipBrokenEdits is false. 
    13.  */  
    14. public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {  
    15.   while (true) {  
    16.     try {  
    17.           
    18.       // 调用decodeOp()方法  
    19.       return decodeOp();  
    20.     } catch (IOException e) {  
    21.       in.reset();  
    22.       if (!skipBrokenEdits) {  
    23.         throw e;  
    24.       }  
    25.     } catch (RuntimeException e) {  
    26.       // FSEditLogOp#decodeOp is not supposed to throw RuntimeException.  
    27.       // However, we handle it here for recovery mode, just to be more  
    28.       // robust.  
    29.       in.reset();  
    30.       if (!skipBrokenEdits) {  
    31.         throw e;  
    32.       }  
    33.     } catch (Throwable e) {  
    34.       in.reset();  
    35.       if (!skipBrokenEdits) {  
    36.         throw new IOException("got unexpected exception " +  
    37.             e.getMessage(), e);  
    38.       }  
    39.     }  
    40.     // Move ahead one byte and re-try the decode process.  
    41.     if (in.skip(1) < 1) {  
    42.       return null;  
    43.     }  
    44.   }  
    45. }  

            继续追踪decodeOp()方法,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.  * Read an opcode from the input stream. 
    3.  * 从输入流中读取一个操作符code 
    4.  * 
    5.  * @return   the opcode, or null on EOF. 
    6.  * 
    7.  * If an exception is thrown, the stream's mark will be set to the first 
    8.  * problematic byte.  This usually means the beginning of the opcode. 
    9.  */  
    10. private FSEditLogOp decodeOp() throws IOException {  
    11.   limiter.setLimit(maxOpSize);  
    12.   in.mark(maxOpSize);  
    13.   
    14.   if (checksum != null) {  
    15.     checksum.reset();  
    16.   }  
    17.   
    18.   byte opCodeByte;  
    19.   try {  
    20.         
    21.     // 从输入流in中读取一个byte,即opCodeByte  
    22.     opCodeByte = in.readByte();  
    23.   } catch (EOFException eof) {  
    24.     // EOF at an opcode boundary is expected.  
    25.     return null;  
    26.   }  
    27.   
    28.   // 将byte类型的opCodeByte转换成FSEditLogOpCodes对象opCode  
    29.   FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);  
    30.   if (opCode == OP_INVALID) {  
    31.     verifyTerminator();  
    32.     return null;  
    33.   }  
    34.   
    35.   // 根据FSEditLogOpCodes对象opCode从cache中获取FSEditLogOp对象op  
    36.   FSEditLogOp op = cache.get(opCode);  
    37.   if (op == null) {  
    38.     throw new IOException("Read invalid opcode " + opCode);  
    39.   }  
    40.   
    41.   // 如果支持编辑日志长度,从输入流读入一个int,  
    42.   if (supportEditLogLength) {  
    43.     in.readInt();  
    44.   }  
    45.   
    46.   if (NameNodeLayoutVersion.supports(  
    47.       LayoutVersion.Feature.STORED_TXIDS, logVersion)) {  
    48.     // Read the txid  
    49.     // 如果支持事务ID,读入一个long,作为事务ID,并在FSEditLogOp实例op中设置事务ID  
    50.     op.setTransactionId(in.readLong());  
    51.   } else {  
    52.     // 如果不支持事务ID,在FSEditLogOp实例op中设置事务ID为-12345  
    53.     op.setTransactionId(HdfsConstants.INVALID_TXID);  
    54.   }  
    55.   
    56.   // 从输入流in中读入其他域,并设置入FSEditLogOp实例op  
    57.   op.readFields(in, logVersion);  
    58.   
    59.   validateChecksum(in, checksum, op.txid);  
    60.   return op;  
    61. }  

            decodeOp()方法的逻辑很简单:

            1、从输入流in中读取一个byte,即opCodeByte,确定操作类型;

            2、将byte类型的opCodeByte转换成FSEditLogOpCodes对象opCode;

            3、根据FSEditLogOpCodes对象opCode从cache中获取FSEditLogOp对象op,这样我们就得到了操作符对象;

            4、如果支持编辑日志长度,从输入流读入一个int;

            5、如果支持事务ID,读入一个long,作为事务ID,并在FSEditLogOp实例op中设置事务ID,否则在FSEditLogOp实例op中设置事务ID为-12345;

            6、调用操作符对象op的readFields()方法,从输入流in中读入其他域,并设置入FSEditLogOp实例op。

            接下来,我们再看下操作符对象的readFields()方法,因为不同种类的操作符肯定包含不同的属性,所以它们的readFields()方法肯定也各不相同。下面,我们就以操作符AddCloseOp为例来分析,其readFields()方法如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. @Override  
    2. void readFields(DataInputStream in, int logVersion)  
    3.     throws IOException {  
    4.       
    5.   // 读取长度:如果支持读入长度,从输入流in读取一个int,赋值给length  
    6.   if (!NameNodeLayoutVersion.supports(  
    7.       LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {  
    8.     this.length = in.readInt();  
    9.   }  
    10.     
    11.   // 读取节点ID:如果支持读入节点ID,从输入流in读取一个long,赋值给inodeId,否则inodeId默认为0  
    12.   if (NameNodeLayoutVersion.supports(  
    13.       LayoutVersion.Feature.ADD_INODE_ID, logVersion)) {  
    14.     this.inodeId = in.readLong();  
    15.   } else {  
    16.     // The inodeId should be updated when this editLogOp is applied  
    17.     this.inodeId = INodeId.GRANDFATHER_INODE_ID;  
    18.   }  
    19.     
    20.   // 版本兼容性校验  
    21.   if ((-17 < logVersion && length != 4) ||  
    22.       (logVersion <= -17 && length != 5 && !NameNodeLayoutVersion.supports(  
    23.           LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion))) {  
    24.     throw new IOException("Incorrect data format."  +  
    25.                           " logVersion is " + logVersion +  
    26.                           " but writables.length is " +  
    27.                           length + ". ");  
    28.   }  
    29.     
    30.   // 读取路径:从输入流in读取一个String,赋值给path  
    31.   this.path = FSImageSerialization.readString(in);  
    32.   
    33.   // 读取副本数、修改时间:如果支持读取副本数、修改时间,分别从输入流读取一个short、long,  
    34.   // 赋值给replication、mtime  
    35.   if (NameNodeLayoutVersion.supports(  
    36.       LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {  
    37.     this.replication = FSImageSerialization.readShort(in);  
    38.     this.mtime = FSImageSerialization.readLong(in);  
    39.   } else {  
    40.     this.replication = readShort(in);  
    41.     this.mtime = readLong(in);  
    42.   }  
    43.   
    44.   // 读取访问时间:如果支持读取访问时间,从输入流读取一个long,赋值给atime,否则atime默认为0  
    45.   if (NameNodeLayoutVersion.supports(  
    46.       LayoutVersion.Feature.FILE_ACCESS_TIME, logVersion)) {  
    47.     if (NameNodeLayoutVersion.supports(  
    48.         LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {  
    49.       this.atime = FSImageSerialization.readLong(in);  
    50.     } else {  
    51.       this.atime = readLong(in);  
    52.     }  
    53.   } else {  
    54.     this.atime = 0;  
    55.   }  
    56.   
    57.   // 读取数据块大小:如果支持读取数据块大小,从输入流读取一个long,赋值给blockSize  
    58.   if (NameNodeLayoutVersion.supports(  
    59.       LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {  
    60.     this.blockSize = FSImageSerialization.readLong(in);  
    61.   } else {  
    62.     this.blockSize = readLong(in);  
    63.   }  
    64.   
    65.   // 调用readBlocks()方法读取数据块,赋值给数据块数组blocks  
    66.   this.blocks = readBlocks(in, logVersion);  
    67.     
    68.   // 从输入流读入权限,赋值给permissions  
    69.   this.permissions = PermissionStatus.read(in);  
    70.   
    71.   // 如果是ADD操作,需要额外处理客户端名称clientName、客户端机器clientMachine、覆盖写标志overwrite等属性  
    72.   if (this.opCode == OP_ADD) {  
    73.     aclEntries = AclEditLogUtil.read(in, logVersion);  
    74.     this.xAttrs = readXAttrsFromEditLog(in, logVersion);  
    75.     this.clientName = FSImageSerialization.readString(in);  
    76.     this.clientMachine = FSImageSerialization.readString(in);  
    77.     if (NameNodeLayoutVersion.supports(  
    78.         NameNodeLayoutVersion.Feature.CREATE_OVERWRITE, logVersion)) {  
    79.       this.overwrite = FSImageSerialization.readBoolean(in);  
    80.     } else {  
    81.       this.overwrite = false;  
    82.     }  
    83.     if (NameNodeLayoutVersion.supports(  
    84.         NameNodeLayoutVersion.Feature.BLOCK_STORAGE_POLICY, logVersion)) {  
    85.       this.storagePolicyId = FSImageSerialization.readByte(in);  
    86.     } else {  
    87.       this.storagePolicyId = BlockStoragePolicySuite.ID_UNSPECIFIED;  
    88.     }  
    89.     // read clientId and callId  
    90.     readRpcIds(in, logVersion);  
    91.   } else {  
    92.     this.clientName = "";  
    93.     this.clientMachine = "";  
    94.   }  
    95. }  

            这个没有什么特别好讲的,依次读入操作符需要的,在输入流中依次存在的属性即可。
            不过,我们仍然需要重点讲解下读入数据块的readBlocks()方法,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. private static Block[] readBlocks(  
    2.     DataInputStream in,  
    3.     int logVersion) throws IOException {  
    4.       
    5.   // 读取block数目numBlocks,占一个int  
    6.   int numBlocks = in.readInt();  
    7.     
    8.   // 校验block数目numBlocks,应大于等于0,小于等于1024 * 1024 * 64  
    9.   if (numBlocks < 0) {  
    10.     throw new IOException("invalid negative number of blocks");  
    11.   } else if (numBlocks > MAX_BLOCKS) {  
    12.     throw new IOException("invalid number of blocks: " + numBlocks +  
    13.         ".  The maximum number of blocks per file is " + MAX_BLOCKS);  
    14.   }  
    15.     
    16.   // 构造block数组blocks,大小即为numBlocks  
    17.   Block[] blocks = new Block[numBlocks];  
    18.     
    19.   // 从输入流中读取numBlocks个数据块  
    20.   for (int i = 0; i < numBlocks; i++) {  
    21.       
    22.     // 构造数据块Block实例blk  
    23.     Block blk = new Block();  
    24.       
    25.     // 调用Block的readFields()方法,从输入流读入数据块  
    26.     blk.readFields(in);  
    27.       
    28.     // 将数据块blk放入数据块数组blocks  
    29.     blocks[i] = blk;  
    30.   }  
    31.     
    32.   // 返回数据块数组blocks  
    33.   return blocks;  
    34. }  

            很简单,先从输入流读取block数目numBlocks,确定一共需要读取多少个数据块,然后构造block数组blocks,大小即为numBlocks,最后从输入流中读取numBlocks个数据块,每次都是先构造数据块Block实例blk,调用Block的readFields()方法,从输入流读入数据块,然后将数据块blk放入数据块数组blocks。全部数据块读取完毕后,返回数据块数组blocks。

            我们再看下数据块Block的readFields()方法,如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. @Override // Writable  
    2. public void readFields(DataInput in) throws IOException {  
    3.   readHelper(in);  
    4. }  

            继续看readHelper()方法,如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1.  final void readHelper(DataInput in) throws IOException {  
    2.    // 从输入流读取一个long,作为数据块艾迪blockId  
    3. this.blockId = in.readLong();  
    4. // 从输入流读取一个long,作为数据块大小numBytes  
    5.    this.numBytes = in.readLong();  
    6.    // 从输入流读取一个long,作为数据块产生的时间戳generationStamp  
    7.    this.generationStamp = in.readLong();  
    8.      
    9.    // 校验:数据块大小numBytes应大于等于0  
    10.    if (numBytes < 0) {  
    11.      throw new IOException("Unexpected block size: " + numBytes);  
    12.    }  
    13.  }  

            从输入流依次读入数据块艾迪blockId、数据块大小numBytes、数据块产生的时间戳generationStamp即可,三者均为long类型。

  • 相关阅读:
    (转)ASP.NET Mvc 2.0 1. Areas的创建与执行
    新世代交易管理機制~System.Transactions
    水晶報表入門
    MyGeneration 如何连接 mysql 来生成代码
    vs.net 启动不了
    Oracle面试问题-技术篇
    把excel两列字符数据用逗号合并起来
    论Leader的技能
    物流行业名词
    html 向aspx 页面传值
  • 原文地址:https://www.cnblogs.com/jirimutu01/p/5556237.html
Copyright © 2011-2022 走看看