zoukankan      html  css  js  c++  java
  • spark 解决大文件造成的分区数据量过大的问题

    背景

    在使用spark处理文件时,经常会遇到要处理的文件大小差别的很大的情况。如果不加以处理的话,特别大的文件就可能产出特别大的spark 分区,造成分区数据倾斜,严重影响处理效率。

    解决方案

    Spark RDD

    spark在读取文件构建RDD的时候(调用spark.SparkContext.TextFile(FILENAME, [minPartition]), spark.SparkContext.SequenceFile(FILENAME) ,因为这两个都实现了FileInputFormat),每个RDD分区的大小是由下面的几个参数控制的。

    spark.hadoop.mapreduce.input.fileinputformat.split.minsize #(单位字节,默认值:0)
    dfs.blocksize #(单位字节, 默认值: 128M,  在hdfs-site.xml中配置,这个会影响到hadoop,非常不建议修改)
    minPartition #(最小分区数,默认值2)
    

    spark在调用这两个方法读取文件为RDD的时候,会经历如下步骤

    1. 计算要读取的所有文件的总大小 TOTAL_SIZE
    2. 计算平均每个文件的大小 AVERAGE_SIZE = TOTAL_SIZE/minPartition
    3. 获取文件所在HDFS上的BLOCK_SIZE (即:dfs.blocksize)
    4. 读取spark.hadoop.mapreduce.input.fileinputformat.split.minsize,获取文件的最小值 MIN_SIZE
    5. 计算要产出RDD的分区大小 PARTITION_SIZE = max(MIN_SIZE, min(AVERAGE_SIZE, BLOCK_SIZE))

    由上可知,如果调用TextFile时不设置minPartition,且不设置split.minsize,那么产出的RDD每个分区最大大小为 BLOCK_SIZE。

    如果希望产出分区的大小小于BLOCK_SIZE,就需要设置minPartition为非常大一个值,使得AVERAGE_SIZE变小,然后通过split.minsize来控制产出的分区大小。

    备注:这两种方法对于大文件可以切分成小文件,但是对于输入的小文件,即使小于split.minsize也不会合并。不过相比大文件,小文件对spark性能没有太大影响。

    参考资料

    1. org.apache.hadoop.mapred.FileInputFormat.java
    2. SparkContext.scala
    Spark SQL

    适用于使用spark.sql读取文件/hive的场景

    spark.sql.files.maxPartitionBytes  #单位字节  默认128M   每个分区最大的文件大小,针对于大文件切分
    spark.sql.files.openCostInBytes   #单位字节  默认值4M   小于该值的文件将会被合并,针对于小文件合并
    
  • 相关阅读:
    STM32|4-20mA输出电路
    Delphi版IP地址与整型互转
    侧方位停车
    98年的‘风暴’,08年的‘危机’,18年的“钱荒‘’
    一些软件设计的原则
    oracle-数据库的各种-锁-详解
    演员李艾佳去世突发病征年仅36岁
    【人生】王石:没变强是因为你太舒服
    耐心看的人早晚会成人上人
    Linux之make的用法讲解
  • 原文地址:https://www.cnblogs.com/sxhlinux/p/13277206.html
Copyright © 2011-2022 走看看