zoukankan      html  css  js  c++  java
  • Spark RDD的默认分区数:(spark 2.1.0)

    本文基于Spark 2.1.0版本

    新手首先要明白几个配置:

    spark.default.parallelism:(默认的并发数)

        如果配置文件spark-default.conf中没有显示的配置,则按照如下规则取值:

        本地模式(不会启动executor,由SparkSubmit进程生成指定数量的线程数来并发):

        spark-shell                              spark.default.parallelism = 1

        spark-shell --master local[N] spark.default.parallelism = N (使用N个核)

        spark-shell --master local      spark.default.parallelism = 1

        伪集群模式(x为本机上启动的executor数,y为每个executor使用的core数,

    z为每个 executor使用的内存)

         spark-shell --master local-cluster[x,y,z] spark.default.parallelism = x * y

         mesos 细粒度模式

         Mesos fine grained mode  spark.default.parallelism = 8

        其他模式(这里主要指yarn模式,当然standalone也是如此)

        Others: total number of cores on all executor nodes or 2, whichever is larger

        spark.default.parallelism =  max(所有executor使用的core总数, 2)

    经过上面的规则,就能确定了spark.default.parallelism的默认值(前提是配置文件spark-default.conf中没有显示的配置,如果配置了,则spark.default.parallelism = 配置的值)

    还有一个配置比较重要,spark.files.maxPartitionBytes = 128 M(默认)

    The maximum number of bytes to pack into a single partition when reading files.

    代表着rdd的一个分区能存放数据的最大字节数,如果一个400m的文件,只分了两个区,则在action时会发生错误。

    当一个spark应用程序执行时,生成spark.context,同时会生成两个参数,由上面得到的spark.default.parallelism推导出这两个参数的值

    sc.defaultParallelism     = spark.default.parallelism

    sc.defaultMinPartitions = min(spark.default.parallelism,2)

    当sc.defaultParallelism和sc.defaultMinPartitions最终确认后,就可以推算rdd的分区数了。

    有两种产生rdd的方式:

    1,通过scala 集合方式parallelize生成rdd,

    如, val rdd = sc.parallelize(1 to 10)

    这种方式下,如果在parallelize操作时没有指定分区数,则

    rdd的分区数 = sc.defaultParallelism

    2,通过textFile方式生成的rdd,

    如, val rdd = sc.textFile(“path/file”)

    有两种情况:

    a,从本地文件file:///生成的rdd,操作时如果没有指定分区数,则默认分区数规则为:

    (按照官网的描述,本地file的分片规则,应该按照hdfs的block大小划分,但实测的结果是固定按照32M来分片,可能是bug,不过不影响使用,因为spark能用所有hadoop接口支持的存储系统,所以spark textFile使用hadoop接口访问本地文件时和访问hdfs还是有区别的)

    rdd的分区数 = max(本地file的分片数, sc.defaultMinPartitions)

    b,从hdfs分布式文件系统hdfs://生成的rdd,操作时如果没有指定分区数,则默认分区数规则为:

    rdd的分区数 = max(hdfs文件的block数目, sc.defaultMinPartitions)

    补充:

    1,如果使用如下方式,从HBase的数据表转换为RDD,则该RDD的分区数为该Table的region数。

    String tableName ="pic_test2";

    conf.set(TableInputFormat.INPUT_TABLE,tableName);

    conf.set(TableInputFormat.SCAN,convertScanToString(scan));

    JavaPairRDD hBaseRDD = sc.newAPIHadoopRDD(conf,

    TableInputFormat.class,ImmutableBytesWritable.class,

    Result.class);

    Hbase Table:pic_test2的region为10,则hBaseRDD的分区数也为10。

    2,如果使用如下方式,通过获取json(或者parquet等等)文件转换为DataFrame,则该DataFrame的分区数和该文件在文件系统中存放的Block数量对应。

    Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

    people.json大小为300M,在HDFS中占用了2个blocks,则该DataFrame df分区数为2。

    3,Spark Streaming获取Kafka消息对应的分区数,不在本文讨论。



    作者:俺是亮哥
    链接:https://www.jianshu.com/p/4b7d07e754fa
    來源:简书
    简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

  • 相关阅读:
    spring-session+Redis实现Session共享
    SQLServer语法常用总结
    [PDFBox]后台操作pdf的工具类
    类加载器
    SQLServer常用分页方式
    Tesseract识别图片提取文字&字库训练
    AbstractQueuedSynchronizer的简单介绍
    CountDownLatch 闭锁、FutureTask、Semaphore信号量、Barrier栅栏
    Java线程实现的第三种方式Callable方式与结合Future获取返回值
    原子类型的使用&Unsafe&CAS
  • 原文地址:https://www.cnblogs.com/qfdy123/p/13536428.html
Copyright © 2011-2022 走看看