zoukankan      html  css  js  c++  java
  • Spark 创建RDD、DataFrame各种情况的默认分区数

    1、前置知识:

    (1)sc.defaultMinPartitions

      sc.defaultMinPartitions=min(sc.defaultParallelism,2)

      也就是sc.defaultMinPartitions只有两个值1和2,当sc.defaultParallelism>1时值为2,当sc.defaultParallelism=1时,值为1

      上面的公式是在源码里定义的(均在类SparkContext里):

    def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
    def defaultParallelism: Int = {
        assertNotStopped()
        taskScheduler.defaultParallelism
      }

    (2)sc.defaultParallelism

    a、首先可通过spark.default.parallelism设置sc.defaultParallelism的值

    •  在文件中配置

    在文件spark-defaults.conf添加一行

    spark.default.parallelism=20

    验证:

    在spark-shell里输入sc.defaultParallelism,输出结果为20。

    • 在代码中配置
    val spark = SparkSession.builder()
      .appName("TestPartitionNums")
      .master("local")
      .config("spark.default.parallelism", 20)
      .getOrCreate()
    
    val sc = spark.sparkContext
    println(sc.defaultParallelism)
    
    spark.stop

    b、sc.defaultParallelism 没有配置时候,会有默认值

    • spark-shell:spark-shell里的值等于cpu的核数,比如我的windows的cpu的核数为再比如测试机的核数为8。
    • 指定master为local:在spark-shell里通过–master local和在代码里通过.master(“local”)的结果是一样的,这里以spark-shell为例当master为local时,值为1,当master为local[n]时,值为n
    • master为local[*]和不指定master一样,都为cpu核数
    • master为yarn模式时为分配的所有的Executor的cpu核数的总和或者2,两者取最大值,将2.1.2的代码的master注释掉并打包,然后用下面的脚本执行测试。

    test.sh

    spark-submit --num-executors $1 --executor-cores 1 --executor-memory 640M --master yarn   --class  com.dkl.leanring.spark.TestPartitionNums   spark-scala_2.11-1.0.jar

    2、HDFS文件创建RDD时的默认分区数:

      这里及后面讨论的是rdd和dataframe的分区,也就是读取hdfs文件并不会改变前面讲的sc.defaultParallelism和sc.defaultMinPartitions的值。

    (1)sc.textFile():  rdd的分区数 = max(hdfs文件的block数目, sc.defaultMinPartitions)

    • 测试大文件(block的数量大于2):上传了一个1.52G的txt到hdfs上用来测试,其中每个block的大小为默认的128M,那么该文件有13个分区* 1.52*1024/128=12.16。

    用下面代码可以测试读取hdfs文件的分区数

    val rdd = sc.textFile("hdfs://ambari.master.com/data/egaosu/txt/20180416.txt")
    rdd.rdd.getNumPartitions

      这种方式无论是sc.defaultParallelism大于block数还是sc.defaultParallelism小于block数,rdd的默认分区数都为block数。

    * 注:之所以说是默认分区,因为textFile可以指定分区数,sc.textFile(path, minPartitions),通过第二个参数可以指定分区数

    sc.defaultMinPartitions大于block数

    sc.defaultMinPartitions小于block数

    当用参数指定分区数时,有两种情况,当参数大于block数时,则rdd的分区数为指定的参数值,否则分区数为block数。

    • 测试小文件(block数量等于1):默认分区数为sc.defaultMinPartitions,下面是对应的hdfs文件:

    将上面的hdfs路径改为:hdfs://ambari.master.com/tmp/dkl/data.txt,结果:

    当用参数指定分区数时,rdd的分区数大于等于参数值,本次测试为等于参数值或参数值+1。

    (2)读取hive表创建的DataFrame的分区数:

      hdfs文件的block的数目为10(2*5)。

    //切换数据库
    spark.sql("use route_analysis")
    //读取该数据库下的egaosu表为df
    val df = spark.table("egaosu")
    //打印df对应的rdd的分区数
    df.rdd.getNumPartitions

      测试发现,当sc.defaultParallelism大于block时,df的分区是等于sc.defaultParallelism,当小于block时,df的分区数介于sc.defaultParallelism和block之间,至于详细的分配策略,我还没有查到~

      用spark.sql(“select * from egaosu”)这种方式得到df和上面的分区数是一样的

    3、从代码里的内部数据集创建RDD时的默认分区数:

    (1)sc.parallelize()创建RDD:默认分区数等于sc.defaultParallelism,指定参数时分区数值等于参数值。

    (2)spark.createDataFrame(data)创建DataFrame:当data的长度小于sc.defaultParallelism,分区数等于data长度,否则分区数等于sc.defaultParallelism。

    参考博客:https://blog.csdn.net/dkl12/article/details/81663018

  • 相关阅读:
    spring事物配置,声明式事务管理和基于@Transactional注解的使用
    spring集成ehcache本地缓存
    Java并发编程:volatile关键字解析
    Callable接口、Runable接口、Future接口
    Sorting It All Out
    Borg Maze
    Agri-Net
    Highways
    Truck History
    Arbitrage
  • 原文地址:https://www.cnblogs.com/guoyu1/p/12300404.html
Copyright © 2011-2022 走看看