  HDFS Source Code Analysis (5): Node Registration and the Heartbeat Mechanism

    Preface

    When HDFS starts up, you may have noticed a detail: the NameNode is generally started first, and the DataNodes afterwards. A moment's thought makes the reason clear: the NameNode maintains the metadata, and that metadata can only be built up gradually from the reports sent by the DataNodes that start later. After that, the block mappings are maintained and updated through ongoing heartbeats. Today's article takes this angle and analyzes the whole flow in detail.


    Related Classes

    As usual, let's first introduce the classes involved, to get a rough picture. The main ones are:

    1. DataNode -- the datanode class. It is not the same as the datanode descriptor class discussed earlier; it also defines many datanode-related methods.

    2. NameNode -- the namenode class. Both registration and heartbeat handling go through the namenode, whose handlers in turn call into the larger FSNamesystem.

    3. DatanodeCommand and BlockCommand -- the datanode command class and its block-related subclass, used by the namenode when replying to heartbeats with commands for the datanode.

    4. FSNamesystem and DatanodeDescriptor -- supporting classes; some of their methods are used in the processes above.

    OK, that is not a large number of classes. Let's start with the first flow, node registration: how does a datanode register with the namenode at startup?


    Node Registration

    Registration happens after the datanode starts. We begin in the main method:

    public static void main(String args[]) {
        secureMain(args, null);
      }
    Then we enter secureMain:

    public static void secureMain(String [] args, SecureResources resources) {
        try {
          StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
          DataNode datanode = createDataNode(args, null, resources);
          if (datanode != null)
            datanode.join();
        } catch (Throwable e) {
          LOG.error(StringUtils.stringifyException(e));
          System.exit(-1);
        } finally {
          // We need to add System.exit here because either shutdown was called or
          // some disk related conditions like volumes tolerated or volumes required
          // condition was not met. Also, In secure mode, control will go to Jsvc and
          // the process hangs without System.exit.
          LOG.info("Exiting Datanode");
          System.exit(0);
        }
      }
    From there we enter createDataNode; one or two intermediate methods later, we reach the core runDatanodeDaemon() method.
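    Roughly, that intermediate step looks like the following sketch (based on the Hadoop 1.x layout; the instantiateDataNode call is an assumption about that branch and may differ in other versions):

    public static DataNode createDataNode(String args[], Configuration conf,
                                          SecureResources resources) throws IOException {
      // parse the arguments and build the DataNode instance (intermediate step)
      DataNode dn = instantiateDataNode(args, conf, resources);
      // register with the namenode and start the daemon thread
      runDatanodeDaemon(dn);
      return dn;
    }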

    /** Start a single datanode daemon and wait for it to finish.
       *  If this thread is specifically interrupted, it will stop waiting.
       * The core method for starting a datanode
       */
      public static void runDatanodeDaemon(DataNode dn) throws IOException {
        if (dn != null) {
          //register datanode
          //first register the node with the namenode
          dn.register();
          //then start the datanode thread
          dn.dataNodeThread = new Thread(dn, dnThreadName);
          dn.dataNodeThread.setDaemon(true); // needed for JUnit testing
          dn.dataNodeThread.start();
        }
      }
    That calls the register() method:

    /**
       * Register datanode
       * <p>
       * The datanode needs to register with the namenode on startup in order
       * 1) to report which storage it is serving now and 
       * 2) to receive a registrationID 
       * issued by the namenode to recognize registered datanodes.
       * 
       * @see FSNamesystem#registerDatanode(DatanodeRegistration)
       * @throws IOException
       * The datanode's registration method; it calls the register method on the namenode
       */
      private void register() throws IOException {
        if (dnRegistration.getStorageID().equals("")) {
          setNewStorageID(dnRegistration);
        }
        while(shouldRun) {
          try {
            // reset name to machineName. Mainly for web interface.
            dnRegistration.name = machineName + ":" + dnRegistration.getPort();
            //call the register method on the namenode
            dnRegistration = namenode.register(dnRegistration);
            break;
    ....
    Here we can see that registration essentially calls the namenode's register method. Let's follow the method of the same name on the NameNode:

    ////////////////////////////////////////////////////////////////
      // DatanodeProtocol
      ////////////////////////////////////////////////////////////////
      /** 
       * The namenode's register method; it delegates to FSNamesystem
       */
      public DatanodeRegistration register(DatanodeRegistration nodeReg
                                           ) throws IOException {
        //first verify the datanode's version
        verifyVersion(nodeReg.getVersion());
        //then call the namesystem's registerDatanode method
        namesystem.registerDatanode(nodeReg);
          
        return nodeReg;
      }

    The namenode again delegates to the namesystem, which distinguishes three cases for the registering node:

    1. An existing node registering with a new storage ID.

    2. Re-registration of an existing node; the cluster already holds its information, so only its network location needs to be updated.

    3. A node that has never registered before; it is simply assigned a new storage ID and registered.

    The decision is made in the following method, which is fairly long:

    /**
       * Register Datanode.
       * <p>
       * The purpose of registration is to identify whether the new datanode
       * serves a new data storage, and will report new data block copies,
       * which the namenode was not aware of; or the datanode is a replacement
       * node for the data storage that was previously served by a different
       * or the same (in terms of host:port) datanode.
       * The data storages are distinguished by their storageIDs. When a new
       * data storage is reported the namenode issues a new unique storageID.
       * <p>
       * Finally, the namenode returns its namespaceID as the registrationID
       * for the datanodes. 
       * namespaceID is a persistent attribute of the name space.
       * The registrationID is checked every time the datanode is communicating
       * with the namenode. 
       * Datanodes with inappropriate registrationID are rejected.
       * If the namenode stops, and then restarts it can restore its 
       * namespaceID and will continue serving the datanodes that has previously
       * registered with the namenode without restarting the whole cluster.
       * 
       * @see org.apache.hadoop.hdfs.server.datanode.DataNode#register()
       * The namenode-side implementation of datanode registration
       */
      public synchronized void registerDatanode(DatanodeRegistration nodeReg
                                                ) throws IOException {
        String dnAddress = Server.getRemoteAddress();
        if (dnAddress == null) {
          // Mostly called inside an RPC.
          // But if not, use address passed by the data-node.
          dnAddress = nodeReg.getHost();
        }      
    
        // check if the datanode is allowed to be connect to the namenode
        if (!verifyNodeRegistration(nodeReg, dnAddress)) {
          throw new DisallowedDatanodeException(nodeReg);
        }
    
        String hostName = nodeReg.getHost();
          
        // update the datanode's name with ip:port
        DatanodeID dnReg = new DatanodeID(dnAddress + ":" + nodeReg.getPort(),
                                          nodeReg.getStorageID(),
                                          nodeReg.getInfoPort(),
                                          nodeReg.getIpcPort());
        nodeReg.updateRegInfo(dnReg);
        nodeReg.exportedKeys = getBlockKeys();
          
        NameNode.stateChangeLog.info(
                                     "BLOCK* NameSystem.registerDatanode: "
                                     + "node registration from " + nodeReg.getName()
                                     + " storage " + nodeReg.getStorageID());
    
        //look up existing records for this storage ID and host name
        DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
        DatanodeDescriptor nodeN = host2DataNodeMap.getDatanodeByName(nodeReg.getName());
        
        //check whether this node already existed
        if (nodeN != null && nodeN != nodeS) {
         //case: the datanode already exists but registers with a new storage ID
          NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
                            + "node from name: " + nodeN.getName());
          // nodeN previously served a different data storage, 
          // which is not served by anybody anymore.
          //remove the stale datanodeID information
          removeDatanode(nodeN);
          // physically remove node from datanodeMap
          //physically remove the record from datanodeMap
          wipeDatanode(nodeN);
          nodeN = null;
        }
        
        //the re-registration case
        if (nodeS != null) {
          if (nodeN == nodeS) {
            // The same datanode has been just restarted to serve the same data 
            // storage. We do not need to remove old data blocks, the delta will
            // be calculated on the next block report from the datanode
            NameNode.stateChangeLog.debug("BLOCK* NameSystem.registerDatanode: "
                                          + "node restarted.");
          } else {
            // nodeS is found
            /* The registering datanode is a replacement node for the existing 
              data storage, which from now on will be served by a new node.
              If this message repeats, both nodes might have same storageID 
              by (insanely rare) random chance. User needs to restart one of the
              nodes with its data cleared (or user can just remove the StorageID
              value in "VERSION" file under the data directory of the datanode,
              but this is might not work if VERSION file format has changed 
           */        
            NameNode.stateChangeLog.info( "BLOCK* NameSystem.registerDatanode: "
                                          + "node " + nodeS.getName()
                                          + " is replaced by " + nodeReg.getName() + 
                                          " with the same storageID " +
                                          nodeReg.getStorageID());
          }
          // update cluster map
          //update the cluster's network topology information
          clusterMap.remove(nodeS);
          nodeS.updateRegInfo(nodeReg);
          nodeS.setHostName(hostName);
          
          // resolve network location
          resolveNetworkLocation(nodeS);
          clusterMap.add(nodeS);
            
          // also treat the registration message as a heartbeat
          synchronized(heartbeats) {
            if( !heartbeats.contains(nodeS)) {
              heartbeats.add(nodeS);
              //update its timestamp
              nodeS.updateHeartbeat(0L, 0L, 0L, 0);
              nodeS.isAlive = true;
            }
          }
          return;
        } 
    
        // this is a new datanode serving a new data storage
        //this is confirmed to be a new node; assign it a new storage ID
        if (nodeReg.getStorageID().equals("")) {
          // this data storage has never been registered
          // it is either empty or was created by pre-storageID version of DFS
          nodeReg.storageID = newStorageID();
          NameNode.stateChangeLog.debug(
                                        "BLOCK* NameSystem.registerDatanode: "
                                        + "new storageID " + nodeReg.getStorageID() + " assigned.");
        }
        // register new datanode
        //create a descriptor for the new node
        DatanodeDescriptor nodeDescr 
          = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
        resolveNetworkLocation(nodeDescr);
        unprotectedAddDatanode(nodeDescr);
        clusterMap.add(nodeDescr);
          
        // also treat the registration message as a heartbeat
        //treat the registration as a heartbeat and add the node to the heartbeat list
        synchronized(heartbeats) {
          heartbeats.add(nodeDescr);
          nodeDescr.isAlive = true;
          // no need to update its timestamp
          // because its is done when the descriptor is created
        }
        return;
      }
    The code that removes the old node's information is shown below; it also updates the cluster statistics:

    /**
       * remove a datanode descriptor
       * @param nodeInfo datanode descriptor
       */
      private void removeDatanode(DatanodeDescriptor nodeInfo) {
        synchronized (heartbeats) {
          if (nodeInfo.isAlive) {
          //update the cluster-wide statistics
            updateStats(nodeInfo, false);
            //remove this node's entry from the heartbeat list
            heartbeats.remove(nodeInfo);
            nodeInfo.isAlive = false;
          }
        }
    
        //remove the block-to-node mappings held for this node
        for (Iterator<Block> it = nodeInfo.getBlockIterator(); it.hasNext();) {
          removeStoredBlock(it.next(), nodeInfo);
        }
        unprotectedRemoveDatanode(nodeInfo);
        //remove the node from the cluster topology map
        clusterMap.remove(nodeInfo);
      }

    Heartbeat Mechanism

    In its simplest terms, the heartbeat mechanism exists so that a datanode can prove it is still alive. If a datanode fails to send heartbeats to the namenode for a period of time, it is marked dead. The datanode also obtains commands from the heartbeat replies and acts on them, so the heartbeat plays an important role throughout HDFS. Let's unravel its implementation step by step.
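    Before walking through the code, it helps to pin down what "dead" means here. The following is a minimal, self-contained sketch of the expiry test, assuming Hadoop 1.x-style intervals; the class name HeartbeatExpiry, the hard-coded defaults, and the 2 * recheck + 10 * heartbeat formula are illustrative approximations of FSNamesystem.isDatanodeDead(), not the literal source:

    // Minimal sketch of the dead-node test, assuming Hadoop 1.x-style intervals.
    // All names here are illustrative; the real check lives in FSNamesystem.isDatanodeDead().
    public class HeartbeatExpiry {
      // assumed defaults: dfs.heartbeat.interval = 3s, heartbeat.recheck.interval = 5min
      static final long HEARTBEAT_INTERVAL_MS = 3 * 1000L;
      static final long RECHECK_INTERVAL_MS = 5 * 60 * 1000L;
      // expiry window used by the heartbeat monitor (1.x-style formula)
      static final long EXPIRE_INTERVAL_MS =
          2 * RECHECK_INTERVAL_MS + 10 * HEARTBEAT_INTERVAL_MS;

      static boolean isDatanodeDead(long lastUpdateMs, long nowMs) {
        // dead if the last heartbeat is older than the expiry window
        return lastUpdateMs < nowMs - EXPIRE_INTERVAL_MS;
      }

      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println("expiry window (ms): " + EXPIRE_INTERVAL_MS); // 630000, i.e. 10.5 min
        System.out.println(isDatanodeDead(now - 11 * 60 * 1000L, now));  // true  (silent for 11 min)
        System.out.println(isDatanodeDead(now - 30 * 1000L, now));       // false (heartbeat 30 s ago)
      }
    }

    With these assumed defaults (3-second heartbeats, 5-minute recheck) the window works out to about 10.5 minutes of silence before a node is treated as dead.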

    The heartbeat is initiated by the datanode; the active side is the DataNode, in the method below:

    /**
       * Main loop for the DataNode.  Runs until shutdown,
       * forever calling remote NameNode functions.
       * The datanode keeps sending heartbeats to the namenode in this loop
       */
      public void offerService() throws Exception {
         
        LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec" + 
           " Initial delay: " + initialBlockReportDelay + "msec");
    
        //
        // Now loop for a long time....
        //
    
        while (shouldRun) {
          try {
            long startTime = now();
    
            //
            // Every so often, send heartbeat or block-report
            //
            
            if (startTime - lastHeartbeat > heartBeatInterval) {
              //
              // All heartbeat messages include following info:
              // -- Datanode name
              // -- data transfer port
              // -- Total capacity
              // -- Bytes remaining
              //
             //send the node's current stats (DFS used, remaining space, etc.) to the namenode
              lastHeartbeat = startTime;
              //call namenode.sendHeartbeat(); the reply carries commands for this datanode
              DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,
                                                           data.getCapacity(),
                                                           data.getDfsUsed(),
                                                           data.getRemaining(),
                                                           xmitsInProgress.get(),
                                                           getXceiverCount());
              myMetrics.addHeartBeat(now() - startTime);
              //LOG.info("Just sent heartbeat, with name " + localName);
              //process the returned commands; if that fails, skip the block-report work below
              if (!processCommand(cmds))
                continue;
            }
    Looking at this first part of the code, we can see it is a loop that periodically sends a heartbeat by calling the namenode's method, receives the DatanodeCommand replies, and then executes them on this node. Let's follow the namenode method it calls:

    /**
       * Data node notify the name node that it is alive 
       * Return an array of block-oriented commands for the datanode to execute.
       * This will be either a transfer or a delete operation.
       * Datanodes call this method to send their heartbeats
       */
      public DatanodeCommand[] sendHeartbeat(DatanodeRegistration nodeReg,
                                           long capacity,
                                           long dfsUsed,
                                           long remaining,
                                           int xmitsInProgress,
                                           int xceiverCount) throws IOException {
        //verify the node's registration information
        verifyRequest(nodeReg);
        //then delegate to FSNamesystem.handleHeartbeat
        return namesystem.handleHeartbeat(nodeReg, capacity, dfsUsed, remaining,
            xceiverCount, xmitsInProgress);
      }
    The namenode again delegates to the namesystem:

    /**
       * The given node has reported in.  This method should:
       * 1) Record the heartbeat, so the datanode isn't timed out
       * 2) Adjust usage stats for future block allocation
       * 
       * If a substantial amount of time passed since the last datanode 
       * heartbeat then request an immediate block report.  
       * 
       * @return an array of datanode commands 
       * @throws IOException
       * A given datanode reports its heartbeat; two main things are done:
       * 1. record the heartbeat so the datanode does not time out
       * 2. adjust the block allocation stats maintained by the namenode
       */
      DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
          long capacity, long dfsUsed, long remaining,
          int xceiverCount, int xmitsInProgress) throws IOException {
        DatanodeCommand cmd = null;
        synchronized (heartbeats) {
          synchronized (datanodeMap) {
            DatanodeDescriptor nodeinfo = null;
            try {
              nodeinfo = getDatanode(nodeReg);
            } catch(UnregisteredDatanodeException e) {
              return new DatanodeCommand[]{DatanodeCommand.REGISTER};
            }
              
            // Check if this datanode should actually be shutdown instead. 
            if (nodeinfo != null && shouldNodeShutdown(nodeinfo)) {
              setDatanodeDead(nodeinfo);
              throw new DisallowedDatanodeException(nodeinfo);
            }
    
            //if there is no record of this node, it has not registered yet; reply with a register command
            if (nodeinfo == null || !nodeinfo.isAlive) {
              return new DatanodeCommand[]{DatanodeCommand.REGISTER};
            }
    
            updateStats(nodeinfo, false);
            nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
            updateStats(nodeinfo, true);
    The first half of the method mainly updates the node's information; if the node is an unknown, unregistered one, a register command is returned. But there is still the second half:

    /**
       * The given node has reported in.  This method should:
       * 1) Record the heartbeat, so the datanode isn't timed out
       * 2) Adjust usage stats for future block allocation
       * 
       * If a substantial amount of time passed since the last datanode 
       * heartbeat then request an immediate block report.  
       * 
       * @return an array of datanode commands 
       * @throws IOException
       * A given datanode reports its heartbeat; two main things are done:
       * 1. record the heartbeat so the datanode does not time out
       * 2. adjust the block allocation stats maintained by the namenode
       */
      DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
          long capacity, long dfsUsed, long remaining,
          int xceiverCount, int xmitsInProgress) throws IOException {
       ........... 
            //check lease recovery
            //check for leases that need recovery
            cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
            if (cmd != null) {
              return new DatanodeCommand[] {cmd};
            }
            
            //build a command list; the heartbeat reply may return several commands
            ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
            //check pending replication
            //get the pending-replication command, which carries the list of blocks to replicate
            cmd = nodeinfo.getReplicationCommand(
                  maxReplicationStreams - xmitsInProgress);
            if (cmd != null) {
              cmds.add(cmd);
            }
            //check block invalidation
            //block deletion (invalidation) command
            cmd = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
            if (cmd != null) {
              cmds.add(cmd);
            }
            // check access key update
            if (isAccessTokenEnabled && nodeinfo.needKeyUpdate) {
              cmds.add(new KeyUpdateCommand(accessTokenHandler.exportKeys()));
              nodeinfo.needKeyUpdate = false;
            }
            // check for balancer bandwidth update
            if (nodeinfo.getBalancerBandwidth() > 0) {
              cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
              // set back to 0 to indicate that datanode has been sent the new value
              nodeinfo.setBalancerBandwidth(0);
            }
            if (!cmds.isEmpty()) {
              //return the command array
              return cmds.toArray(new DatanodeCommand[cmds.size()]);
            }
          }
        }
    In other words, the namesystem checks the node's block state and replies with commands, for example requests to replicate certain blocks or to delete invalid ones. Here is one example:

    //a helper that builds a block-related command
      BlockCommand getReplicationCommand(int maxTransfers) {
        //poll up to maxTransfers blocks pending replication
        List<BlockTargetPair> blocktargetlist = replicateBlocks.poll(maxTransfers);
        //wrap them in a BlockCommand and return it
        return blocktargetlist == null? null:
            new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blocktargetlist);
      }
    Since BlockCommand is a subclass of DatanodeCommand, it is itself a DatanodeCommand. At this point it is worth looking at how the datanode's commands are represented. First the parent class, which is fairly simple:

    //DatanodeCommand implements the Writable interface, so commands are serialized for transport
    public abstract class DatanodeCommand implements Writable {
      static class Register extends DatanodeCommand {
        private Register() {super(DatanodeProtocol.DNA_REGISTER);}
        public void readFields(DataInput in) {}
        public void write(DataOutput out) {}
      }
    
      static class Finalize extends DatanodeCommand {
        private Finalize() {super(DatanodeProtocol.DNA_FINALIZE);}
        public void readFields(DataInput in) {}
        public void write(DataOutput out) {}
      }
      .....
      //action stores the command's operation type
      private int action;
      
      .....
      
      ///////////////////////////////////////////
      // Writable
      ///////////////////////////////////////////
      //DatanodeCommand writes the action code to the serialized stream
      public void write(DataOutput out) throws IOException {
        out.writeInt(this.action);
      }
      
      public void readFields(DataInput in) throws IOException {
        this.action = in.readInt();
      }
    }
    The action field here is important: it stores the command type. Note also that all of this is serializable, for transport over RPC. So what does the block-related command add on top of this so that the block list can also be carried in the command? The answer is below.

    /****************************************************
     * A BlockCommand is an instruction to a datanode 
     * regarding some blocks under its control.  It tells
     * the DataNode to either invalidate a set of indicated
     * blocks, or to copy a set of indicated blocks to 
     * another DataNode.
     * 
     ****************************************************/
    public class BlockCommand extends DatanodeCommand {
      Block blocks[];
      DatanodeInfo targets[][];
    
      public BlockCommand() {}
    
      /**
       * Create BlockCommand for transferring blocks to another datanode
       * @param blocktargetlist    blocks to be transferred 
       */
      public BlockCommand(int action, List<BlockTargetPair> blocktargetlist) {
        super(action);
    
        blocks = new Block[blocktargetlist.size()]; 
        targets = new DatanodeInfo[blocks.length][];
        for(int i = 0; i < blocks.length; i++) {
          BlockTargetPair p = blocktargetlist.get(i);
          blocks[i] = p.block;
          targets[i] = p.targets;
        }
      }
    
      private static final DatanodeInfo[][] EMPTY_TARGET = {};
    
      /**
       * Create BlockCommand for the given action
       * @param blocks blocks related to the action
       */
      public BlockCommand(int action, Block blocks[]) {
        super(action);
        this.blocks = blocks;
        this.targets = EMPTY_TARGET;
      }
      ......
      
      //override the serialization write method
      public void write(DataOutput out) throws IOException {
        super.write(out);
        out.writeInt(blocks.length);
        //serialize each block in turn
        for (int i = 0; i < blocks.length; i++) {
          blocks[i].write(out);
        }
        out.writeInt(targets.length);
        for (int i = 0; i < targets.length; i++) {
          out.writeInt(targets[i].length);
          for (int j = 0; j < targets[i].length; j++) {
            targets[i][j].write(out);
          }
        }
      }
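
    To make the serialization round trip concrete, here is a minimal, self-contained sketch. SimpleCommand and CommandWireDemo are illustrative stand-ins, not Hadoop classes, and Hadoop's real RPC layer drives write()/readFields() through its own machinery rather than a byte-array buffer:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Minimal sketch of the Writable-style round trip a command makes over the wire.
    // SimpleCommand stands in for DatanodeCommand; only the action code is carried here.
    public class CommandWireDemo {
      static class SimpleCommand {
        int action;                                  // command type, like DatanodeCommand's action field
        SimpleCommand(int action) { this.action = action; }

        void write(DataOutputStream out) throws IOException {
          out.writeInt(action);                      // serialize the action code
        }
        static SimpleCommand read(DataInputStream in) throws IOException {
          return new SimpleCommand(in.readInt());    // deserialize on the receiving side
        }
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        new SimpleCommand(42).write(new DataOutputStream(buf));          // "send" over RPC

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        SimpleCommand received = SimpleCommand.read(in);                 // "receive" on the datanode
        System.out.println("action = " + received.action);               // prints: action = 42
      }
    }

    The same idea scales up to BlockCommand above, which simply writes the block array and the target arrays after the action code.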
    
    OK, the heartbeat reply commands are now clear. Let's see how the datanode executes the commands returned by the namenode and adjusts accordingly.

    /**
       * Process an array of datanode commands
       * 
       * @param cmds an array of datanode commands
       * @return true if further processing may be required or false otherwise. 
       * The datanode processes an array of commands
       */
      private boolean processCommand(DatanodeCommand[] cmds) {
        if (cmds != null) {
          for (DatanodeCommand cmd : cmds) {
            try {
              //if any command in the group fails, the whole batch is treated as failed
              if (processCommand(cmd) == false) {
                return false;
              }
            } catch (IOException ioe) {
              LOG.warn("Error processing datanode Command", ioe);
            }
          }
        }
        return true;
      }
    Then comes single-command processing, which is what we really want to see:

    /**
         * 
         * @param cmd
         * @return true if further processing may be required or false otherwise. 
         * @throws IOException
         * Process a single command
         */
      private boolean processCommand(DatanodeCommand cmd) throws IOException {
        if (cmd == null)
          return true;
        final BlockCommand bcmd = cmd instanceof BlockCommand? (BlockCommand)cmd: null;
        
        //switch on the command's action type and handle each case separately
        switch(cmd.getAction()) {
        case DatanodeProtocol.DNA_TRANSFER:
          // Send a copy of a block to another datanode
          ....
          break;
        case DatanodeProtocol.DNA_INVALIDATE:
          //for invalid blocks, remove them from the blockScanner and delete them
          //
          // Some local block(s) are obsolete and can be 
          // safely garbage-collected.
          //
          Block toDelete[] = bcmd.getBlocks();
          try {
            if (blockScanner != null) {
              blockScanner.deleteBlocks(toDelete);
            }
            data.invalidate(toDelete);
          } catch(IOException e) {
            checkDiskError();
            throw e;
          }
          myMetrics.incrBlocksRemoved(toDelete.length);
          break;
        case DatanodeProtocol.DNA_SHUTDOWN:
          ....
        case DatanodeProtocol.DNA_REGISTER:
          //for a register command, re-run registration
          .....
          break;
        case DatanodeProtocol.DNA_FINALIZE:
          storage.finalizeUpgrade();
          break;
        case UpgradeCommand.UC_ACTION_START_UPGRADE:
          // start distributed upgrade here
          processDistributedUpgradeCommand((UpgradeCommand)cmd);
          break;
        case DatanodeProtocol.DNA_RECOVERBLOCK:
          recoverBlocks(bcmd.getBlocks(), bcmd.getTargets());
          break;
        case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
          LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
          if (isBlockTokenEnabled) {
            blockTokenSecretManager.setKeys(((KeyUpdateCommand) cmd).getExportedKeys());
          }
          break;
        case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
          .....
        default:
          LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
        }
        return true;
      }

    For each command, the action type is extracted and handled separately. That completes what the first half of the loop does. Afterwards the datanode reports newly received blocks; strictly speaking that is no longer part of the heartbeat mechanism, but it happens in the same big loop.

    //check for newly received blocks
            // check if there are newly received blocks
            Block [] blockArray=null;
            String [] delHintArray=null;
            synchronized(receivedBlockList) {
              synchronized(delHints) {
                int numBlocks = receivedBlockList.size();
                if (numBlocks > 0) {
                  if(numBlocks!=delHints.size()) {
                    LOG.warn("Panic: receiveBlockList and delHints are not of the same length" );
                  }
                  //
                  // Send newly-received blockids to namenode
                  //
                  blockArray = receivedBlockList.toArray(new Block[numBlocks]);
                  delHintArray = delHints.toArray(new String[numBlocks]);
                }
              }
            }
            if (blockArray != null) {
              if(delHintArray == null || delHintArray.length != blockArray.length ) {
                LOG.warn("Panic: block array & delHintArray are not the same" );
              }
              //report the newly received blocks to the namenode
              namenode.blockReceived(dnRegistration, blockArray, delHintArray);
              synchronized (receivedBlockList) {
                synchronized (delHints) {
                  for(int i=0; i<blockArray.length; i++) {
                    receivedBlockList.remove(blockArray[i]);
                    delHints.remove(delHintArray[i]);
                  }
                }
              }
            }
    The datanode uploads its heartbeats, but how does the namenode decide that a node has gone too long without one? There must be a monitoring thread in between. In the namesystem class, the relevant field is:

    /**
       * Stores a set of DatanodeDescriptor objects.
       * This is a subset of {@link #datanodeMap}, containing nodes that are 
       * considered alive.
       * The {@link HeartbeatMonitor} periodically checks for outdated entries,
       * and removes them from the list.
       * A subset of datanodeMap holding the nodes considered alive; used by HeartbeatMonitor
       */
      ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
    This field represents the list of currently live nodes.
    /**
       * Periodically calls heartbeatCheck() and updateAccessKey()
       * The heartbeat monitoring thread
       */
      class HeartbeatMonitor implements Runnable {
        //time of the last heartbeat check
        private long lastHeartbeatCheck;
        private long lastAccessKeyUpdate;
        /**
         */
        public void run() {
          while (fsRunning) {
            try {
              long now = now();
              if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
              //if the recheck interval has elapsed, run a heartbeat check
                heartbeatCheck();
                lastHeartbeatCheck = now;
              }
              if (isAccessTokenEnabled && (lastAccessKeyUpdate + accessKeyUpdateInterval < now)) {
                updateAccessKey();
                lastAccessKeyUpdate = now;
              }
            } catch (Exception e) {
              FSNamesystem.LOG.err
    The above is the monitor thread's run method. For the heartbeat check itself, the authors took a far-sighted approach: first locate a failure, then isolate and handle it separately, out of concern for the load on a distributed system and the impact on overall performance. Below is the failure-locating part:

    /**
       * Check if there are any expired heartbeats, and if so,
       * whether any blocks have to be re-replicated.
       * While removing dead datanodes, make sure that only one datanode is marked
       * dead at a time within the synchronized section. Otherwise, a cascading
       * effect causes more datanodes to be declared dead.
       * Check for expired heartbeats and remove nodes in the dead state
       */
      void heartbeatCheck() {
        //no checking is done while in safe mode
        if (isInSafeMode()) {
          // not to check dead nodes if in safemode
          return;
        }
        boolean allAlive = false;
        while (!allAlive) {
          boolean foundDead = false;
          DatanodeID nodeID = null;
    
          // locate the first dead node.
          //locate the first node in the dead state
          synchronized(heartbeats) {
            for (Iterator<DatanodeDescriptor> it = heartbeats.iterator();
                 it.hasNext();) {
              DatanodeDescriptor nodeInfo = it.next();
              if (isDatanodeDead(nodeInfo)) {
              //once found, record it and break out of the loop
                foundDead = true;
                nodeID = nodeInfo;
                break;
              }
            }
          }
    
    Then comes the handling, but before that the node's dead status is checked once more, since in a distributed environment many factors can interfere.

    // acquire the fsnamesystem lock, and then remove the dead node.
          if (foundDead) {
            //take the locks to ensure synchronization, then handle the dead node
            synchronized (this) {
              synchronized(heartbeats) {
                synchronized (datanodeMap) {
                  DatanodeDescriptor nodeInfo = null;
                  try {
                    nodeInfo = getDatanode(nodeID);
                  } catch (IOException e) {
                    nodeInfo = null;
                  }
                  if (nodeInfo != null && isDatanodeDead(nodeInfo)) {
                    NameNode.stateChangeLog.info("BLOCK* NameSystem.heartbeatCheck: "
                                                 + "lost heartbeat from " + nodeInfo.getName());
                    //remove this node's information; this also removes it from the heartbeats list
                    removeDatanode(nodeInfo);
                  }
                }
              }
            }
          }
          //reset the flag
          allAlive = !foundDead;
        }
    That completes the heartbeat failure detection and handling. This style of fault detection may well be instructive when we design large distributed systems of our own.
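    As an illustration of that idea, here is a self-contained sketch (not Hadoop code; all names are made up) of the same "locate one failure under a light lock, then re-check and remove it under the heavier locks, and loop" pattern that heartbeatCheck() uses:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Illustrative sketch of the heartbeatCheck() pattern: scan under a light lock to find
    // ONE dead entry, then re-check and remove it under the heavier locks, and loop
    // until no dead entries remain.
    public class OneAtATimeReaper {
      private final List<String> alive = new ArrayList<>();
      private final Object heavyLock = new Object();   // stands in for the namesystem lock

      OneAtATimeReaper(List<String> nodes) { alive.addAll(nodes); }

      private boolean isDead(String node) { return node.startsWith("dead"); }  // stand-in check

      void reap() {
        boolean allAlive = false;
        while (!allAlive) {
          String found = null;
          synchronized (alive) {                 // light lock: only locate one candidate
            for (String n : alive) {
              if (isDead(n)) { found = n; break; }
            }
          }
          if (found != null) {
            synchronized (heavyLock) {           // heavy lock: re-check, then remove
              synchronized (alive) {
                if (alive.contains(found) && isDead(found)) {
                  alive.remove(found);
                  System.out.println("removed " + found);
                }
              }
            }
          }
          allAlive = (found == null);            // loop until a full pass finds no dead node
        }
      }

      public static void main(String[] args) {
        new OneAtATimeReaper(Arrays.asList("dn1", "deadA", "dn2", "deadB")).reap();
      }
    }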

    For the fully annotated code, see https://github.com/linyiqun/hadoop-hdfs. Follow-up posts will continue to cover other parts of the HDFS code.


    References

    《Hadoop技术内部–HDFS结构设计与实现原理》, Cai Bin et al.



