  • [Original] Big Data Fundamentals: HDFS (2) Replica Count Checking and Replication Logic

    HDFS periodically checks whether any file is missing replicas, and triggers replication work to bring such files back up to the configured replica count:

      <property>
        <name>dfs.replication</name>
        <value>3</value>
      </property>

    This is implemented by the ReplicationMonitor thread started inside BlockManager:

    org.apache.hadoop.hdfs.server.blockmanagement.BlockManager

      /**
       * Periodically calls computeReplicationWork().
       */
      private class ReplicationMonitor implements Runnable {
    
        @Override
        public void run() {
          while (namesystem.isRunning()) {
            try {
              // Process replication work only when active NN is out of safe mode.
              if (namesystem.isPopulatingReplQueues()) {
                computeDatanodeWork();
                processPendingReplications();
              }
              Thread.sleep(replicationRecheckInterval);
            } catch (Throwable t) {
    ...
    Note: the sleep interval replicationRecheckInterval is taken from the configuration key dfs.namenode.replication.interval, which defaults to 3, i.e. 3 seconds.

      /**
       * Compute block replication and block invalidation work that can be scheduled
       * on data-nodes. The datanode will be informed of this work at the next
       * heartbeat.
       * 
       * @return number of blocks scheduled for replication or removal.
       */
      int computeDatanodeWork() {
        // Blocks should not be replicated or removed if in safe mode.
        // It's OK to check safe mode here w/o holding lock, in the worst
        // case extra replications will be scheduled, and these will get
        // fixed up later.
        if (namesystem.isInSafeMode()) {
          return 0;
        }
    
        final int numlive = heartbeatManager.getLiveDatanodeCount();
        final int blocksToProcess = numlive
            * this.blocksReplWorkMultiplier;
        final int nodesToProcess = (int) Math.ceil(numlive
            * this.blocksInvalidateWorkPct);
    
        int workFound = this.computeReplicationWork(blocksToProcess);

    Note: the multiplier blocksReplWorkMultiplier is taken from the configuration key dfs.namenode.replication.work.multiplier.per.iteration, which defaults to 2, i.e. each pass processes up to (number of live datanodes) × 2 blocks;
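    The per-pass work sizing above can be sketched as a standalone calculation (this is an illustrative model, not Hadoop code; the defaults are assumed from dfs.namenode.replication.work.multiplier.per.iteration, 2, and dfs.namenode.invalidate.work.pct.per.iteration, 0.32):

```java
// Standalone sketch of the arithmetic in computeDatanodeWork():
// replication work scales linearly with live datanodes, while
// invalidation work touches a fixed fraction of them per pass.
public class ReplWorkMath {
    static final int BLOCKS_REPL_WORK_MULTIPLIER = 2;     // assumed default
    static final float BLOCKS_INVALIDATE_WORK_PCT = 0.32f; // assumed default

    static int blocksToProcess(int liveDatanodes) {
        return liveDatanodes * BLOCKS_REPL_WORK_MULTIPLIER;
    }

    static int nodesToProcess(int liveDatanodes) {
        return (int) Math.ceil(liveDatanodes * BLOCKS_INVALIDATE_WORK_PCT);
    }

    public static void main(String[] args) {
        // With 10 live datanodes: up to 20 blocks replicated per pass,
        // and invalidation work scheduled on ceil(3.2) = 4 nodes.
        System.out.println(blocksToProcess(10)); // 20
        System.out.println(nodesToProcess(10));  // 4
    }
}
```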

      /**
       * Scan blocks in {@link #neededReplications} and assign replication
       * work to data-nodes they belong to.
       *
       * The number of process blocks equals either twice the number of live
       * data-nodes or the number of under-replicated blocks whichever is less.
       *
       * @return number of blocks scheduled for replication during this iteration.
       */
      int computeReplicationWork(int blocksToProcess) {
        List<List<Block>> blocksToReplicate = null;
        namesystem.writeLock();
        try {
          // Choose the blocks to be replicated
          blocksToReplicate = neededReplications
              .chooseUnderReplicatedBlocks(blocksToProcess);
        } finally {
          namesystem.writeUnlock();
        }
        return computeReplicationWorkForBlocks(blocksToReplicate);
      }
    
      int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
    ...
              // Add block to the to be replicated list
              rw.srcNode.addBlockToBeReplicated(block, targets);
              scheduledWork++;

    Note: concretely, each block scheduled for replication is added to the pending list of its chosen source datanode;
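    The selection step before this (neededReplications.chooseUnderReplicatedBlocks) can be sketched with a simplified, hypothetical model. The real UnderReplicatedBlocks structure keeps blocks in several priority levels; here we just order candidates by live replica count, fewest first, and take at most blocksToProcess of them:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified model of under-replicated block selection (not Hadoop code):
// blocks with fewer surviving replicas are more urgent and are chosen first.
public class UnderReplicatedSelection {
    static class BlockInfo {
        final String id;
        final int liveReplicas;
        BlockInfo(String id, int liveReplicas) {
            this.id = id;
            this.liveReplicas = liveReplicas;
        }
    }

    static List<String> choose(List<BlockInfo> underReplicated, int blocksToProcess) {
        List<BlockInfo> sorted = new ArrayList<>(underReplicated);
        // Most urgent (fewest live replicas) first.
        sorted.sort(Comparator.comparingInt((BlockInfo b) -> b.liveReplicas));
        List<String> chosen = new ArrayList<>();
        for (int i = 0; i < Math.min(blocksToProcess, sorted.size()); i++) {
            chosen.add(sorted.get(i).id);
        }
        return chosen;
    }

    public static void main(String[] args) {
        List<BlockInfo> pending = List.of(
            new BlockInfo("blk_a", 2),
            new BlockInfo("blk_b", 0),  // all replicas lost: highest priority
            new BlockInfo("blk_c", 1));
        System.out.println(choose(pending, 2)); // [blk_b, blk_c]
    }
}
```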

    Next, look at the DatanodeManager code:

    org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager

      public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
          StorageReport[] reports, final String blockPoolId,
          long cacheCapacity, long cacheUsed, int xceiverCount, 
          int maxTransfers, int failedVolumes
          ) throws IOException {
    ...
            final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
            //check pending replication
            List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
                  maxTransfers);
            if (pendingList != null) {
              cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
                  pendingList));
            }

    Note: DatanodeManager then, while handling that datanode's heartbeat, sends the pending replication commands back to the source datanode as a DNA_TRANSFER BlockCommand; maxTransfer is computed as

          final int maxTransfer = blockManager.getMaxReplicationStreams()
              - xmitsInProgress;

    getMaxReplicationStreams returns the configuration value dfs.namenode.replication.max-streams, which defaults to 2, i.e. a single datanode replicates at most 2 blocks concurrently;
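    Putting the last two steps together, the queue-then-drain flow can be sketched as a standalone model (again hypothetical, not Hadoop code): the NameNode queues blocks on a source datanode's pending list, and each heartbeat reply hands out at most maxTransfer = maxReplicationStreams - xmitsInProgress of them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Model of per-datanode replication scheduling (not Hadoop code):
// addBlockToBeReplicated() plays the role of DatanodeDescriptor's method of
// the same name, and getReplicationCommand() models the heartbeat drain.
public class PendingReplicationModel {
    static final int MAX_REPLICATION_STREAMS = 2; // assumed dfs.namenode.replication.max-streams default

    final Queue<String> pendingBlocks = new ArrayDeque<>();

    void addBlockToBeReplicated(String blockId) {
        pendingBlocks.add(blockId);
    }

    // xmitsInProgress: transfers the datanode reported as already running.
    List<String> getReplicationCommand(int xmitsInProgress) {
        int maxTransfer = Math.max(0, MAX_REPLICATION_STREAMS - xmitsInProgress);
        List<String> cmd = new ArrayList<>();
        while (cmd.size() < maxTransfer && !pendingBlocks.isEmpty()) {
            cmd.add(pendingBlocks.poll());
        }
        return cmd;
    }

    public static void main(String[] args) {
        PendingReplicationModel node = new PendingReplicationModel();
        node.addBlockToBeReplicated("blk_1");
        node.addBlockToBeReplicated("blk_2");
        node.addBlockToBeReplicated("blk_3");
        // No transfers in progress: the heartbeat reply carries 2 blocks.
        System.out.println(node.getReplicationCommand(0)); // [blk_1, blk_2]
        // One transfer already running: only 1 more block is handed out.
        System.out.println(node.getReplicationCommand(1)); // [blk_3]
    }
}
```

    The remaining block (blk_3) simply waits in the queue until a later heartbeat reports enough free transfer slots, which is how the max-streams setting throttles replication traffic per datanode.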

  • Original article: https://www.cnblogs.com/barneywill/p/10114358.html