zoukankan      html  css  js  c++  java
  • spark数据分区数量的原理

    原始RDD或数据集中的每一个分区都映射一个或多个数据文件, 该映射是在文件的一部分或者整个文件上完成的。

    Spark Job RDD/datasets在执行管道中,通过根据分区到数据文件的映射读取数据输入到RDD/dataset。

    如何根据某些参数确定spark的分区数?


    使用Dataset APIs读取数据的分区数:

    functions:

    https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/DataFrameReader.html

    *文件格式 APIs*
    Dataset<Row> = SparkSession.read.csv(...)
    Dataset<Row> = SparkSession.read.json(...)
    Dataset<Row> = SparkSession.read.text(...)
    Dataset<Row> = SparkSession.read.parquet(...)
    Dataset<Row> = SparkSession.read.orc(...)
    
    *通用格式 API*
    Dataset<Row> = SparkSession.read.format(String fileformat).load(...)

    影响数据分区数的参数:

    (a)spark.default.parallelism (default: Total No. of CPU cores)
    (b)spark.sql.files.maxPartitionBytes (default: 128 MB) 【读取文件时打包到单个分区中的最大字节数。】
    (c)spark.sql.files.openCostInBytes (default: 4 MB)  【 该参数默认4M,表示小于4M的小文件会合并到一个分区中,用于减小小文件,防止太多单个小文件占一个分区情况。这个参数就是合并小文件的阈值,小于这个阈值的文件将会合并。】

    使用这些配置参数值,一个名为maxSplitBytes的最大分割准则被计算如下:

    maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore)

    bytesPerCore = (文件总大小 + 文件个数 * openCostInBytes)/ default.parallelism

    maxSplitBytes: 

    for each_file in files:
        if each_file is can split:
            if each_file.size() > maxSplitBytes:
                # file 被切分为 block_number 块其中block_number-1大小为 maxSplitBytes,1块<=maxSplitBytes
                block_number = ceil(each_file.size() / maxSplitBytes)
            else:
                block_number = 1
        else:
            #文件不可分
            block_number = 1

    数据文件计算文件块之后,将一个或多个文件块打包到一个分区中。

    打包过程从初始化一个空分区开始,然后对每个文件块进行迭代:

    1. 如果没有当前分区要打包,请初始化要打包的新分区,然后将迭代的文件块分配给该分区。 分区大小成为块大小与“ openCostInBytes”的额外开销的总和。

    2.如果块大小的增加不超过当前分区(正在打包)的大小超过' maxSplitBytes ',那么文件块将成为当前分区的一部分。分区大小是由块大小和“openCostInBytes”额外开销的总和增加的。

    3.如果块大小的增加超过了当前分区被打包的大小超过了' maxSplitBytes ',那么当前分区被声明为完整并启动一个新分区。迭代的文件块成为正在初始化的新分区的一部分,而新分区大小成为块大小和‘openCostInBytes’额外开销的总和。

    打包过程结束后,将获得用于读取相应数据文件的数据集的分区数。

    尽管获得分区数量的过程似乎有点复杂,但基本的思想是,如果文件是可分拆的,那么首先在maxSplitBytes边界处拆分单个文件。

    在此之后,将文件的分割块或不可分割的文件打包到一个分区中,这样,在将块打包到一个分区中期间,

    如果分区大小超过maxSplitBytes,则认为该分区已经打包完成,然后采用一个新分区进行打包。因此,最终从包装过程中得到一定数量的分区。

    e.g:

    core设置为10

    (a) 54 parquet files, 65 MB each, 默认参数 。  

    bytesPerCore = (54*65 + 54 * 4)/ 10 = 372M
    
    maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(128M,372M)=128
    
    65 < 128 && 2*65 > 128 ==> 54分区

    (b)54 parquet files, 63 MB each, 默认参数。

    bytesPerCore = (54*63 + 54 * 4)/ 10 = 361M
    
    maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(128M,361M)=128
    
    63 < 128 &&   4 + 2*63=126+4=130 > 128=maxPartitionBytes  ==> 54 (看起来 1分区可以容纳2个块,但是存在一个openCostInBytes开销4M,2个63+4大于了 128M,故一个分区只能一个块)

    (c)54 parquet files, 40 MB each, 默认参数。

    bytesPerCore = (54*40 + 54 * 4)/ 10 = 237M
    
    maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(128M,237M)=128
    
    40 < 128 && (4+3*  40) = 124 < 128 (故一个分区可以装3个块) = 54/3 = 18分区

    (d)54 parquet files, 40 MB each, maxPartitionBytes=88M 其余默认

    bytesPerCore = (54*40 + 54 * 4)/ 10 = 237M
    
    maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(88M,237M)=88
    
    40 < 88 && (4+2*40) = 84 < 88 (一个分区2个) = 27个分区

    (e) 54 parquet files, 40 MB each ; spark.default.parallelism set to 400

    bytesPerCore = (54*40 + 54 * 4)/ 400 = 5.94M
    
    maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(128M,5.94M)=5.94
    
    每个文件块数:ceil(40 / 5.94) = 7个   5.94 + 4M  > 5.94  一个分区一个块
    
    所以总分区数为:  54 * 7 = 378 个分区 

     使用RDD APIs读取数据文件的分区数

    RDD APIs类似下面的API

    *SparkContext.newAPIHadoopFile(String path, Class<F> fClass, Class<K> kClass, Class<V> vClass, org.apache.hadoop.conf.Configuration conf)
    *SparkContext.textFile(String path, int minPartitions)
    *SparkContext.sequenceFile(String path, Class<K> keyClass, Class<V> valueClass)
    *SparkContext.sequenceFile(String path, Class<K> keyClass, Class<V> valueClass, int minPartitions)
    *SparkContext.objectFile(String path, int minPartitions, scala.reflect.ClassTag<T> evidence$4)

    在这些API中,会询问参数' minPartitions ',而在另一些API中则没有。如果没有查询,则默认值为2或1,1(默认情况下为1)。并行性是1。这个“minPartitions”是决定这些api返回的RDD中分区数量的因素之一。其他因素为Hadoop配置参数的值:

    # 关于 mapred.min.split.size see  https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/DeprecatedProperties.html

    # 关于 dfs.blocksize  see https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml 默认: 128M

    minSize (mapred.min.split.size - default value 1 MB) or  minSize (mapreduce.input.fileinputformat.split.minsize - default value 1 MB) 

    blockSize (dfs.blocksize - default 128 MB)

    goalSize = Sum of all files lengths to be read / minPartitions

    splitSize = Math.max(minSize, Math.min(goalSize, blockSize));
     

    现在使用“splitSize”,

    for each_file in files:
        if each_file:
            if each_file.size() > splitSize:
                # file 被切分为 block_number 块其中block_number-1大小为 splitSize,最后一个块<=splitSize
                block_number = ceil(each_file.size() / maxSplitBytes)
            else:
                # 大小等于文件长度的文件块
                block_number = 1
                block_size  = 文件长度的文件块
        else:
            #文件不可分
            block_number = 1
            block_size = 文件长度的文件块

    每个文件块(大小大于0)都映射到单个分区。因此,由数据文件上的RDD api返回的RDD中的分区数,等于使用“splitSize”对数据文件进行切片而得到的非零文件块的数

    e.g:

    (a). 31 parquet files, 330 MB each, blocksize at default 128 MB, minPartitions not specified, core is 10

    splitSize = Math.max(minSize, Math.min(Sum of all files lengths to be read / minPartitions, blockSize))  = max( 0 , 128M) = 128M 
    
    一个文件按照splitSize=128M可以分3个,故一共分区数 31*3=93

    (b). 54 parquet files, 40 MB each, blocksize at default 128 MB,  core is 10

    splitSize = Math.max(minSize, Math.min(Sum of all files lengths to be read / minPartitions, blockSize))  = max( 0 , min(2264924160/1,128M)) = 128M 
    
    splitSize=128M ,40 <128 1个文件长度的文件块 故为54个分区

    (c) 31 parquet files, 330 MB each, blocksize at default 128 MB, minPartitions specified as 1000

    splitSize = Math.max(minSize, Math.min(Sum of all files lengths to be read / minPartitions, blockSize))  = max( 0 , min(31 * 330 * 1024 * 1024/1000 ,128 * 1024 * 1024)) = 10726932 = 10.23M
    
     一个文件分为 ceil(330/10.23) = 33块  共计:31 * 33 = 1023 共计分区: 1023个

    (d)  31 parquet files, 330 MB each, blocksize at default 128 MB, minPartitions not specified, ‘mapred.min.split.size’ set at 256 MB, No. of core equal to 10

    splitSize = Math.max(minSize, Math.min(Sum of all files lengths to be read / minPartitions, blockSize))  = max( 256 , min(31 * 330 * 1024 * 1024/1 ,128 * 1024 * 1024)) =256M
    
    330/256.0 = 2 , 31 * 2 = 62个分区

    总结:

         分区的最佳数量是高效可靠的Spark应用程序的关键。

  • 相关阅读:
    iris数据集
    codevs 1262 不要把球传我 2012年CCC加拿大高中生信息学奥赛
    codevs 1742 爬楼梯(水题日常)
    codevs 2277 爱吃皮蛋的小明(水题日常)
    洛谷 P3386 【模板】二分图匹配
    vijos 1190 繁忙的都市
    codevs 1487 大批整数排序(水题日常)
    洛谷 P2820 局域网
    codevs 1683 车厢重组(水题日常)
    codevs 1228 苹果树
  • 原文地址:https://www.cnblogs.com/similarface/p/13156874.html
Copyright © 2011-2022 走看看