zoukankan      html  css  js  c++  java
  • HDFS HA支持多Standby节点机制

    前言


    在现有的HDFS中,为了保证其高可用性,社区在早些年就已经完成HDFS的HA机制,也就是One Active,One Standby。在此种情况下,HDFS能够容忍其中一个节点出现失败的情况。这套HA机制的实现的确给用户带来了很大的帮助,基于此特性,我们可以做很多集群上的热操作,比如热迁移NameNode,或者滚动升级HDFS等等。可能唯一让人感觉还不是最好的一点是,它不能容忍更多失败的情况,比如2个NameNode都发生失败的情况。在其他的一些分布式系统中,例如Zookeeper,它的内部就可以容忍其中2节点出现崩溃的情况,当它启动了5个节点的时候。在HDFS内部的block副本设计上,也是保证了3副本的设计理念,同样可以容忍2个副本损坏的情况。所以我们不禁开始联想,在HDFS中什么时候也能容忍更多的出错情况?更具体地说,就是在只有一个Active NameNode情况下,同时有多个Standby NameNode。这样的话,HDFS的HA特性看上去就非常的强大了。本文我们就来好好聊聊这个话题。

    HDFS多Standby节点机制概述


    前面的铺垫内容说了这么多,那么到底目前是否已经有多Standby节点的实现机制呢?答案是有的,但是它还没有发布,目标发布版本Hadoop 3.0.又是在3.0版本,之前本人介绍了许多很棒的特性都是在这个版本发布的(比如HDFS EC),大家敬请期待这个版本吧。社区JIRA HDFS-6440(Support more than 2 NameNodes)最终实现了HA中支持多Standby的特性。本文是我阅读完此JIRA上的设计文档以及代码实现后所写的总结性文章,更多设计细节可以查看原文档。

    在之前HDFS的HA的设计实现中,其实已经帮我们实现了许多在未来可能有多Standby节点出现的情况。所以在这里,我们只需要在原来One Active,One Standby完善的机制下,做局部的修改,来满足多Standby的情况即可。以下为几个需要修改的点:

    • Zkfc的Active选举,此时不是只有另外一个可选节点,而是很多个Standby节点。
    • Checkpoint过程以及Active NameNode上的Fsimage同步问题,之前都是一个Standby NameNode定期发给Active NameNode,这个时候有多个Standby,怎么办。
    • Bootstrap过程。之前都是向另外一个Active NameNode进行bootstrap,而现在有多个节点。
    • Block token id的生成。

    主要为以上4点,其中第2点最为重要,因为涉及到元数据的更新同步,逻辑也作为复杂。在下一小节中,我们将会针对这4点做详细的分析。

    多Standby节点细节实现


    此小节将会针对以上提出的4点展开分析,下面首先是zkfc相关的改造。

    Zkfc的选举


    与原先的HA机制相比,多Standby的情况会造成锁竞争的加剧,因为每个Standby节点上的zkfc进程都要尝试获取锁,然后才会将自己的状态切到Active。所以在此建议的Standby数量不宜过多,3~5个足够了。还有当进行手动切换的时候,这个时候要保证其他节点此时不发生切换动作。

    Checkpoint元数据同步过程


    先来回顾一下原先HA机制的元数据同步过程:

    Standby节点周期性的读取JournalNode上的editlog,等到了一次checkpoint周期,然后做一次checkpoint,然后将新的fsimage同步到Active节点。

    在这个如果是多个Standby节点的情况,这个处理过程就没有那么简单了,下面几个是主要要解决的问题:

    • 这么多个Standby节点,每个节点上都有自己的fsimage,该选哪个作为最终上传镜像文件的节点呢?
      答:选择元数据最新的Standby,评判标准是看当前最新的txid。
    • 如果Active节点当前已经同步了最新fsimage,而Standby节点又将稍老的fsimage同步过去,怎么办?
      答:Active节点会进行比较,如果的确是老的fsimage,会给出失败的回复应答。

    以上两点在后面代码实现的部分会有具体的体现。

    Bootstrap过程


    我们知道bootstrap的用处一般是在集群开始搭建时,将Active上的fsimage等元数据同步到当前的节点上,然后启动当前节点。而在当前多Standby节点的变化是,由向原来另外一个Active获取元数据变为同时向多个其他节点抓取元数据,直到有一个节点能抓取到元数据为止。

    Block token id的构造


    在block token id的生成中,会根据当前NameNode index下标来生成serialNo序列号数字,然后将此数字应用到token id的生成。生成代码如下:

       public synchronized void setSerialNo(int serialNo) {
         this.serialNo = (serialNo & LOW_MASK) | (nnIndex << 31); 
       }

    但是原先的处理逻辑,只适用于2个NameNode的情况,也就是下标0和1的情况。在多个Standby出现的情况,NameNode的下标就有可能出现2,3,4等情况。因此此逻辑也需要进行修改。具体改动可见HDFS-6440上的设计文档。

    多Standby情况下的Checkpoint同步


    因为在多Standby情况下的checkpoint,fsmage同步过程最为复杂,此节我们从源代码实现层面来学习一下其中的过程,主要涉及以下2个类的改造:

    • StandbyCheckpointer:Standby NameNode上专门控制做checkpoint以及上传fsimage到Active NameNode的服务。
    • ImageServlet:NameNode服务请求处理类,里面包含了fsimage上传请求的处理逻辑。

    我们首先进入StandbyCheckpointer类,

      public StandbyCheckpointer(Configuration conf, FSNamesystem ns)
          throws IOException {
        this.namesystem = ns;
        this.conf = conf;
        // checkpoint配置类初始化
        this.checkpointConf = new CheckpointConf(conf);
        // 定期checkpoint线程初始化
        this.thread = new CheckpointerThread();
        this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true)
            .setNameFormat("TransferFsImageUpload-%d").build();
        // active结点地址初始化
        setNameNodeAddresses(conf);
      }

    这里会有active节点地址的初始化,

      private void setNameNodeAddresses(Configuration conf) throws IOException {
        // Look up our own address.
        myNNAddress = getHttpAddress(conf);
    
        // 获取其他NameNode节点配置,作为可能的Active NameNode
        List<Configuration> confForActive = HAUtil.getConfForOtherNodes(conf);
        activeNNAddresses = new ArrayList<URL>(confForActive.size());
        for (Configuration activeConf : confForActive) {
          URL activeNNAddress = getHttpAddress(activeConf);
    
          // sanity check each possible active NN
          Preconditions.checkArgument(checkAddress(activeNNAddress),
              "Bad address for active NN: %s", activeNNAddress);
          // 将此地址作为active的地址
          activeNNAddresses.add(activeNNAddress);
        }
    
        ...
      }

    其实从这里可以看出一点:Standby节点其实并不知道哪个是当前真正的Active NameNode

    接下来进入checkpoint的线程服务内的doWork工作方法,

        private void doWork() {
          // 获取checkpoint动作的执行周期时间,默认1小时
          final long checkPeriod = 1000 * checkpointConf.getCheckPeriod();
          // 重置checkpoint时间,以及最近上传时间
          lastCheckpointTime = monotonicNow();
          lastUploadTime = monotonicNow();
          while (shouldRun) {
            boolean needRollbackCheckpoint = namesystem.isNeedRollbackFsImage();
            if (!needRollbackCheckpoint) {
              try {
                // 进行checkpoint周期时间睡眠
                Thread.sleep(checkPeriod);
              } catch (InterruptedException ie) {
              }
              if (!shouldRun) {
                break;
              }
            }
            // 这里开始准备checkpoint
            ...

    我们继续来看后面执行的方法,

              final long now = monotonicNow();
              // 获取未checkpoint的tx事务数
              final long uncheckpointed = countUncheckpointedTxns();
              // 计算距离上次未更新checkpoint的时间
              final long secsSinceLast = (now - lastCheckpointTime) / 1000;
    
              // if we need a rollback checkpoint, always attempt to checkpoint
              boolean needCheckpoint = needRollbackCheckpoint;
    
              if (needCheckpoint) {
                LOG.info("Triggering a rollback fsimage for rolling upgrade.");
              } else if (uncheckpointed >= checkpointConf.getTxnCount()) {
                // 如果当前未checkpoint的事务数已经超过默认值,就是100w,则也需要进行一次checkpoint
                LOG.info("Triggering checkpoint because there have been " + 
                    uncheckpointed + " txns since the last checkpoint, which " +
                    "exceeds the configured threshold " +
                    checkpointConf.getTxnCount());
                needCheckpoint = true;
              } else if (secsSinceLast >= checkpointConf.getPeriod()) {
                // 如果未更新时间已超出了周期时间,就是1小时,则需要进行一次checkpoint操作
                LOG.info("Triggering checkpoint because it has been " +
                    secsSinceLast + " seconds since the last checkpoint, which " +
                    "exceeds the configured interval " + checkpointConf.getPeriod());
                needCheckpoint = true;
              }

    以上代码表明了一个Standby节点做一次checkpoint需要达到的2个条件(满足一个条件即可):

    • 未做checkpoint的tx事务数超过100w
    • 超过1小时的checkpoint周期

    我们继续看下面的执行过程,

                ...
                // on all nodes, we build the checkpoint. However, we only ship the checkpoint if have a
                // rollback request, are the checkpointer, are outside the quiet period.
                final long secsSinceLastUpload = (now - lastUploadTime) / 1000;
                // sendRequest表示是否要发送新的fsimage给Active NameNode,需满足以下2个条件:
                // 1.isPrimaryCheckPointer标记为true,也就是上次已经发送过fsimage给Active NameNode
                // 2.距离上次发送fsimage给Active NameNode的时间已经超过了Standby专门发送Active NameNode的时间,
                //   默认1.5倍的checkpoint的周期时间
                boolean sendRequest = isPrimaryCheckPointer
                    || secsSinceLastUpload >= checkpointConf.getQuietPeriod();
                // 执行checkpoint动作
                doCheckpoint(sendRequest);
                ...

    在上述过程中,会判断是否要将checkpoint过后的fsimage传到Active NameNode上。然后我们继续进入doCheckpoint方法内部,

      private void doCheckpoint(boolean sendCheckpoint) throws InterruptedException, IOException {
        assert canceler != null;
        final long txid;
        final NameNodeFile imageType;
        // 这里开始准备进行checkpoint操作
        namesystem.cpLockInterruptibly();
        ...
    
        //如果不需要进发送fsimage动作,则在这里会直接结束
        if(!sendCheckpoint){
          return;
        }
    
        // 新建线程池用来执行上传fsimage的动作
        ExecutorService executor = new ThreadPoolExecutor(0, activeNNAddresses.size(), 100,
            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(activeNNAddresses.size()),
            uploadThreadFactory);
        List<Future<TransferFsImage.TransferResult>> uploads =
            new ArrayList<Future<TransferFsImage.TransferResult>>();
        // 遍历潜在的Active NameNode地址,添加上传fsimage的请求、
        for (final URL activeNNAddress : activeNNAddresses) {
          Future<TransferFsImage.TransferResult> upload =
              executor.submit(new Callable<TransferFsImage.TransferResult>() {
                @Override
                public TransferFsImage.TransferResult call() throws IOException {
                  return TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem
                      .getFSImage().getStorage(), imageType, txid, canceler);
                }
              });
          uploads.add(upload);
        }
        ...
        for (; i < uploads.size(); i++) {
    
          Future<TransferFsImage.TransferResult> upload = uploads.get(i);
          try {
            // 获取上传请求结果,如果成功了,则直接退出,无须获取下个请求的处理结果
            if (upload.get() == TransferFsImage.TransferResult.SUCCESS) {
              success = true;
              break;
            }
    
          } catch (ExecutionException e) {
          ...
        }
            // 重新设置上次上传成功时间
        lastUploadTime = monotonicNow();
    
        // 记录此次上传结果,表明此Standby是当前领先的checkpointer节点
        this.isPrimaryCheckPointer = success;
        ...
      }

    Ok,以上就是StandbyCheckpointer内部的相关执行逻辑。在这里upload请求倒是发出去了,那么后面是怎么被处理的呢?接下来我们就来 ImageServlet内部的请求处理逻辑。

    我们进入ImafeServlet的doPut处理方法,因为我们是上传文件的请求,不是Get,

      protected void doPut(final HttpServletRequest request,
          final HttpServletResponse response) throws ServletException, IOException {
        try {
          ServletContext context = getServletContext();
          final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
          final Configuration conf = (Configuration) getServletContext()
              .getAttribute(JspHelper.CURRENT_CONF);
          final PutImageParams parsedParams = new PutImageParams(request, response,
              conf);
          final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
          // 验证请求信息
          validateRequest(context, conf, request, response, nnImage,
              parsedParams.getStorageInfoString());
    
          UserGroupInformation.getCurrentUser().doAs(
              new PrivilegedExceptionAction<Void>() {
    
                @Override
                public Void run() throws Exception {
                  // 获取当前节点的服务状态
                  HAServiceProtocol.HAServiceState state = NameNodeHttpServer
                      .getNameNodeStateFromContext(getServletContext());
                  // 如果当前处理的节点不是Active节点,说明请求发送到错误的目标节点上了
                  if (state != HAServiceProtocol.HAServiceState.ACTIVE) {
                    // 在此给出错误的回复码以及出错信息
                    response.sendError(HttpServletResponse.SC_EXPECTATION_FAILED,
                        "Nameode "+request.getLocalAddr()+" is currently not in a state which can "
                            + "accept uploads of new fsimages. State: "+state);
                    return null;
                  }
                  ...

    因为之前Standby节点并不知道当前具体的Active NameNode,所以采用的是一种轮询遍历的方式,这样的话同为Standby的其他节点也会处理到。如果当前处理节点的确是Active NameNode,还会进行如下2个判断逻辑,

    ...
                  // 通过tailSet方法,来比较是否此请求是来自于更旧的事务
                  SortedSet<ImageUploadRequest> larger = currentlyDownloadingCheckpoints.tailSet(imageRequest);
                  // 如果是,则给出错误回复,表明当前已经在处理更新的fsimage文件
                  if (larger.size() > 0) {
                    response.sendError(HttpServletResponse.SC_CONFLICT,
                        "Another checkpointer is already in the process of uploading a" +
                            " checkpoint made up to transaction ID " + larger.last());
                    return null;
                  }
    
                  // 保证当前只处理一份请求,不处理重复的请求
                  if (!currentlyDownloadingCheckpoints.add(imageRequest)) {
                    response.sendError(HttpServletResponse.SC_CONFLICT,
                        "Either current namenode is checkpointing or another"
                            + " checkpointer is already in the process of "
                            + "uploading a checkpoint made at transaction ID "
                            + txid);
                    return null;
                  }
    ...

    前面判断如果都没问题,最后会进行fsimage的下载动作,Active NameNode会从目标Standby NameNode上download文件,

    
                    InputStream stream = request.getInputStream();
                    try {
                      long start = monotonicNow();
                      // 此处进行镜像文件下载操作
                      MD5Hash downloadImageDigest = TransferFsImage
                          .handleUploadImageRequest(request, txid,
                              nnImage.getStorage(), stream,
                              parsedParams.getFileSize(), getThrottler(conf));
                      nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,
                          downloadImageDigest);
                      ...
                    } finally {
                      // 下载完成后移除镜像文件的请求
                      currentlyDownloadingCheckpoints.remove(imageRequest);
    
                      stream.close();
                    }

    具体上传的细节处理大家可以查阅TransferFsImage类里的代码。

    总结


    所以总的来看,HDFS-6440支持多Standby特性更多的是一些适配的改造,而不是对原先HA机制的大改。但是依然不可否认,这个特性要做的周边工作还是很多的,比如对应的unit test的构造,这些工作量也是很大的。最后一个小小的建议,配置多Standby的时候,建议数量不宜过多,3~5个足够了,2点原因:第一,zkfc切换选取Active时锁竞争的问题;第二,这些Standby节点同时tail editlog时造成的JournalNode带宽使用上升问题。

    参考资料


    [1].https://issues.apache.org/jira/browse/HDFS-6440
    [2].https://issues.apache.org/jira/secure/attachment/12677453/Multiple-Standby-NameNodes_V1.pdf

  • 相关阅读:
    opencv 5 图像转换(1 边缘检测)
    opencv 4 图像处理(漫水填充,图像金字塔与图片尺寸缩放,阈(yu)值化)
    opencv 4 图像处理(2 形态学滤波:腐蚀与膨胀,开运算、闭运算、形态学梯度、顶帽、黑帽)
    HDU 1847-Good Luck in CET-4 Everybody!-博弈SG函数模板
    网络流
    multiset的erase()操作中出现跳过元素的问题
    HRBUST
    L1-8 矩阵A乘以B (15 分)
    L2-2 重排链表 (25 分)
    L2-4 部落 (25 分)
  • 原文地址:https://www.cnblogs.com/bianqi/p/12183726.html
Copyright © 2011-2022 走看看