zoukankan      html  css  js  c++  java
  • 详解 Spark 中的 Bucketing

    什么是 Bucketing

    Bucketing 就是利用 buckets(按列进行分桶)来决定数据分区(partition)的一种优化技术,它可以帮助在计算中避免数据交换(avoid data shuffle)。并行计算的时候shuffle常常会耗费非常多的时间和资源.

    Bucketing 的基本原理比较好理解,它会根据你指定的列(可以是一个也可以是多个)计算哈希值,然后具有相同哈希值的数据将会被分到相同的分区。

    bucket

    Bucket和Partition的区别

    Bucket的最终目的也是实现分区,但是和Partition的原理不同,当我们根据指定列进行Partition的时候,Spark会根据列的名字对数据进行分区(如果没有指定列名则会根据一个随机信息对数据进行分区)。Bucketing的最大不同在于它使用了指定列的哈希值,这样可以保证具有相同列值的数据被分到相同的分区。

    怎么用 Bucket

    按Bucket保存

    目前在使用 bucketBy 的时候,必须和 sortBy,saveAsTable 一起使用,如下。这个操作其实是将数据保存到了文件中(如果不指定path,也会保存到一个临时目录中)。

    df.write
      .bucketBy(10, "name")
      .sortBy("name")
      .mode(SaveMode.Overwrite)
      .option("path","/path/to")
      .saveAsTable("bucketed")
    

    数据分桶保存之后,我们才能使用它。

    直接从table读取

    在一个SparkSession内,保存之后你可以通过如下命令通过表名获取其对应的DataFrame.

    val df = spark.table("bucketed")
    

    其中spark是一个SparkSession对象。获取之后就可以使用DataFrame或者在SQL中使用表。

    从已经保存的Parquet文件读取

    如果你要使用历史保存的数据,那么就不能用上述方法了,也不能像读取常规文件一样使用 spark.read.parquet() ,这种方式读进来的数据是不带bucket信息的。正确的方法是利用CREATE TABLE 语句,详情可用参考 https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-table.html

    CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
      [(col_name1 col_type1 [COMMENT col_comment1], ...)]
      USING data_source
      [OPTIONS (key1=val1, key2=val2, ...)]
      [PARTITIONED BY (col_name1, col_name2, ...)]
      [CLUSTERED BY (col_name3, col_name4, ...) INTO num_buckets BUCKETS]
      [LOCATION path]
      [COMMENT table_comment]
      [TBLPROPERTIES (key1=val1, key2=val2, ...)]
      [AS select_statement]
    

    示例如下:

    spark.sql(
      """
        |CREATE TABLE bucketed
        | (name string)
        |  USING PARQUET
        |  CLUSTERED BY (name) INTO 10 BUCKETS
        |  LOCATION '/path/to'
        |""".stripMargin)
    

    用Buckets的好处

    在我们join两个表的时候,如果两个表最好按照相同的列划分成相同的buckets,就可以完全避免shuffle。根据前面所述的hash值计算方法,两个表具有相同列值的数据会存放在相同的机器上,这样在进行join操作时就不需要再去和其他机器通讯,直接在本地完成计算即可。假设你有左右两个表,各有两个分区,那么join的时候实际计算就是下图的样子,两个机器进行计算,并且计算后分区还是2.

    with bucket

    而当需要shuffle的时候,会是这样的,
    without bucket

    细心的你可能发现了,上面两个分区对应两个Executor,下面shuffle之后对应的怎么成了三个Executor了?没错,当数据进行shuffle之后,分区数就不再保持和输入的数据相同了,实际上也没有必要保持相同。

    本地测试

    我们考虑的是大数据表的连接,本地测试的时候一般使用小的表,所以逆序需要将小表自动广播的配置关掉。如果开启小表广播,那么两个小表的join之后分区数是不会变的,例如:

    左表分区数 右表分区数数 Join之后的分区数
    3 3 3

    关闭配置的命令如下:

    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    

    正常情况下join之后分区数会发生变化:

    左表分区数 右表分区数数 Join之后的分区数
    3 3 200

    这个200其实就是 "spark.sql.shuffle.partitions" 配置的值,默认就是200. 所以如果在Join过程中出现了shuffle,join之后的分区一定会变,并且变成spark.sql.shuffle.partitions的值。通常你需要根据自己的集群资源修改这个值,从而优化并行度,但是shuffle是不可避免的。

    左右两个表Bucket数目不一致时

    实际测试结果如下:

    左表Bucket数 右表Bucekt数 Join之后的分区数
    8 4 8
    4 4 4

    Spark依然会利用一些Bucekt的信息,但具体怎么执行目前还不太清楚,还是保持一致的好。

    另外,如果你spark job的可用计算核心数小于Bucket值,那么从文件中读取之后Bucekt值会变,就是说bucket的数目不会超过你能使用的最大计算核数。

    不要使用的 <=> 符号!!!

    在处理null值的时候,我们可能会用到一些特殊的函数或者符号,如下表所示。但是在使用bucket的时候这里有个坑,一定要躲过。join的时候千万不要使用 <=> 符号,使用之后spark就会忽略bucket信息,继续shuffle数据,原因可能和hash计算有关。

    null

    原文连接

    如果你喜欢我的文章,可以在任一平台搜索【黑客悟理】关注我,非常感谢!

  • 相关阅读:
    04 类与对象
    03 方法
    02-JAVA语法基础
    大道至简第二章 读后感
    leetcode 65 有效数字
    leetcode 670 最大交换
    leetcode 49 字母异位词分组
    leetcode 71 简化路径
    leetcode 2 两数相加
    java compareTo 与 equals 区别
  • 原文地址:https://www.cnblogs.com/hackerphysics/p/12897616.html
Copyright © 2011-2022 走看看