  • Explained: the CombineFileInputFormat class

    An MR job's default input format, FileInputFormat, generates one split per small file. CombineFileInputFormat instead packs multiple "small files" into a single split (taking node-level and rack-level data locality into account while forming splits), so that each Mapper task processes more data and the MR job runs faster. For a hands-on example, see the post "MR案例:CombineFileInputFormat".
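
    To make the effect concrete, here is a minimal driver sketch (CombineTextInputFormat is the concrete text-oriented subclass that ships with Hadoop 2.x; the job name and paths are illustrative) showing that moving from one-split-per-file to combined splits is a one-line input-format change:

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

      public class CombineDriver {
        public static void main(String[] args) throws Exception {
          Job job = Job.getInstance(new Configuration(), "combine-small-files");
          job.setJarByClass(CombineDriver.class);
          //the one-line change: many small files now share combined splits
          job.setInputFormatClass(CombineTextInputFormat.class);
          FileInputFormat.addInputPath(job, new Path("/data/small-files"));  //hypothetical input dir
          FileOutputFormat.setOutputPath(job, new Path("/data/out"));        //hypothetical output dir
          System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
      }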

    1). Three important properties:

    • maxSplitSize: the maximum split size. Set via the property "mapreduce.input.fileinputformat.split.maxsize" or the (protected) CombineFileInputFormat.setMaxSplitSize() method. (If it is left unset, and the two minimums below are unset as well, the entire input collapses into a single split, i.e. only one map task is launched.)
    • minSplitSizeNode: the minimum split size when blocks of the same node are combined into a split. Set via the property "mapreduce.input.fileinputformat.split.minsize.per.node" or the CombineFileInputFormat.setMinSplitSizeNode() method.
    • minSplitSizeRack: the minimum split size when blocks of the same rack are combined into a split. Set via the property "mapreduce.input.fileinputformat.split.minsize.per.rack" or the CombineFileInputFormat.setMinSplitSizeRack() method.
    • Required ordering (enforced by getSplits(), which throws an IOException otherwise): maxSplitSize >= minSplitSizeRack >= minSplitSizeNode. A configuration sketch follows this list.
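
    A configuration sketch for the three knobs (the byte values 128/64/32 MB are illustrative and respect the ordering above):

      Configuration conf = new Configuration();
      //maximum split size
      conf.setLong("mapreduce.input.fileinputformat.split.maxsize",          128 * 1024 * 1024L);
      //minimum split size when combining blocks of the same rack
      conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack",  64 * 1024 * 1024L);
      //minimum split size when combining blocks of the same node
      conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.node",  32 * 1024 * 1024L);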

    2). How splits are formed:

     2.1. Iterate over the node list, forming node-local splits one node at a time (at block granularity):

      a. If maxSplitSize == 0, all the blocks on the node form a single split.

      b. If maxSplitSize != 0, walk the node's blocks and accumulate their sizes; whenever the accumulated size reaches maxSplitSize, the accumulated blocks form one split. Repeat until the accumulated size of the remaining blocks is < maxSplitSize, then go to the next step.

      c. If the accumulated size of the remaining blocks is >= minSplitSizeNode (and no split has been created on this node yet; see splitsPerNode in createSplits()), those remaining blocks form one split; otherwise they are returned to the pool and left for the rack-level pass. Move on to the next node.
     
     2.2. Iterate over the rack list, forming rack-local splits one rack at a time (at block granularity):

      a. Walk and accumulate the blocks on all of this rack's nodes (these are exactly the blocks left over from the previous phase); whenever the accumulated size reaches maxSplitSize, the accumulated blocks form one split. Repeat until the accumulated size of the remaining blocks is < maxSplitSize, then go to the next step.

      b. If the accumulated size of the remaining blocks is >= minSplitSizeRack, those remaining blocks form one split; if it is < minSplitSizeRack, they are kept for the overflow phase.

     2.3. Walk and accumulate the leftover blocks across all racks; whenever the accumulated size reaches maxSplitSize, the accumulated blocks form one split. Repeat until the accumulated size of the remaining blocks is < maxSplitSize, then go to the next step.
     
     2.4. Whatever blocks remain form one final split.
    Demo:
    Assume maxSplitSize=100 >= minSplitSizeRack=50 >= minSplitSizeNode=30 (the ordering getSplits() enforces), replication factor 1, and the following blocks:
    Rack01: Node1 [70, 80], Node2 [40]   Rack02: Node3 [25], Node4 [28]   Rack03: Node5 [10]
    Processing:
    Node phase:  Node1: 70 + 80 = 150 >= 100        --->  split #1 {70, 80} (the block that pushes the total past maxSplitSize is included in the split)
                 Node2: 40 < 100, and 40 >= 30      --->  split #2 {40}
                 Node3: 25 < 30   Node4: 28 < 30   Node5: 10 < 30  --->  blocks returned to the pool
    Rack phase:  Rack02: 25 + 28 = 53 < 100, and 53 >= 50  --->  split #3 {25, 28}
                 Rack03: 10 < 50                    --->  block kept in overflowBlocks
    Overflow:    10 < 100, final remainder          --->  split #4 {10}
      --->  4 splits in total
      

    3). Source: getSplits()

      @Override
      public List<InputSplit> getSplits(JobContext job) 
        throws IOException {
        long minSizeNode = 0;
        long minSizeRack = 0;
        long maxSize = 0;
        Configuration conf = job.getConfiguration();
    
    // values set via the setXxxSplitSize() methods override the values read from the configuration file
        if (minSplitSizeNode != 0) {
          minSizeNode = minSplitSizeNode;
        } else {
          minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
        }
        if (minSplitSizeRack != 0) {
          minSizeRack = minSplitSizeRack;
        } else {
          minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
        }
        if (maxSplitSize != 0) {
          maxSize = maxSplitSize;
        } else {

      //if maxSize is not configured, all the blocks of a node end up in a single split
      maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
    }

    if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
      throw new IOException("Minimum split size pernode " + minSizeNode +
                            " cannot be larger than maximum split size " + maxSize);
    }
    if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
      throw new IOException("Minimum split size per rack " + minSizeRack +
                            " cannot be larger than maximum split size " + maxSize);
    }
    if (minSizeRack != 0 && minSizeNode > minSizeRack) {
      throw new IOException("Minimum split size per node " + minSizeNode +
                            " cannot be larger than minimum split " +
                            "size per rack " + minSizeRack);
    }

    //all the files in the input paths
    List<FileStatus> stats = listStatus(job);
    List<InputSplit> splits = new ArrayList<InputSplit>();
    if (stats.size() == 0) {
      return splits;
    }

    //iterate over the pools and create splits for each pool's files;
    //the blocks of one split can only come from a single pool,
    //but they may come from different files within that pool
    for (MultiPathFilter onepool : pools) {
      ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();

      //collect into myPaths all files accepted by the current pool instance onepool
      for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
        FileStatus p = iter.next();
        if (onepool.accept(p.getPath())) {
          myPaths.add(p); // add it to my output set
          iter.remove();
        }
      }
      //create splits for the files in myPaths
      getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
    }

    //create splits for the files that do not belong to any pool
    getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);

    //free up rackToNodes map
    rackToNodes.clear();
    return splits;
  }
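
    The pools iterated above are registered by subclasses through the protected createPool() method. A hedged sketch (the subclass name and file suffixes are illustrative) of using pools so that two kinds of files are never packed into the same split:

      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.fs.PathFilter;
      import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

      //a subclass that registers two pools: .log files and .dat files can still
      //be combined among themselves, but never with each other
      public class PooledCombineTextInputFormat extends CombineTextInputFormat {
        public PooledCombineTextInputFormat() {
          createPool(new PathFilter() {
            @Override
            public boolean accept(Path path) { return path.getName().endsWith(".log"); }
          });
          createPool(new PathFilter() {
            @Override
            public boolean accept(Path path) { return path.getName().endsWith(".dat"); }
          });
        }
      }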

    4). Source: getMoreSplits()

    Whether the files satisfy a particular pool filter instance onePool or belong to no pool at all, each group can loosely be thought of as "a batch of files"; getMoreSplits() is what generates the splits for such a batch.

    /**
       * Return all the splits in the specified set of paths
       */
      private void getMoreSplits(JobContext job, List<FileStatus> stats,
                                 long maxSize, long minSizeNode, long minSizeRack,
                                 List<InputSplit> splits)
        throws IOException {
        Configuration conf = job.getConfiguration();
    
    //the OneFileInfo class represents a single input file
    OneFileInfo[] files;

    //rackToBlocks: rack -> blocks, i.e. which blocks live on a given rack
    HashMap<String, List<OneBlockInfo>> rackToBlocks = 
      new HashMap<String, List<OneBlockInfo>>();

    //blockToNodes: block -> nodes, i.e. which nodes hold a replica of a given block
    HashMap<OneBlockInfo, String[]> blockToNodes = 
      new HashMap<OneBlockInfo, String[]>();

    //nodeToBlocks: node -> blocks, i.e. which blocks live on a given node
    HashMap<String, Set<OneBlockInfo>> nodeToBlocks = 
      new HashMap<String, Set<OneBlockInfo>>();

    files = new OneFileInfo[stats.size()];
    if (stats.size() == 0) {
      return;
    }

    /**
     * Iterate over this batch of files, building a OneFileInfo object for each.
     * While being constructed, every OneFileInfo populates the three mappings above.
     * Once the iteration finishes, the block/node/rack relationships are fully
     * established, and splits can be generated from that information.
     */
    long totLength = 0;
    int i = 0;
    for (FileStatus stat : stats) {
      files[i] = new OneFileInfo(stat, conf, isSplitable(job, stat.getPath()),
                                 rackToBlocks, blockToNodes, nodeToBlocks,
                                 rackToNodes, maxSize);
      totLength += files[i].getLength();
    }

    //form the splits from the three mappings
    createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength, 
                 maxSize, minSizeNode, minSizeRack, splits);
  }
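
    To make the three mappings concrete, this is what they would hold for the demo cluster from section 2 (block ids b1..b6 are hypothetical labels; replication factor 1 for simplicity):

      nodeToBlocks: { Node1=[b1(70), b2(80)], Node2=[b3(40)], Node3=[b4(25)], Node4=[b5(28)], Node5=[b6(10)] }
      blockToNodes: { b1=[Node1], b2=[Node1], b3=[Node2], b4=[Node3], b5=[Node4], b6=[Node5] }
      rackToBlocks: { Rack01=[b1, b2, b3], Rack02=[b4, b5], Rack03=[b6] }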

    5). Source: createSplits()

      @VisibleForTesting
      void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
                         Map<OneBlockInfo, String[]> blockToNodes,
                         Map<String, List<OneBlockInfo>> rackToBlocks,
                         long totLength,
                         long maxSize,
                         long minSizeNode,
                         long minSizeRack,
                         List<InputSplit> splits                     
                        ) {
    
    //blocks included in the split currently being built
        ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
    
    //accumulated size of the split currently being built
        long curSplitSize = 0;
        
        int totalNodes = nodeToBlocks.size();
        long totalLength = totLength;
    
        Multiset<String> splitsPerNode = HashMultiset.create();
        Set<String> completedNodes = new HashSet<String>();
        
        while(true) {
          // it is allowed for maxSize to be 0. Disable smoothing load for such cases
    
      //phase 1: form node-local splits, one node (one block) at a time
          // process all nodes and create splits that are local to a node. Generate
          // one split per node iteration, and walk over nodes multiple times to
          // distribute the splits across nodes. 
          for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
              .entrySet().iterator(); iter.hasNext();) {
            Map.Entry<String, Set<OneBlockInfo>> one = iter.next();
            
            String node = one.getKey();
            
            // Skip the node if it has previously been marked as completed.
            if (completedNodes.contains(node)) {
              continue;
            }
    
            Set<OneBlockInfo> blocksInCurrentNode = one.getValue();
    
            // for each block, copy it into validBlocks. Delete it from
            // blockToNodes so that the same block does not appear in
            // two different splits.
            Iterator<OneBlockInfo> oneBlockIter = blocksInCurrentNode.iterator();
            while (oneBlockIter.hasNext()) {
              OneBlockInfo oneblock = oneBlockIter.next();
              
              // Remove all blocks which may already have been assigned to other
              // splits.
              if(!blockToNodes.containsKey(oneblock)) {
                oneBlockIter.remove();
                continue;
              }
            
              validBlocks.add(oneblock);
              blockToNodes.remove(oneblock);
              curSplitSize += oneblock.length;
    
              // if the accumulated split size exceeds the maximum, then
              // create this split.
    
          //if the accumulated block size reaches maxSize, form a split
              if (maxSize != 0 && curSplitSize >= maxSize) {
                // create an input split and add it to the splits array
                addCreatedSplit(splits, Collections.singleton(node), validBlocks);
                totalLength -= curSplitSize;
                curSplitSize = 0;
    
                splitsPerNode.add(node);
    
                // Remove entries from blocksInNode so that we don't walk these
                // again.
                blocksInCurrentNode.removeAll(validBlocks);
                validBlocks.clear();
    
                // Done creating a single split for this node. Move on to the next
                // node so that splits are distributed across nodes.
                break;
              }
    
            }
            if (validBlocks.size() != 0) {
              // This implies that the last few blocks (or all in case maxSize=0)
              // were not part of a split. The node is complete.
              
              // if there were any blocks left over and their combined size is
              // larger than minSplitNode, then combine them into one split.
              // Otherwise add them back to the unprocessed pool. It is likely
              // that they will be combined with other blocks from the
              // same rack later on.
              // This condition also kicks in when max split size is not set. All
              // blocks on a node will be grouped together into a single split.
    
          // if the leftover blocks on this node total at least minSizeNode, form a split from them;
          // otherwise return them to blockToNodes so the later rack-level pass can combine them
    
              if (minSizeNode != 0 && curSplitSize >= minSizeNode
                  && splitsPerNode.count(node) == 0) {
                // haven't created any split on this machine. so its ok to add a
                // smaller one for parallelism. Otherwise group it in the rack for
                // balanced size create an input split and add it to the splits
                // array
                addCreatedSplit(splits, Collections.singleton(node), validBlocks);
                totalLength -= curSplitSize;
                splitsPerNode.add(node);
                // Remove entries from blocksInNode so that we don't walk this again.
                blocksInCurrentNode.removeAll(validBlocks);
                // The node is done. This was the last set of blocks for this node.
              } else {
                // Put the unplaced blocks back into the pool for later rack-allocation.
                for (OneBlockInfo oneblock : validBlocks) {
                  blockToNodes.put(oneblock, oneblock.hosts);
                }
              }
              validBlocks.clear();
              curSplitSize = 0;
              completedNodes.add(node);
            } else { // No in-flight blocks.
              if (blocksInCurrentNode.size() == 0) {
                // Node is done. All blocks were fit into node-local splits.
                completedNodes.add(node);
              } // else Run through the node again.
            }
          }
    
          // Check if node-local assignments are complete.
          if (completedNodes.size() == totalNodes || totalLength == 0) {
            // All nodes have been walked over and marked as completed or all blocks
            // have been assigned. The rest should be handled via rackLock assignment.
            LOG.info("DEBUG: Terminated node allocation with : CompletedNodes: "
                + completedNodes.size() + ", size left: " + totalLength);
            break;
          }
        }
    //phase 2: form rack-local splits, one rack (one block) at a time
        // if blocks in a rack are below the specified minimum size, then keep them
        // in 'overflow'. After the processing of all racks is complete, these 
        // overflow blocks will be combined into splits.
    //overflowBlocks holds the blocks still left over after the rack-level pass
        ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
        Set<String> racks = new HashSet<String>();
    
        // Process all racks over and over again until there is no more work to do.
        while (blockToNodes.size() > 0) {
    
          // Create one split for this rack before moving over to the next rack. 
          // Come back to this rack after creating a single split for each of the 
          // remaining racks.
          // Process one rack location at a time, Combine all possible blocks that
          // reside on this rack as one split. (constrained by minimum and maximum
          // split size).
    
      //process each rack in turn
          for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = 
               rackToBlocks.entrySet().iterator(); iter.hasNext();) {
    
            Map.Entry<String, List<OneBlockInfo>> one = iter.next();
            racks.add(one.getKey());
            List<OneBlockInfo> blocks = one.getValue();
    
            // for each block, copy it into validBlocks. Delete it from 
            // blockToNodes so that the same block does not appear in 
            // two different splits.
            boolean createdSplit = false;
    
        //process each block on this rack in turn
            for (OneBlockInfo oneblock : blocks) {
              if (blockToNodes.containsKey(oneblock)) {
                validBlocks.add(oneblock);
                blockToNodes.remove(oneblock);
                curSplitSize += oneblock.length;
          
                // if the accumulated split size exceeds the maximum, then 
            // create this split (the accumulated block size reached maxSize)
                if (maxSize != 0 && curSplitSize >= maxSize) {
                  // create an input split and add it to the splits array
                  addCreatedSplit(splits, getHosts(racks), validBlocks);
                  createdSplit = true;
                  break;
                }
              }
            }
    
            // if we created a split, then just go to the next rack
            if (createdSplit) {
              curSplitSize = 0;
              validBlocks.clear();
              racks.clear();
              continue;
            }
    
            if (!validBlocks.isEmpty()) {
    
          //if the leftover blocks total at least minSizeRack, form a split from them
              if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
                // if there is a minimum size specified, then create a single split
                // otherwise, store these blocks into overflow data structure
                addCreatedSplit(splits, getHosts(racks), validBlocks);
              } else {
                // There were a few blocks in this rack that 
                // remained to be processed. Keep them in 'overflow' block list. 
                // These will be combined later.
      
            //if the leftover total is below minSizeRack, add these blocks to overflowBlocks
                overflowBlocks.addAll(validBlocks);
              }
            }
            curSplitSize = 0;
            validBlocks.clear();
            racks.clear();
          }
        }
    
        assert blockToNodes.isEmpty();
        assert curSplitSize == 0;
        assert validBlocks.isEmpty();
        assert racks.isEmpty();
    
    //phase 3: walk and accumulate the overflow blocks from all racks
        for (OneBlockInfo oneblock : overflowBlocks) {
          validBlocks.add(oneblock);
          curSplitSize += oneblock.length;
    
          // This might cause an exiting rack location to be re-added,
          // but it should be ok.
          for (int i = 0; i < oneblock.racks.length; i++) {
            racks.add(oneblock.racks[i]);
          }
    
          // if the accumulated split size exceeds the maximum, then 
          // create this split.
      // once the accumulated size reaches maxSize, form a split from these blocks
          if (maxSize != 0 && curSplitSize >= maxSize) {
            // create an input split and add it to the splits array
            addCreatedSplit(splits, getHosts(racks), validBlocks);
            curSplitSize = 0;
            validBlocks.clear();
            racks.clear();
          }
        }
    
    //phase 4: whatever blocks remain form one final split
        if (!validBlocks.isEmpty()) {
          addCreatedSplit(splits, getHosts(racks), validBlocks);
        }
      }
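
    Note that CombineFileInputFormat is abstract: besides the split logic above, a concrete subclass must supply a record reader that iterates over the file chunks packed into each CombineFileSplit. A minimal sketch (MyRecordReader is a hypothetical per-file reader; CombineFileRecordReader requires it to expose a (CombineFileSplit, TaskAttemptContext, Integer) constructor, where the Integer is the chunk index):

      import java.io.IOException;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.InputSplit;
      import org.apache.hadoop.mapreduce.RecordReader;
      import org.apache.hadoop.mapreduce.TaskAttemptContext;
      import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
      import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
      import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

      public class MyCombineFileInputFormat extends CombineFileInputFormat<LongWritable, Text> {
        @Override
        public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException {
          //CombineFileRecordReader instantiates one MyRecordReader per file chunk
          //in the CombineFileSplit and chains them behind a single reader interface
          return new CombineFileRecordReader<LongWritable, Text>(
              (CombineFileSplit) split, context, MyRecordReader.class);
        }
      }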
  • Original post: https://www.cnblogs.com/skyl/p/4754999.html