  • 【Original】Big Data Basics: Spark (7) - how Spark splits files when reading (i.e., how many partitions an RDD gets)

    spark 2.1.1

    When Spark initializes an RDD it has to read files, usually from HDFS. When reading you may pass a minimum number of partitions, but this is only a hint: the actual number can be larger (for example when there are many files, or very large ones) or smaller (for example when there is a single tiny file). If no minimum is given, what determines how many partitions the resulting RDD ends up with?
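
    For instance, a minimal sketch of that behaviour; the path, master and printed counts below are hypothetical and only illustrate that minPartitions is a hint:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("split-demo").setMaster("local[4]"))

    // No minPartitions: defaultMinPartitions = math.min(defaultParallelism, 2) is passed down.
    val rdd1 = sc.textFile("hdfs:///tmp/demo/input")
    println(rdd1.getNumPartitions)

    // Explicit minPartitions: still only a hint; the real count comes from InputFormat.getSplits,
    // so it may end up larger (many or large files) or smaller (one tiny file).
    val rdd2 = sc.textFile("hdfs:///tmp/demo/input", 10)
    println(rdd2.getNumPartitions)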

    Let's take SparkContext.textFile as an example and look at the code:

    org.apache.spark.SparkContext

      /**
       * Read a text file from HDFS, a local file system (available on all nodes), or any
       * Hadoop-supported file system URI, and return it as an RDD of Strings.
       */
      def textFile(
          path: String,
          minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
        assertNotStopped()
        hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
          minPartitions).map(pair => pair._2.toString).setName(path)
      }
    
      /**
       * Default min number of partitions for Hadoop RDDs when not given by user
       * Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
       * The reasons for this are discussed in https://github.com/mesos/spark/pull/718
       */
      def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
    
      /** Get an RDD for a Hadoop file with an arbitrary InputFormat
       *
       * @note Because Hadoop's RecordReader class re-uses the same Writable object for each
       * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
       * operation will create many references to the same object.
       * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
       * copy them using a `map` function.
       */
      def hadoopFile[K, V](
          path: String,
          inputFormatClass: Class[_ <: InputFormat[K, V]],
          keyClass: Class[K],
          valueClass: Class[V],
          minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
        assertNotStopped()
    
        // This is a hack to enforce loading hdfs-site.xml.
        // See SPARK-11227 for details.
        FileSystem.getLocal(hadoopConfiguration)
    
        // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
        val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
        val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
        new HadoopRDD(
          this,
          confBroadcast,
          Some(setInputPathsFunc),
          inputFormatClass,
          keyClass,
          valueClass,
          minPartitions).setName(path)
      }

    As you can see, this simply returns a HadoopRDD. If no minimum partition count is passed in, defaultMinPartitions is used, which usually evaluates to 2 (a quick check of that default is sketched below). So how is HadoopRDD implemented?
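
    A minimal check of defaultMinPartitions, assuming a local[8] master; the exact value of defaultParallelism depends on the deployment:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("min-partitions").setMaster("local[8]"))
    println(sc.defaultParallelism)     // 8 with local[8]
    println(sc.defaultMinPartitions)   // math.min(8, 2) = 2

    HadoopRDD then hands this value straight to InputFormat.getSplits, as its getPartitions implementation below shows.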

    org.apache.spark.rdd.HadoopRDD

    class HadoopRDD[K, V](
        sc: SparkContext,
        broadcastedConf: Broadcast[SerializableConfiguration],
        initLocalJobConfFuncOpt: Option[JobConf => Unit],
        inputFormatClass: Class[_ <: InputFormat[K, V]],
        keyClass: Class[K],
        valueClass: Class[V],
        minPartitions: Int)
      extends RDD[(K, V)](sc, Nil) with Logging {
    ...
      override def getPartitions: Array[Partition] = {
        val jobConf = getJobConf()
        // add the credentials here as this can be called before SparkContext initialized
        SparkHadoopUtil.get.addCredentials(jobConf)
        val inputFormat = getInputFormat(jobConf)
        val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
        val array = new Array[Partition](inputSplits.size)
        for (i <- 0 until inputSplits.size) {
          array(i) = new HadoopPartition(id, i, inputSplits(i))
        }
        array
      }
    ...
      protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
        val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
          .asInstanceOf[InputFormat[K, V]]
        newInputFormat match {
          case c: Configurable => c.setConf(conf)
          case _ =>
        }
        newInputFormat
      }

    The logic that determines the partition count lives in getPartitions, which in turn calls InputFormat.getSplits. InputFormat is an interface:

    org.apache.hadoop.mapred.InputFormat

    public interface InputFormat<K, V> {
        InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
    
        RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException;
    }
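
    To see exactly what HadoopRDD.getPartitions will receive, you can call getSplits on an InputFormat implementation directly, outside of Spark. A rough standalone sketch (the input path is hypothetical):

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}

    val conf = new JobConf()
    FileInputFormat.setInputPaths(conf, new Path("hdfs:///tmp/demo/input"))

    val inputFormat = new TextInputFormat()
    inputFormat.configure(conf)                 // TextInputFormat is JobConfigurable

    val minPartitions = 2                       // what defaultMinPartitions usually is
    val splits = inputFormat.getSplits(conf, minPartitions)
    println(s"number of splits = ${splits.length}")   // this becomes the RDD's partition count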

    Each file format has its own implementation class: for the common formats avro, orc, parquet and textfile these are AvroInputFormat, OrcInputFormat, MapredParquetInputFormat and CombineTextInputFormat respectively (SparkContext.textFile, as shown above, passes the plain TextInputFormat). Each implementation has its own split logic; let's look at the default implementation:

    org.apache.hadoop.mapred.FileInputFormat

      /** Splits files returned by {@link #listStatus(JobConf)} when
       * they're too big.*/ 
      public InputSplit[] getSplits(JobConf job, int numSplits)
        throws IOException {
        FileStatus[] files = listStatus(job);
        
        // Save the number of input files for metrics/loadgen
        job.setLong(NUM_INPUT_FILES, files.length);
        long totalSize = 0;                           // compute total size
        for (FileStatus file: files) {                // check we have valid files
          if (file.isDirectory()) {
            throw new IOException("Not a file: "+ file.getPath());
          }
          totalSize += file.getLen();
        }
    
        long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
        long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
          FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
    
        // generate splits
        ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
        NetworkTopology clusterMap = new NetworkTopology();
        for (FileStatus file: files) {
          Path path = file.getPath();
          long length = file.getLen();
          if (length != 0) {
            FileSystem fs = path.getFileSystem(job);
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
              blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
              blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            if (isSplitable(fs, path)) {
              long blockSize = file.getBlockSize();
              long splitSize = computeSplitSize(goalSize, minSize, blockSize);
    
              long bytesRemaining = length;
              while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                String[] splitHosts = getSplitHosts(blkLocations,
                    length-bytesRemaining, splitSize, clusterMap);
                splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                    splitHosts));
                bytesRemaining -= splitSize;
              }
    
              if (bytesRemaining != 0) {
                String[] splitHosts = getSplitHosts(blkLocations, length
                    - bytesRemaining, bytesRemaining, clusterMap);
                splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                    splitHosts));
              }
            } else {
              String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
              splits.add(makeSplit(path, 0, length, splitHosts));
            }
          } else { 
            //Create empty hosts array for zero length files
            splits.add(makeSplit(path, 0, length, new String[0]));
          }
        }
        LOG.debug("Total # of splits: " + splits.size());
        return splits.toArray(new FileSplit[splits.size()]);
      }
      
      /** 
       * This function identifies and returns the hosts that contribute 
       * most for a given split. For calculating the contribution, rack
       * locality is treated on par with host locality, so hosts from racks
       * that contribute the most are preferred over hosts on racks that 
       * contribute less
       * @param blkLocations The list of block locations
       * @param offset 
       * @param splitSize 
       * @return array of hosts that contribute most to this split
       * @throws IOException
       */
      protected String[] getSplitHosts(BlockLocation[] blkLocations, 
          long offset, long splitSize, NetworkTopology clusterMap)
      throws IOException {
    
        int startIndex = getBlockIndex(blkLocations, offset);
    
        long bytesInThisBlock = blkLocations[startIndex].getOffset() + 
                              blkLocations[startIndex].getLength() - offset;
    
        //If this is the only block, just return
        if (bytesInThisBlock >= splitSize) {
          return blkLocations[startIndex].getHosts();
        }
    
        long bytesInFirstBlock = bytesInThisBlock;
        int index = startIndex + 1;
        splitSize -= bytesInThisBlock;
    
        while (splitSize > 0) {
          bytesInThisBlock =
            Math.min(splitSize, blkLocations[index++].getLength());
          splitSize -= bytesInThisBlock;
        }
    
        long bytesInLastBlock = bytesInThisBlock;
        int endIndex = index - 1;
        
        Map <Node,NodeInfo> hostsMap = new IdentityHashMap<Node,NodeInfo>();
        Map <Node,NodeInfo> racksMap = new IdentityHashMap<Node,NodeInfo>();
        String [] allTopos = new String[0];
    
        // Build the hierarchy and aggregate the contribution of 
        // bytes at each level. See TestGetSplitHosts.java 
    
        for (index = startIndex; index <= endIndex; index++) {
    
          // Establish the bytes in this block
          if (index == startIndex) {
            bytesInThisBlock = bytesInFirstBlock;
          }
          else if (index == endIndex) {
            bytesInThisBlock = bytesInLastBlock;
          }
          else {
            bytesInThisBlock = blkLocations[index].getLength();
          }
          
          allTopos = blkLocations[index].getTopologyPaths();
    
          // If no topology information is available, just
          // prefix a fakeRack
          if (allTopos.length == 0) {
            allTopos = fakeRacks(blkLocations, index);
          }
    
          // NOTE: This code currently works only for one level of
          // hierarchy (rack/host). However, it is relatively easy
          // to extend this to support aggregation at different
          // levels 
          
          for (String topo: allTopos) {
    
            Node node, parentNode;
            NodeInfo nodeInfo, parentNodeInfo;
    
            node = clusterMap.getNode(topo);
    
            if (node == null) {
              node = new NodeBase(topo);
              clusterMap.add(node);
            }
            
            nodeInfo = hostsMap.get(node);
            
            if (nodeInfo == null) {
              nodeInfo = new NodeInfo(node);
              hostsMap.put(node,nodeInfo);
              parentNode = node.getParent();
              parentNodeInfo = racksMap.get(parentNode);
              if (parentNodeInfo == null) {
                parentNodeInfo = new NodeInfo(parentNode);
                racksMap.put(parentNode,parentNodeInfo);
              }
              parentNodeInfo.addLeaf(nodeInfo);
            }
            else {
              nodeInfo = hostsMap.get(node);
              parentNode = node.getParent();
              parentNodeInfo = racksMap.get(parentNode);
            }
    
            nodeInfo.addValue(index, bytesInThisBlock);
            parentNodeInfo.addValue(index, bytesInThisBlock);
    
          } // for all topos
        
        } // for all indices
    
        return identifyHosts(allTopos.length, racksMap);
      }

    The overall process is roughly as follows:

    getSplits first obtains the list of files to read and then iterates over it. For each file it checks isSplitable (true by default, but subclasses may override it); a file that cannot be split becomes a single split as a whole. Otherwise the file's block locations are fetched and a split size is computed from several parameters: splitSize = max(minSize, min(goalSize, blockSize)), where goalSize = totalSize / numSplits and minSize comes from the SPLIT_MINSIZE configuration referenced above. The file is then carved into chunks of splitSize bytes (the last chunk may be up to SPLIT_SLOP = 1.1 times splitSize), and for each chunk getSplitHosts picks the hosts that contribute the most data to it, taking host and rack locality into account. Splits never cross file boundaries, so many small files always yield at least that many splits; within one file a split normally corresponds to about one block, but raising minSize above the block size lets a single split span several blocks, which reduces the split count. The number of splits produced here is the number of partitions of the RDD.
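
    As a concrete illustration, here is that arithmetic replayed with hypothetical numbers: a single 1 GB file, 128 MB HDFS blocks, minPartitions = 2 and the minimum split size left at its default of 1:

    // Hypothetical numbers; mirrors computeSplitSize and the SPLIT_SLOP loop in getSplits above.
    val totalSize = 1024L * 1024 * 1024            // one 1 GB file
    val numSplits = 2                              // minPartitions handed to getSplits
    val goalSize  = totalSize / numSplits          // 512 MB
    val minSize   = 1L                             // SPLIT_MINSIZE default
    val blockSize = 128L * 1024 * 1024             // 128 MB

    // computeSplitSize(goalSize, minSize, blockSize) = max(minSize, min(goalSize, blockSize))
    val splitSize = math.max(minSize, math.min(goalSize, blockSize))   // 128 MB

    val SPLIT_SLOP = 1.1
    var bytesRemaining = totalSize
    var splits = 0
    while (bytesRemaining / splitSize.toDouble > SPLIT_SLOP) {
      splits += 1
      bytesRemaining -= splitSize
    }
    if (bytesRemaining != 0) splits += 1
    println(splits)   // 8: the RDD gets 8 partitions even though only 2 were requested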


    Note: the process above may be overridden by subclasses.
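
    For example, a subclass can turn splitting off entirely by overriding isSplitable. The class below is a hypothetical sketch (non-splittable compression codecs such as gzip lead to the same one-file-one-split behaviour):

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat

    class WholeFileTextInputFormat extends TextInputFormat {
      // Never split: each input file becomes exactly one split, hence one partition.
      override protected def isSplitable(fs: FileSystem, file: Path): Boolean = false
    }

    // Usage with the sc from the earlier sketches and a hypothetical path:
    val rdd = sc.hadoopFile("hdfs:///tmp/demo/input",
      classOf[WholeFileTextInputFormat], classOf[LongWritable], classOf[Text])
    println(rdd.getNumPartitions)   // equals the number of input files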

  • Original post: https://www.cnblogs.com/barneywill/p/10192800.html