zoukankan      html  css  js  c++  java
  • HDFS源码分析EditLog之获取编辑日志输入流

    《HDFS源码分析之EditLogTailer》一文中,我们详细了解了编辑日志跟踪器EditLogTailer的实现,介绍了其内部编辑日志追踪线程EditLogTailerThread的实现,及其线程完成编辑日志跟踪所依赖的最重要的方法,执行日志追踪的doTailEdits()方法。在该方法的处理流程中,首先需要从编辑日志editLog中获取编辑日志输入流集合streams,获取的输入流为最新事务ID加1之后的数据。那么这个编辑日志输入流集合streams是如何获取的呢?本文我们将进行详细研究。

            在doTailEdits()方法中,获取编辑日志输入流的代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 从编辑日志editLog中获取编辑日志输入流集合streams,获取的输入流为最新事务ID加1之后的数据    
    2. streams = editLog.selectInputStreams(lastTxnId + 1, 0, null, false);    

            这个editLog便是一个文件系统编辑日志FSEditLog实例,我们看下它的selectInputStreams()方法,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.  * Select a list of input streams. 
    3.  *  
    4.  * @param fromTxId first transaction in the selected streams 
    5.  * @param toAtLeastTxId the selected streams must contain this transaction 
    6.  * @param recovery recovery context 
    7.  * @param inProgressOk set to true if in-progress streams are OK 
    8.  */  
    9. public Collection<EditLogInputStream> selectInputStreams(  
    10.     long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,  
    11.     boolean inProgressOk) throws IOException {  
    12.   
    13. / 创建编辑日志输入流EditLogInputStream列表streams  
    14.   List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();  
    15.     
    16.   // 在Object对象journalSetLock上使用synchronized进行同步  
    17.   synchronized(journalSetLock) {  
    18.       
    19.     // 检测journalSet状态  
    20.     Preconditions.checkState(journalSet.isOpen(), "Cannot call " +  
    21.         "selectInputStreams() on closed FSEditLog");  
    22.       
    23.     // 调用三个参数的selectInputStreams()方法,传入空的streams列表,从fromTxId事务ID开始,  
    24.     // 编辑日志同步时,标志位inProgressOk为false  
    25.     selectInputStreams(streams, fromTxId, inProgressOk);  
    26.   }  
    27.   
    28.   try {  
    29.     // 数据监测  
    30.     checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);  
    31.   } catch (IOException e) {  
    32.     if (recovery != null) {  
    33.       // If recovery mode is enabled, continue loading even if we know we  
    34.       // can't load up to toAtLeastTxId.  
    35.       LOG.error(e);  
    36.     } else {  
    37.       closeAllStreams(streams);  
    38.       throw e;  
    39.     }  
    40.   }  
    41.   return streams;  
    42. }  

            它首先会创建编辑日志输入流EditLogInputStream列表streams,并在Object对象journalSetLock上使用synchronized进行同步,检测journalSet状态,随后调用三个参数的selectInputStreams()方法,传入空的streams列表,从fromTxId事务ID开始,编辑日志是否可处于处理过程中的标志位inProgressOk,编辑日志同步时,标志位inProgressOk为false,最后,调用checkForGaps()方法进行相关数据监测。

            我们继续看下三个参数的selectInputStreams()方法,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. @Override  
    2. public void selectInputStreams(Collection<EditLogInputStream> streams,  
    3.     long fromTxId, boolean inProgressOk) throws IOException {  
    4.    
    5. / 调用JournalSet的同名方法  
    6.   journalSet.selectInputStreams(streams, fromTxId, inProgressOk);  
    7. }  

            它其实是调用JournalSet的同名方法。JournalSet是什么呢?它是Journal集合的管理者,而Journal就是日志的意思,它是Hadoop HA中EditLog在JournalNode上的组织形式。我们看下JournalSet的selectInputStreams()方法,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.  * In this function, we get a bunch of streams from all of our JournalManager 
    3.  * objects.  Then we add these to the collection one by one. 
    4.  * 在这个方法内,我们从所有JournalManager对象中得到一堆输入流。接着我们把它们一个接一个的添加到集合中。 
    5.  *  
    6.  * @param streams          The collection to add the streams to.  It may or  
    7.  *                         may not be sorted-- this is up to the caller. 
    8.  * @param fromTxId         The transaction ID to start looking for streams at 
    9.  * @param inProgressOk     Should we consider unfinalized streams? 
    10.  */  
    11. @Override  
    12. public void selectInputStreams(Collection<EditLogInputStream> streams,  
    13.     long fromTxId, boolean inProgressOk) throws IOException {  
    14.   final PriorityQueue<EditLogInputStream> allStreams =   
    15.       new PriorityQueue<EditLogInputStream>(64,  
    16.           EDIT_LOG_INPUT_STREAM_COMPARATOR);  
    17.     
    18.   // 遍历journals,得到每个JournalAndStream实例jas  
    19.   for (JournalAndStream jas : journals) {  
    20.       
    21.     // 如果jas不可用,记录日志,跳过  
    22.     if (jas.isDisabled()) {  
    23.       LOG.info("Skipping jas " + jas + " since it's disabled");  
    24.       continue;  
    25.     }  
    26.       
    27.     // 利用jas得到JournalManager实例,然后调用其selectInputStreams()方法,获得输入流,并放入allStreams输入流集合  
    28.     try {  
    29.       jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);  
    30.     } catch (IOException ioe) {  
    31.       LOG.warn("Unable to determine input streams from " + jas.getManager() +  
    32.           ". Skipping.", ioe);  
    33.     }  
    34.   }  
    35.     
    36.   // 过滤掉无用的编辑日志输入流  
    37.   chainAndMakeRedundantStreams(streams, allStreams, fromTxId);  
    38. }  

            这个方法的大体处理流程如下:

            1、遍历JournalSet中的journals,得到每个JournalAndStream实例jas:

                  1.1、如果jas不可用,记录日志,跳过;

                  1.2、利用jas得到JournalManager实例,然后调用其selectInputStreams()方法,获得输入流,并放入allStreams输入流集合;

            2、调用chainAndMakeRedundantStreams()方法,过滤掉无用的编辑日志输入流。
            首先,我们来看下JournalManager实例的selectInputStreams()方法,我们以FileJournalManager为例,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1.  @Override  
    2.  synchronized public void selectInputStreams(  
    3.      Collection<EditLogInputStream> streams, long fromTxId,  
    4.      boolean inProgressOk) throws IOException {  
    5.     
    6. // 先调用StorageDirectory的getCurrentDir()方法获得其current目录,  
    7. // 然后再调用matchEditLogs()方法,获得编辑日志文件EditLogFile列表elfs  
    8.    List<EditLogFile> elfs = matchEditLogs(sd.getCurrentDir());  
    9.    LOG.debug(this + ": selecting input streams starting at " + fromTxId +   
    10.        (inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") +  
    11.        "from among " + elfs.size() + " candidate file(s)");  
    12.      
    13.    // 调用addStreamsToCollectionFromFiles()方法,根据编辑日志文件列表elfs获得输入流,并添加到输入流列表streams  
    14.    addStreamsToCollectionFromFiles(elfs, streams, fromTxId, inProgressOk);  
    15.  }  

            这个方法的处理流程如下:

            1、先调用StorageDirectory的getCurrentDir()方法获得其current目录,然后再调用matchEditLogs()方法,获得编辑日志文件EditLogFile列表elfs;

            2、调用addStreamsToCollectionFromFiles()方法,根据编辑日志文件列表elfs获得输入流,并添加到输入流列表streams。

            我们再看下matchEditLogs()方法,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1.  /** 
    2.   * returns matching edit logs via the log directory. Simple helper function 
    3.   * that lists the files in the logDir and calls matchEditLogs(File[]) 
    4.   *  
    5.   * @param logDir 
    6.   *          directory to match edit logs in 
    7.   * @return matched edit logs 
    8.   * @throws IOException 
    9.   *           IOException thrown for invalid logDir 
    10.   */  
    11.  public static List<EditLogFile> matchEditLogs(File logDir) throws IOException {  
    12.    return matchEditLogs(FileUtil.listFiles(logDir));  
    13.  }  
    14.    
    15.  static List<EditLogFile> matchEditLogs(File[] filesInStorage) {  
    16.    return matchEditLogs(filesInStorage, false);  
    17.  }  
    18.   
    19.  private static List<EditLogFile> matchEditLogs(File[] filesInStorage,  
    20.      boolean forPurging) {  
    21.     
    22. // 创建编辑日志文件EditLogFile列表ret  
    23.    List<EditLogFile> ret = Lists.newArrayList();  
    24.      
    25.    // 遍历filesInStorage,对每个文件进行处理  
    26.    for (File f : filesInStorage) {  
    27.       
    28.      // 获得文件名  
    29.      String name = f.getName();  
    30.        
    31.      // Check for edits  
    32.      // 根据文件名,利用正则表达式,检测其是否为编辑日志edits log  
    33.      // 正则表达式为edits_(d+)-(d+)  
    34.      // 原edits_(\d+)-(\d+)中d前面的两个中第一个只是转义符  
    35.      // 文件名类似如下:edits_0000000000001833048-0000000000001833081  
    36.      // 第一串数字为起始事务ID,第二串数字为终止事务ID  
    37.      Matcher editsMatch = EDITS_REGEX.matcher(name);  
    38.        
    39.      if (editsMatch.matches()) {// 文件名匹配正则表达式的话,说明其是我们需要寻找的编辑日志文件  
    40.        try {  
    41.           
    42.          // 获取起始事务ID:正则表达式中第一个匹配的是起始事务ID  
    43.          long startTxId = Long.parseLong(editsMatch.group(1));  
    44.            
    45.          // 获取终止事务ID:正则表达式中第二个匹配的是终止事务ID  
    46.          long endTxId = Long.parseLong(editsMatch.group(2));  
    47.            
    48.          // 利用文件f、起始事务艾迪startTxId、终止事务艾迪endTxId构造编辑日志文件EditLogFile实例,  
    49.          // 并添加到ret列表中  
    50.          ret.add(new EditLogFile(f, startTxId, endTxId));  
    51.          continue;  
    52.        } catch (NumberFormatException nfe) {  
    53.          LOG.error("Edits file " + f + " has improperly formatted " +  
    54.                    "transaction ID");  
    55.          // skip  
    56.        }  
    57.      }  
    58.        
    59.      // Check for in-progress edits  
    60.      // 检测正在处理中的编辑日志  
    61.      // 正则表达式为edits_inprogress_(d+)  
    62.      // 原edits_inprogress_(\d+)中d前面的两个中第一个只是转义符  
    63.      Matcher inProgressEditsMatch = EDITS_INPROGRESS_REGEX.matcher(name);  
    64.      if (inProgressEditsMatch.matches()) {  
    65.        try {  
    66.          long startTxId = Long.parseLong(inProgressEditsMatch.group(1));  
    67.          ret.add(  
    68.              new EditLogFile(f, startTxId, HdfsConstants.INVALID_TXID, true));  
    69.          continue;  
    70.        } catch (NumberFormatException nfe) {  
    71.          LOG.error("In-progress edits file " + f + " has improperly " +  
    72.                    "formatted transaction ID");  
    73.          // skip  
    74.        }  
    75.      }  
    76.      if (forPurging) {  
    77.        // Check for in-progress stale edits  
    78.        Matcher staleInprogressEditsMatch = EDITS_INPROGRESS_STALE_REGEX  
    79.            .matcher(name);  
    80.        if (staleInprogressEditsMatch.matches()) {  
    81.          try {  
    82.            long startTxId = Long.valueOf(staleInprogressEditsMatch.group(1));  
    83.            ret.add(new EditLogFile(f, startTxId, HdfsConstants.INVALID_TXID,  
    84.                true));  
    85.            continue;  
    86.          } catch (NumberFormatException nfe) {  
    87.            LOG.error("In-progress stale edits file " + f + " has improperly "  
    88.                + "formatted transaction ID");  
    89.            // skip  
    90.    }  
    91.        }  
    92.      }  
    93.    }  
    94.    return ret;  
    95.  }  

            我们看下最核心的两个参数的matchEditLogs()方法,它的处理流程为:

            1、创建编辑日志文件EditLogFile列表ret;

            2、遍历filesInStorage,对每个文件进行处理:

                  2.1、获得文件名name;

                  2.2、根据文件名name,利用正则表达式,检测其是否为编辑日志edits log:

                            利用的正则表达式为edits_(d+)-(d+),原edits_(\d+)-(\d+)中d前面的两个中第一个只是转义符,文件名类似如下:edits_0000000000001833048-0000000000001833081,第一串数字为起始事务ID,第二串数字为终止事务ID;

                  2.3、文件名匹配正则表达式的话,说明其是我们需要寻找的编辑日志文件:

                           2.3.1、获取起始事务艾迪startTxId:正则表达式中第一个匹配的是起始事务ID;

                           2.3.2、获取终止事务艾迪endTxId:正则表达式中第二个匹配的是终止事务ID;

                           2.3.3、利用文件f、起始事务艾迪startTxId、终止事务艾迪endTxId构造编辑日志文件EditLogFile实例,并添加到ret列表中;

                           2.3.4、continue,继续循环下一个文件;

                  2.4、根据文件名name,利用正则表达式,检测其是否为正在处理中的编辑日志edits log:

                            利用的正则表达式为edits_inprogress_(d+),原edits_inprogress_(\d+)中d前面的两个中第一个只是转义符,文件名类似如下:edits_inprogress_0000000000001853186,后面的字符串为起始事务ID;

                  2.5、文件名匹配正则表达式的话,说明其是我们需要寻找的正在处理中的编辑日志文件:

                           2.5.1、获取起始事务艾迪startTxId:正则表达式中第一个匹配的是起始事务ID;

                           2.5.2、利用文件f、起始事务艾迪startTxId、终止事务艾迪-12345构造编辑日志文件EditLogFile实例,并添加到ret列表中;

                           2.5.3、continue,继续循环下一个文件;

            3、返回编辑日志文件EditLogFile列表ret。

            再回到FileJournalManager的selectInputStreams()方法,我们看下它的第二步:调用addStreamsToCollectionFromFiles()方法,根据编辑日志文件列表elfs添加输入流列表streams,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1.  static void addStreamsToCollectionFromFiles(Collection<EditLogFile> elfs,  
    2.      Collection<EditLogInputStream> streams, long fromTxId, boolean inProgressOk) {  
    3.      
    4. // 遍历EditLogFile集合elfs,针对每个EditLogFile实例elf进行如下处理:  
    5. for (EditLogFile elf : elfs) {  
    6.        
    7.   // 如果elf处于处理过程中:  
    8.   if (elf.isInProgress()) {  
    9.          
    10.     // 如果不需要获取处于处理过程中的编辑日志,直接跳过  
    11.     if (!inProgressOk) {  
    12.          LOG.debug("passing over " + elf + " because it is in progress " +  
    13.              "and we are ignoring in-progress logs.");  
    14.          continue;  
    15.        }  
    16.       
    17.     // 校验编辑日志,校验不成功的话,直接跳过  
    18.        try {  
    19.          elf.validateLog();  
    20.        } catch (IOException e) {  
    21.          LOG.error("got IOException while trying to validate header of " +  
    22.              elf + ".  Skipping.", e);  
    23.          continue;  
    24.        }  
    25.      }  
    26.     
    27.   // 如果elf的最后事务艾迪lastTxId小于我们获取编辑日志的起始事务艾迪fromTxId,直接跳过  
    28.      if (elf.lastTxId < fromTxId) {  
    29.        assert elf.lastTxId != HdfsConstants.INVALID_TXID;  
    30.        LOG.debug("passing over " + elf + " because it ends at " +  
    31.            elf.lastTxId + ", but we only care about transactions " +  
    32.            "as new as " + fromTxId);  
    33.        continue;  
    34.      }  
    35.        
    36.      // 利用elf中的文件file、起始事务艾迪firstTxId、终止事务艾迪lastTxId、编辑日志文件是否处于处理过程中标志位isInProgress,  
    37.      // 构造编辑日志文件输入流EditLogFileInputStream实例elfis  
    38.      EditLogFileInputStream elfis = new EditLogFileInputStream(elf.getFile(),  
    39.            elf.getFirstTxId(), elf.getLastTxId(), elf.isInProgress());  
    40.      LOG.debug("selecting edit log stream " + elf);  
    41.        
    42.      // 将elfis加入到输入流列表streams  
    43.      streams.add(elfis);  
    44.    }  
    45.  }  

            它的处理流程如下:

            遍历EditLogFile集合elfs,针对每个EditLogFile实例elf进行如下处理:

            1、如果elf处于处理过程中,同时如果不需要获取处于处理过程中的编辑日志,直接跳过,否则校验编辑日志,校验不成功的话,直接跳过,成功则继续;

            2、如果elf的最后事务艾迪lastTxId小于我们获取编辑日志的起始事务艾迪fromTxId,直接跳过,否则继续;

            3、利用elf中的文件file、起始事务艾迪firstTxId、终止事务艾迪lastTxId、编辑日志文件是否处于处理过程中标志位isInProgress,构造编辑日志文件输入流EditLogFileInputStream实例elfis;

            4、将elfis加入到输入流列表streams。

  • 相关阅读:
    CSS all 属性
    platform (Operating System) – Python 中文开发手册
    HTML DOM querySelectorAll() 方法
    Linux enable命令
    dnat & snat
    鲲鹏920 上实现lvs fullnat--安装ipvsadm
    kylin os操作系统镜像适配ironic 裸金属 管理
    neutron network:router_gateway
    tcpdump实现和run_filter
    ebpf sock sk_filter实现
  • 原文地址:https://www.cnblogs.com/jirimutu01/p/5556235.html
Copyright © 2011-2022 走看看