zoukankan      html  css  js  c++  java
  • TextFile分区问题

    val rdd = sc.parallelize(List(1,2,3,4,5,6),第二参数)
    这里的第二参数 获取方式有两种:
    1.直接给定值,根据传入的值决定分区的数量
    2.根据运行环境获取分区数量(core) -->例如 本地运行 设置为local 此时设置分区值默认分区就是1个

    val rdd = sc.textFile(path: String, minPartitions: Int = defaultMinPartitions)
    读取文件中内容算子中有两个参数 第一个参数是获取数据路径
    这个理第二个参数,第二参数决定了分区的数量有两种情况
    1.在不传递值的情况,使用是默认defaultMinPartitions --> 这个值时多少?
    2.在传递分区数量是时候,这个分区值是多少

    第一条主线 --> defaultMinParititons值时多少?
    1.先从textFile这个算子入手,进入后台源码
    def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
    }
    2.在不传入分区数值的情况下,默认textFile中使用了一个值defaultMinPartitions,这个值就决定了分区数量,查看这个值
    def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
    发现defaultMinPartitions,并不是一个值而是一个方法,在这个方法中实现是一个math比较最小值
    这个比较中有一共值时固定是 2 这个值,和2比较时有一个全新的参数defaultParallelism,需要查看这个参数

    3.继续拆安defaultParallelism这个值的时候发现他也是一个方法
    def defaultParallelism: Int = {
    assertNotStopped()
    taskScheduler.defaultParallelism
    }
    方法最后一句是整个方法的返回一直,也就是说这个方法获取值,是最后一句产生,这个产生值还触发了一个TaskScheduler(任务调度),此时defaultParallelism
    当查看这个方法的时候发现这个方法并没有实现体,这个方法是存在在特质中 def defaultParallelism(): Int
    ctrl+alt+ 左右 回到之前调用或下一次调用(必须知道实现者是谁)
    在触发抽方法的位置 --> ctrl+atl+鼠标左键-->就可以查看实现这个方法或触发这个方法的类
    发现这个抽象方法实现类 --> TaskSchedulerImpl在这个类中有方法的实现

    4.TaskSchedulerImpl在这个类中有方法的实现
    override def defaultParallelism(): Int = backend.defaultParallelism()
    发现原来抽象方法已经被重写了,并且有一个实现,此时只需要触发defaultParallelism就可以触发出这个值多少了
    但是,点击查看后发现也是一个抽象方法defaultParallelism() --> 对这个实现在此查询实现者即可

    5.查看defaultParallelism()
    此时发现实现方式有两种
    1.CoarseGrainedSchedulerBackend spark集群模式
    2.LocalSchedulerBackend 本地模式
    我们查看是集群模式,结果发现了
    override def defaultParallelism(): Int = {
    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))

    }
    这个方法中比较的是最大值,其中第一个采纳数是totalCoreCount即集群核心数 第二个参数固是2

    结论: 在调用textFile算子的时候,初始默认分区数量是2,除非小于2,否则默认分区数量就是2个

    第二条主线 --> 查看分区计算流程
    问题:先阶段已经知道分区数量默认是2个分区,具体分区中计算方式时候什么样式?(分片逻辑)

    1.还是在textFile这个算子实现中
    已经知道分区数量之后,查看内部对分区数量的使用,需要查看方法的实现
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
    分区参数传入到一个叫做 hadoopFile中,所以此时就需要查看hadoopFile是谁

    2.查看hadoopFile
    def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
    assertNotStopped()

    // This is a hack to enforce loading hdfs-site.xml.
    // See SPARK-11227 for details.
    FileSystem.getLocal(hadoopConfiguration)

    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
    val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)


    //核心出现在这个位置,这里创建了一个HadoopRDD对象
    new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
    }

    3.查看HadoopRDD中存在哪些操作?
    在这里类中
    override def getPartitions: Array[Partition] = {
    val jobConf = getJobConf()
    // add the credentials here as this can be called before SparkContext initialized
    SparkHadoopUtil.get.addCredentials(jobConf)
    val inputFormat = getInputFormat(jobConf)
    val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
    val array = new Array[Partition](inputSplits.size)
    for (i <- 0 until inputSplits.size) {
    array(i) = new HadoopPartition(id, i, inputSplits(i))
    }
    array
    }
    getPartitions是切片方法的触发
    val inputSplits = inputFormat.getSplits(jobConf, minPartitions) 这个方法是具体的切分
    val array = new Array[Partition](inputSplits.size) 就是获取分片个数
    需要查看getSplits
    4.查看 getSplits方法
    这个方法是接口中抽象方法,此时需要使用 ctrl+atl+鼠标左边 查看这个方法的实现
    一般处理数据方式都是 FileInputFormat类中查看 getSplits方法
    这个方法和MR中切片放啊其实逻辑是一样的,核心位置
    long totalSize = 0; // compute total size
    for (FileStatus file: files) { // check we have valid files
    if (file.isDirectory()) {
    throw new IOException("Not a file: "+ file.getPath());
    }
    totalSize += file.getLen();
    }
    // totalSize 获取文件的大小

    Spark中和MR中切片最大的不同位置出现了,Spark会计算切片大小
    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    启动numSplits就是之前minPartitions即 默认分区值

    最终切片的位置依旧保留着MR中思想即 1.1冗余
    long splitSize = computeSplitSize(goalSize, minSize, blockSize); //这里会计算真正切片的大小

    long bytesRemaining = length; //文件大小
    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) //切片逻辑
    //切完一片之后 会减去切片大小
    bytesRemaining -= splitSize;

    总结:分区数量其实是可以影响最最终文件的个数,但是在最终输出界过之前,会执行分片处理,这个分片才是最终输出分区的个数,我们若需要影响最终输出值,此时可以在最终输出算子之前调用 repartition 来修改分区

     

  • 相关阅读:
    ----localStorage的同步与异步----
    ----vue2.0实现别人通过ip访问自己运行的项目----
    ----vue之搜索框与防抖函数的封装----
    ----vue项目打包之浏览器存在缓存问题----
    ----vue组件name的作用小结----
    ----HTML5本地储存--利用storage事件实时监听Web Storage----
    ---- vue之filter ----
    ----vue项目配置环境----
    ----git-ssh 配置和使用----
    python『学习之路03』装饰器
  • 原文地址:https://www.cnblogs.com/yumengfei/p/12030648.html
Copyright © 2011-2022 走看看