  • Summary of Spark conf / config options

    1. Summary of Structured Streaming state configuration options

    Config Name | Description | Default Value
    spark.sql.streaming.stateStore.rocksdb.compactOnCommit | Whether to perform a range compaction of the RocksDB instance on commit. | False
    spark.sql.streaming.stateStore.rocksdb.blockSizeKB | Approximate size in KB of user data packed per block for a RocksDB BlockBasedTable, which is RocksDB's default SST file format. | 4
    spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB | The size capacity in MB for a cache of blocks. |
    spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs | The waiting time in milliseconds for acquiring the lock in the load operation of a RocksDB instance. | 60000
    spark.sql.streaming.stateStore.rocksdb.resetStatsOnLoad | Whether to reset all ticker and histogram stats for RocksDB on load. | True

    The class used to manage state data in stateful streaming queries. This class must
    be a subclass of StateStoreProvider, and must have a zero-arg constructor.
    Note: For structured streaming, this configuration cannot be changed between query
    restarts from the same checkpoint location.




    When true, Spark will validate the state schema against schema on existing state and
    fail query if it's incompatible.

    spark.sql.streaming.stateStore.minDeltasForSnapshot | Minimum number of state store delta files that need to be generated before they are consolidated into snapshots. | 10

    When true, check whether the data from the state store is valid when running streaming
    queries. Invalid data can occur if the state store format has been changed. Note that this
    feature is currently effective only in the built-in HDFS state store provider.


    The interval in milliseconds between triggering maintenance tasks in StateStore.
    The maintenance task executes background maintenance task in all the loaded store
    providers if they are still the active instances according to the coordinator. If not,
    inactive instances of store providers will be closed.


    The codec used to compress delta and snapshot files generated by StateStore.
    By default, Spark provides four codecs: lz4, lzf, snappy, and zstd. You can also
    use fully qualified class names to specify the codec. Default codec is lz4.


    Set the RocksDB format version. This will be stored in the checkpoint when starting
    a streaming query. The checkpoint will use this RocksDB format version in the entire
    lifetime of the query.

    spark.sql.streaming.minBatchesToRetain | The minimum number of batches that must be retained and made recoverable. | 100
    • The value of spark.sql.streaming.minBatchesToRetain strongly affects how much space the state occupies.
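As a rough illustration of how the options above might be applied, the sketch below sets a few of them when building a SparkSession. The provider class name `org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider` does not appear in these notes (it is the RocksDB provider shipped with Spark 3.2+), and the object name, the local master and the value 20 for minBatchesToRetain are assumptions chosen only for the example.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical object name; requires Spark 3.2+ on the classpath.
object RocksDbStateStoreConfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("rocksdb-state-store-conf-sketch")
      .master("local[2]") // assumption: local run for experimentation
      // Use the RocksDB-backed provider instead of the default HDFS-backed one.
      .config("spark.sql.streaming.stateStore.providerClass",
        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
      // Values copied from the defaults listed in the table above.
      .config("spark.sql.streaming.stateStore.rocksdb.compactOnCommit", "false")
      .config("spark.sql.streaming.stateStore.rocksdb.blockSizeKB", "4")
      .config("spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs", "60000")
      // Illustrative value: fewer retained batches means less state kept on disk.
      .config("spark.sql.streaming.minBatchesToRetain", "20")
      .getOrCreate()

    // Read a value back to confirm the session picked it up.
    println(spark.conf.get("spark.sql.streaming.minBatchesToRetain"))
    spark.stop()
  }
}
```

Because the provider class cannot be changed between restarts from the same checkpoint location, a setting like this is best decided before the first run of a query.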

    Timeouts and State

    One thing to note is that because we manage the state of the group based on user-defined concepts, as expressed above for the use-cases, the semantics of watermark (expiring or discarding an event) may not always apply here. Instead, we have to specify an appropriate timeout ourselves. Timeout dictates how long we should wait before timing out some intermediate state.

    Timeouts can either be based on processing time (GroupStateTimeout.ProcessingTimeTimeout) or event time (GroupStateTimeout.EventTimeTimeout). When using timeouts, you can check for timeout first before processing the values by checking the flag state.hasTimedOut.

    To set a processing-time timeout, use the GroupState.setTimeoutDuration(...) method. The timeout then comes with the following guarantees:

    • Timeout will never occur before the clock has advanced X ms specified in the method
    • Timeout will eventually occur when there is a trigger in the query, after X ms

    To set an event-time timeout, use GroupState.setTimeoutTimestamp(...). Only event-time timeouts require a watermark to be specified. All events in the group older than the watermark are then filtered out, and the timeout occurs when the watermark has advanced beyond the set timestamp.

    When a timeout occurs, the function supplied in the streaming query is invoked with these arguments: the key by which you keep the state, an iterator over the input rows, and the old state. The example with mapGroupsWithState below defines a number of functional classes and objects used.

    • 1) GroupState.setTimeoutDuration applies only to ProcessingTimeTimeout, not to EventTimeTimeout.
    • 2) A watermark is normally used with EventTimeTimeout, but setting a watermark together with ProcessingTimeTimeout does not raise an error.
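The timeout rules above can be sketched with a small mapGroupsWithState job. This is a minimal sketch rather than the original example these notes refer to: the `Event` and `RunningTotal` case classes, the `updateTotal` function, the 10-minute duration and the use of the built-in `rate` source are all assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Hypothetical input and output types, defined at top level so encoders can be derived.
case class Event(user: String, value: Long)
case class RunningTotal(user: String, total: Long, expired: Boolean)

object TimeoutSketch {
  // Called per key for each trigger that has data for the key,
  // and once more (with hasTimedOut set) when the key's timeout fires.
  def updateTotal(
      user: String,
      events: Iterator[Event],
      state: GroupState[Long]): RunningTotal = {
    if (state.hasTimedOut) {
      // Check the timeout flag first: emit the last total and drop the state.
      val last = state.getOption.getOrElse(0L)
      state.remove()
      RunningTotal(user, last, expired = true)
    } else {
      val total = state.getOption.getOrElse(0L) + events.map(_.value).sum
      state.update(total)
      // Valid only with ProcessingTimeTimeout; the timeout fires on a later trigger.
      state.setTimeoutDuration("10 minutes")
      RunningTotal(user, total, expired = false)
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("timeout-sketch")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // The rate source emits (timestamp, value) rows and stands in for a real stream.
    val events = spark.readStream.format("rate").option("rowsPerSecond", "5").load()
      .select(($"value" % 3).cast("string").as("user"), $"value")
      .as[Event]

    val totals = events
      .groupByKey(_.user)
      .mapGroupsWithState[Long, RunningTotal](
        GroupStateTimeout.ProcessingTimeTimeout)(updateTotal _)

    // mapGroupsWithState requires the update output mode.
    totals.writeStream.outputMode("update").format("console").start().awaitTermination()
  }
}
```

Switching to an event-time timeout would mean passing GroupStateTimeout.EventTimeTimeout, calling setTimeoutTimestamp(...) instead of setTimeoutDuration(...), and declaring a watermark on the input.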


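    2. In Spark 3.0, the Proleptic Gregorian calendar is used for parsing, formatting, and converting dates and timestamps, as well as for extracting sub-components such as years and days. Spark 3.0 uses Java 8 API classes from the java.time packages, which are based on ISO chronology. In Spark 2.4 and below, these operations are performed using a hybrid calendar (Julian + Gregorian). The changes affect results for dates before October 15, 1582 (Gregorian) and affect the following Spark 3.0 API: parsing/formatting of timestamp/date strings. This affects the CSV/JSON data sources and the unix_timestamp, date_format, to_unix_timestamp, from_unixtime, to_date, and to_timestamp functions when user-specified patterns are used for parsing and formatting. In Spark 3.0, Spark defines its own pattern strings in Datetime Patterns for Formatting and Parsing, which is implemented via DateTimeFormatter under the hood. The new implementation performs strict checking of its input. For example, the timestamp 2015-07-22 10:00:00 cannot be parsed if the pattern is yyyy-MM-dd, because the parser does not consume the whole input. Another example is that the input 31/01/2015 00:00 cannot be parsed by the pattern dd/MM/yyyy hh:mm, because hh presupposes hours in the range 1-12. In Spark 2.4 and below, java.text.SimpleDateFormat is used for timestamp/date string conversions, and the supported patterns are described in SimpleDateFormat. The old behavior can be restored by setting spark.sql.legacy.timeParserPolicy to LEGACY.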
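The difference between the two parser families can be seen without Spark, using the JDK classes directly. The sketch below is only an approximation: Spark builds its own formatter on top of DateTimeFormatter, so the exact failure conditions inside Spark may differ, and the object and method names here are hypothetical.

```scala
import java.text.SimpleDateFormat
import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter
import scala.util.Try

// Hypothetical helper object comparing java.time parsing with SimpleDateFormat.
object StrictParsingSketch {
  // java.time: the formatter must consume the whole input and resolve a full date.
  def parsesStrictlyAsDate(input: String, pattern: String): Boolean =
    Try(LocalDate.parse(input, DateTimeFormatter.ofPattern(pattern))).isSuccess

  // java.time: same, but a full date and time of day must be resolvable.
  def parsesStrictlyAsDateTime(input: String, pattern: String): Boolean =
    Try(LocalDateTime.parse(input, DateTimeFormatter.ofPattern(pattern))).isSuccess

  // Legacy: SimpleDateFormat is lenient by default and ignores trailing text.
  def parsesLeniently(input: String, pattern: String): Boolean =
    Try(new SimpleDateFormat(pattern).parse(input)).isSuccess

  def main(args: Array[String]): Unit = {
    // Trailing " 10:00:00" is left unparsed, so java.time rejects the input.
    println(parsesStrictlyAsDate("2015-07-22 10:00:00", "yyyy-MM-dd"))
    println(parsesLeniently("2015-07-22 10:00:00", "yyyy-MM-dd"))
    // hh is a 12-hour clock field, so "00:00" does not yield a full timestamp.
    println(parsesStrictlyAsDateTime("31/01/2015 00:00", "dd/MM/yyyy hh:mm"))
    // HH is the 24-hour field and accepts the same input.
    println(parsesStrictlyAsDateTime("31/01/2015 00:00", "dd/MM/yyyy HH:mm"))
  }
}
```

In practice, a pattern that fails under the new parser is usually better fixed (for example hh to HH) than worked around with the LEGACY policy.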

    Config Name Description Default Value

    When LEGACY, java.text.SimpleDateFormat is used for formatting and parsing
    dates/timestamps in a locale-sensitive manner, which is the approach before Spark 3.0.
    When set to CORRECTED, classes from java.time.* packages are used for the same purpose.
    The default value is EXCEPTION, RuntimeException is thrown when we will get different

    spark.sql.session.timeZone     The ID of the session-local time zone, in the format of either region-based zone IDs or
    zone offsets. Region IDs must have the form 'area/city', such as 'America/Los_Angeles'.
    Zone offsets must be in the format '(+|-)HH', '(+|-)HH:mm' or '(+|-)HH:mm:ss', e.g. '-08',
    '+01:00' or '-13:33:33'. 'UTC' and 'Z' are also supported as aliases of '+00:00'. Other
    short names are not recommended because they can be ambiguous.
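The accepted formats can be checked ahead of time with the JDK's own zone parser. This is a sketch under the assumption that java.time.ZoneId accepts the same strings as the session time zone option for the formats listed above; Spark's own validation is not reproduced here, and the helper name is hypothetical.

```scala
import java.time.ZoneId
import scala.util.Try

// Hypothetical helper: true when the JDK can resolve the ID to a zone or offset.
object SessionTimeZoneSketch {
  def isValidZoneId(id: String): Boolean = Try(ZoneId.of(id)).isSuccess

  def main(args: Array[String]): Unit = {
    // Candidate values one might pass to spark.sql.session.timeZone.
    Seq("America/Los_Angeles", "-08", "+01:00", "-13:33:33", "UTC", "Z", "Not/AZone")
      .foreach(id => println(s"$id -> ${isValidZoneId(id)}"))
  }
}
```

A value that passes this check could then be applied with spark.conf.set("spark.sql.session.timeZone", id) on an existing session.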



    • spark.sql.files.ignoreMissingFiles: Whether to ignore missing files. If true, the Spark jobs will continue to run when encountering missing files, and the contents that have been read will still be returned. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC. For example, if the error "A file referenced in the transaction log cannot be found." appears (https://docs.microsoft.com/en-us/azure/databricks/kb/delta/file-transaction-log-not-found), set the Spark property spark.sql.files.ignoreMissingFiles to true in the cluster's Spark Config.
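    • Since Spark 2.4, Spark evaluates the set operations referenced in a query following the precedence rule of the SQL standard. If the order is not specified by parentheses, set operations are performed from left to right, except that all INTERSECT operations are performed before any UNION, EXCEPT or MINUS operations. The old behavior of giving equal precedence to all set operations is preserved under the newly added configuration spark.sql.legacy.setopsPrecedence.enabled, whose default value is false. When this property is set to true, Spark evaluates the set operators from left to right as they appear in the query, given that no explicit ordering is enforced by parentheses.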
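    • Since Spark 2.4, Spark maximizes the usage of the vectorized ORC reader for ORC files by default. To do that, spark.sql.orc.impl and spark.sql.orc.filterPushdown change their default values to native and true respectively. ORC files created by the native ORC writer cannot be read by some old Apache Hive releases. Use spark.sql.orc.impl=hive to create files that can be shared with Hive 2.1.1 and older.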
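    • Since Spark 2.4, when comparing a DATE type with a TIMESTAMP type, Spark promotes both sides to TIMESTAMP before comparing. Set spark.sql.legacy.compareDateTimestampInTimestamp to false to restore the previous behavior. This option will be removed in Spark 3.0.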
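    • Since Spark 2.4, creating a managed table with a nonempty location is not allowed. An exception is thrown when attempting to create a managed table with a nonempty location. Set spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation to true to restore the previous behavior. This option will be removed in Spark 3.0.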
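    • Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. This means Spark uses its own ORC support by default instead of Hive SerDe. For example, CREATE TABLE t(id int) STORED AS ORC would be handled with Hive SerDe in Spark 2.3, while in Spark 2.4 it is converted into Spark's ORC data source table and ORC vectorization is applied. Set spark.sql.hive.convertMetastoreOrc=false to restore the previous behavior.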
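    • In version 2.3 and earlier, a CSV row is considered malformed if at least one column value in the row is malformed. The CSV parser drops such rows in DROPMALFORMED mode or outputs an error in FAILFAST mode. Since Spark 2.4, a CSV row is considered malformed only when it contains malformed column values requested from the CSV data source; other values can be ignored. For example, a CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selecting the id column yields a row with the single column value 1234, but in Spark 2.3 and earlier the result is empty in DROPMALFORMED mode. To restore the previous behavior, set spark.sql.csv.parser.columnPruning.enabled to false.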
    • Since Spark 2.4, file listing for computing statistics is done in parallel by default. This can be disabled by setting spark.sql.statistics.parallelFileListingInStatsComputation.enabled to false.
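    • In Spark 2.3 and earlier, HAVING without GROUP BY is treated as WHERE. This means SELECT 1 FROM range(10) HAVING true is executed as SELECT 1 FROM range(10) WHERE true and returns 10 rows. This violates the SQL standard and has been fixed in Spark 2.4. Since Spark 2.4, HAVING without GROUP BY is treated as a global aggregate, which means SELECT 1 FROM range(10) HAVING true returns only one row. To restore the previous behavior, set spark.sql.legacy.parser.havingWithoutGroupByAsWhere to true.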
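    • In version 2.3 and earlier, when reading from a Parquet data source table, Spark always returns null for any column whose names in the Hive metastore schema and the Parquet schema are in different letter cases, no matter whether spark.sql.caseSensitive is set to true or false. Since 2.4, when spark.sql.caseSensitive is set to false, Spark does case-insensitive column name resolution between the Hive metastore schema and the Parquet schema, so even when column names are in different letter cases, Spark returns the corresponding column values. An exception is thrown if there is ambiguity, i.e. more than one Parquet column is matched. This change also applies to Parquet Hive tables when spark.sql.hive.convertMetastoreParquet is set to true.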
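    • Since Spark 2.4.5: 1) The TRUNCATE TABLE command tries to set back the original permissions and ACLs when re-creating the table/partition paths. To restore the behavior of earlier versions, set spark.sql.truncateTable.ignorePermissionAcl.enabled to true. 2) The configuration spark.sql.legacy.mssqlserver.numericMapping.enabled was added to support the legacy MsSQLServer dialect mapping behavior, which uses IntegerType and DoubleType for the SMALLINT and REAL JDBC types, respectively. To restore the behavior of 2.4.3 and earlier versions, set spark.sql.legacy.mssqlserver.numericMapping.enabled to true.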
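    • In Spark 2.4 and below, the key attribute of the grouped Dataset returned by Dataset.groupByKey is wrongly named "value" if the key is of a non-struct type, for example int, string, or array. This is counterintuitive and makes the schema of aggregation queries unexpected. For example, the schema of ds.groupByKey(...).count() is (value, count). Since Spark 3.0, the grouping attribute is named "key". The old behavior is preserved under the newly added configuration spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue, whose default value is false.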
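    • In Spark 3.0, when inserting a value into a table column with a different data type, type coercion is performed as per the ANSI SQL standard. Certain unreasonable type conversions, such as converting string to int or double to boolean, are disallowed. A runtime exception is thrown if the value is out of range for the data type of the column. In Spark 2.4 and below, type conversions during table insertion are allowed as long as they are a valid Cast. When inserting an out-of-range value into an integral field, the low-order bits of the value are inserted (the same as Java/Scala numeric type casting). For example, if 257 is inserted into a field of byte type, the result is 1. The behavior is controlled by the option spark.sql.storeAssignmentPolicy, with a default value of "ANSI". Setting the option to "Legacy" restores the previous behavior.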
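    • In Spark 2.4 and below, the SET command works without any warning even if the specified key is for SparkConf entries, and it has no effect because the command does not update SparkConf; this behavior might confuse users. In 3.0, the command fails if a SparkConf key is used. You can disable this check by setting spark.sql.legacy.setCommandRejectsSparkCoreConfs to false.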
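    • In Spark 3.0, the properties listed below become reserved; commands fail if you specify reserved properties in places such as CREATE DATABASE ... WITH DBPROPERTIES and ALTER TABLE ... SET TBLPROPERTIES. You need their specific clauses to specify them, for example, CREATE DATABASE test COMMENT 'any comment' LOCATION 'some path'. You can set spark.sql.legacy.notReserveProperties to true to ignore the ParseException; in this case, these properties are silently removed, for example: SET DBPROPERTIES('location'='/tmp') has no effect. In Spark 2.4 and below, these properties are neither reserved nor have side effects; for example, SET DBPROPERTIES('location'='/tmp') does not change the location of the database but only creates a meaningless property just like 'a'='b'.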
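    • In Spark 3.0, you can also use ADD FILE to add file directories. Earlier, you could add only single files using this command. To restore the behavior of earlier versions, set spark.sql.legacy.addSingleFileInAddFile to true.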
    • In Spark 3.0, an analysis exception is thrown when hash expressions are applied to elements of MapType. To restore the behavior before Spark 3.0, set spark.sql.legacy.allowHashOnMapType to true.
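    • In Spark 3.0, when the array/map function is called without any parameters, it returns an empty collection with NullType as the element type. In Spark 2.4 and below, it returns an empty collection with StringType as the element type. To restore the behavior before Spark 3.0, you can set spark.sql.legacy.createEmptyCollectionUsingStringType to true.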
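    • In Spark 2.4 and below, you can create a map with duplicated keys via built-in functions like CreateMap, StringToMap, etc. The behavior of a map with duplicated keys is undefined; for example, map lookup respects the duplicated key that appears first, Dataset.collect keeps only the duplicated key that appears last, MapKeys returns duplicated keys, etc. In Spark 3.0, Spark throws a RuntimeException when duplicated keys are found. You can set spark.sql.mapKeyDedupPolicy to LAST_WIN to deduplicate map keys with the last-wins policy. Users may still read map values with duplicated keys from data sources that do not enforce it (for example, Parquet); the behavior is undefined.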
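    • In Spark 3.0, using org.apache.spark.sql.functions.udf(AnyRef, DataType) is not allowed by default. Removing the return type parameter to automatically switch to a typed Scala udf is recommended, or set spark.sql.legacy.allowUntypedScalaUDF to true to keep using it. In Spark 2.4 and below, if org.apache.spark.sql.functions.udf(AnyRef, DataType) gets a Scala closure with a primitive-type argument, the returned UDF returns null if the input value is null. However, in Spark 3.0, the UDF returns the default value of the Java type if the input value is null. For example, with val f = udf((x: Int) => x, IntegerType), f($"x") returns null in Spark 2.4 and below if column x is null, and returns 0 in Spark 3.0. This behavior change is introduced because Spark 3.0 is built with Scala 2.12 by default.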
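    • In Spark 3.0, the higher-order function exists follows three-valued boolean logic, that is, if the predicate returns any nulls and no true is obtained, then exists returns null instead of false. For example, exists(array(1, null, 3), x -> x % 2 == 0) is null. The previous behavior can be restored by setting spark.sql.legacy.followThreeValuedLogicInArrayExists to false.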
    • In Spark 3.0, numbers written in scientific notation (for example, 1E2) are parsed as Double. In Spark 2.4 and below, they are parsed as Decimal. To restore the behavior before Spark 3.0, you can set spark.sql.legacy.exponentLiteralAsDecimal.enabled to true.
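    • In Spark 3.0, day-time interval strings are converted to intervals with respect to the from and to bounds. If an input string does not match the pattern defined by the specified bounds, a ParseException is thrown. For example, interval '2 10:20' hour to minute raises the exception because the expected format is [+|-]h[h]:[m]m. In Spark 2.4, the from bound was not taken into account, and the to bound was used to truncate the resulting interval. For instance, the day-time interval string from the example shown is converted to interval 10 hours 20 minutes. To restore the behavior before Spark 3.0, you can set spark.sql.legacy.fromDayTimeString.enabled to true.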
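    • In Spark 3.0, a negative scale of decimal is not allowed by default; for example, the data type of a literal like 1E10BD is DecimalType(11, 0). In Spark 2.4 and below, it was DecimalType(2, -9). To restore the behavior before Spark 3.0, you can set spark.sql.legacy.allowNegativeScaleOfDecimal to true.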
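    • In Spark 3.0, a Dataset query fails if it contains an ambiguous column reference caused by a self join. A typical example: val df1 = ...; val df2 = df1.filter(...);, then df1.join(df2, df1("a") > df2("a")) returns an empty result, which is quite confusing. This is because Spark cannot resolve Dataset column references that point to tables being self joined, and df1("a") is exactly the same as df2("a") in Spark. To restore the behavior before Spark 3.0, you can set spark.sql.analyzer.failAmbiguousSelfJoin to false.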
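    • In Spark 3.0, spark.sql.legacy.ctePrecedencePolicy is introduced to control the behavior for name conflicts in nested WITH clauses. With the default value EXCEPTION, Spark throws an AnalysisException, which forces users to choose the specific substitution order they want. If set to CORRECTED (which is recommended), inner CTE definitions take precedence over outer definitions. For example, with the config set to CORRECTED, WITH t AS (SELECT 1), t2 AS (WITH t AS (SELECT 2) SELECT * FROM t) SELECT * FROM t2 returns 2, while with it set to LEGACY the result is 1, which is the behavior in version 2.4 and below.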
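    • In Spark 3.0, the configuration spark.sql.crossJoin.enabled becomes an internal configuration and is true by default, so by default Spark does not raise an exception on SQL with an implicit cross join.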
    • In Spark 3.0, Spark casts String to Date/Timestamp in binary comparisons with dates/timestamps. The previous behavior of casting Date/Timestamp to String can be restored by setting spark.sql.legacy.typeCoercion.datetimeToString.enabled to true.
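    • In Spark 3.0, special values are supported in conversion from strings to dates and timestamps. These values are simply notational shorthands that are converted to ordinary date or timestamp values when read. The following string values are supported for dates: epoch [zoneId] (1970-01-01); today [zoneId] (the current date in the time zone specified by spark.sql.session.timeZone); yesterday [zoneId] (the current date - 1); tomorrow [zoneId] (the current date + 1); now (the date of running the current query, which has the same notion as today). For example, SELECT date 'tomorrow' - date 'yesterday'; should output 2. The special timestamp values are: epoch [zoneId] (1970-01-01 00:00:00+00, Unix system time zero); today [zoneId] (midnight today); yesterday [zoneId] (midnight yesterday); tomorrow [zoneId] (midnight tomorrow); now (the current query start time). For example, SELECT timestamp 'tomorrow';.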
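    • In Spark 2.4 and below, when reading a Hive SerDe table with Spark native data sources (parquet/orc), Spark infers the actual file schema and updates the table schema in the metastore. In Spark 3.0, Spark no longer infers the schema. This should not cause any problems for end users, but if it does, set spark.sql.hive.caseSensitiveInferenceMode to INFER_AND_SAVE.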
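    • In Spark 2.4 and below, a partition column value is converted to null if it cannot be cast to the corresponding user-provided schema. In 3.0, the partition column value is validated against the user-provided schema, and an exception is thrown if the validation fails. You can disable such validation by setting spark.sql.sources.validatePartitionColumns to false.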
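    • In Spark 3.0, if files or subdirectories disappear during recursive directory listing (that is, they appear in an intermediate listing but then cannot be read or listed during later phases of the recursive directory listing, due to either concurrent file deletions or object store consistency issues), then the listing fails with an exception unless spark.sql.files.ignoreMissingFiles is true (default false). In previous versions, these missing files or subdirectories would be ignored. Note that this change of behavior only applies during initial table file listing (or during REFRESH TABLE), not during query execution: the net change is that spark.sql.files.ignoreMissingFiles is now obeyed during table file listing / query planning, not only at query execution time.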
    • In Spark 2.4 and below, the JSON data source parser treats empty strings as null for some data types such as IntegerType. For FloatType, DoubleType, DateType and TimestampType, it fails on empty strings and throws exceptions. Spark 3.0 disallows empty strings and throws an exception for data types other than StringType and BinaryType. The previous behavior of allowing an empty string can be restored by setting spark.sql.legacy.json.allowEmptyString.enabled to true.
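    • In Spark 2.4, when a Spark session is created via cloneSession(), the newly created session inherits its configuration from its parent SparkContext, even though the same configuration may exist with a different value in its parent Spark session. In Spark 3.0, the configurations of the parent SparkSession have a higher precedence than those of the parent SparkContext. You can restore the old behavior by setting spark.sql.legacy.sessionInitWithConfigDefaults to true.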
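    • In Spark 3.0, the built-in Hive was upgraded from 1.2 to 2.3, which brings the following impacts: 1) You may need to set spark.sql.hive.metastore.version and spark.sql.hive.metastore.jars according to the version of the Hive metastore you want to connect to. For example, if your Hive metastore version is 1.2.1, set spark.sql.hive.metastore.version to 1.2.1 and spark.sql.hive.metastore.jars to maven. 2) You need to migrate your custom SerDes to Hive 2.3 or build your own Spark with the hive-1.2 profile. See HIVE-15167 for more details. 3) The decimal string representation can differ between Hive 1.2 and Hive 2.3 when using the TRANSFORM operator in SQL for script transformation. In Hive 1.2, the string representation omits trailing zeroes. In Hive 2.3, it is always padded to 18 digits with trailing zeroes if necessary.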
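    • In Spark 3.0, the JSON data source and the JSON function schema_of_json infer TimestampType from string values if they match the pattern defined by the JSON option timestampFormat. Since version 3.0.1, timestamp type inference is disabled by default. Set the JSON option inferTimestamp to true to enable such type inference.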
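    • In Spark 3.1, statistical aggregation functions, including std, stddev, stddev_samp, variance, var_samp, skewness, kurtosis, covar_samp, and corr, return NULL instead of Double.NaN when a divide-by-zero occurs during expression evaluation, for example, when stddev_samp is applied to a single-element set. In Spark 3.0 and earlier, Double.NaN is returned in such a case. To restore the behavior before Spark 3.1, you can set spark.sql.legacy.statisticalAggregate to true.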
    • In Spark 3.1, grouping_id() returns long values. In Spark 3.0 and earlier, this function returns int values. To restore the behavior before Spark 3.1, you can set spark.sql.legacy.integerGroupingId to true.
    • In Spark 3.1, the SQL UI shows query plan explain results in formatted mode. To restore the behavior before Spark 3.1, you can set spark.sql.ui.explainMode to extended.
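    • In Spark 3.1, structs and maps are wrapped in {} brackets when they are cast to strings. For instance, the show() action and the CAST expression use such brackets. In Spark 3.0 and earlier, [] brackets are used for the same purpose. To restore the behavior before Spark 3.1, you can set spark.sql.legacy.castComplexTypesToString.enabled to true.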
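    • In Spark 3.1, NULL elements of structs, arrays and maps are converted to "null" when they are cast to strings. In Spark 3.0 or earlier, NULL elements are converted to empty strings. To restore the behavior before Spark 3.1, you can set spark.sql.legacy.castComplexTypesToString.enabled to true.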
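    • In Spark 3.1, when spark.sql.ansi.enabled is false, Spark always returns null if the sum of a decimal type column overflows. In Spark 3.0 or earlier, the sum of a decimal type column may in that case return null or an incorrect result, or even fail at runtime (depending on the actual query plan execution).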
    • In Spark 3.1, the path option cannot coexist with a path parameter when calling DataFrameReader.load(), DataFrameWriter.save(), DataStreamReader.load(), or DataStreamWriter.start(). In addition, the paths option cannot coexist with DataFrameReader.load(). For example, spark.read.format("csv").option("path", "/tmp").load("/tmp2") or spark.read.option("path", "/tmp").csv("/tmp2") throws org.apache.spark.sql.AnalysisException. In Spark 3.0 and below, the path option is overwritten if one path parameter is passed to the above methods, and the path option is added to the overall paths if multiple path parameters are passed to DataFrameReader.load(). To restore the behavior before Spark 3.1, you can set spark.sql.legacy.pathOptionBehavior.enabled to true.
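    • In Spark 3.1, loading and saving timestamps from/to Parquet files fails if the timestamps are before 1900-01-01 00:00:00Z and are loaded (saved) as the INT96 type. In Spark 3.0, the actions do not fail but might lead to shifting of the input timestamps due to rebasing between the Julian and the Proleptic Gregorian calendars. To restore the behavior before Spark 3.1, you can set spark.sql.legacy.parquet.int96RebaseModeInRead and/or spark.sql.legacy.parquet.int96RebaseModeInWrite to LEGACY.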
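    • In Spark 3.1, temporary views have the same behavior as permanent views, i.e. they capture and store runtime SQL configs, SQL text, catalog and namespace. The captured view properties are applied during the parsing and analysis phases of view resolution. To restore the behavior before Spark 3.1, you can set spark.sql.legacy.storeAnalyzedPlanForView to true.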
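    • In Spark 3.1, a temporary view created via CACHE TABLE ... AS SELECT also has the same behavior as a permanent view. In particular, when the temporary view is dropped, Spark invalidates all its cache dependents, as well as the cache for the temporary view itself. This differs from Spark 3.0 and below, which only do the latter. To restore the previous behavior, you can set spark.sql.legacy.storeAnalyzedPlanForView to true.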
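    • Since Spark 3.1, CHAR/CHARACTER and VARCHAR types are supported in the table schema. Table scan/insertion respects the char/varchar semantics. If char/varchar is used in places other than the table schema, an exception is thrown (CAST is an exception that simply treats char/varchar as string, as before). To restore the behavior before Spark 3.1, which treats them as STRING types and ignores a length parameter such as CHAR(4), you can set spark.sql.legacy.charVarcharAsString to true.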
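Several of the behavior changes listed above come down to small semantic rules that can be illustrated in plain Scala, without a Spark session. The sketch below models three of them: low-order-bit truncation versus range checking on insert (spark.sql.storeAssignmentPolicy), three-valued logic in exists, and last-wins map key deduplication (spark.sql.mapKeyDedupPolicy). All object and function names are hypothetical, None stands in for SQL NULL, and this is a model of the described semantics, not Spark's implementation.

```scala
// Hypothetical, Spark-free model of three rules from the migration notes.
object MigrationSemanticsSketch {
  // Legacy store assignment: keep only the low-order bits, like a JVM narrowing cast.
  def legacyToByte(v: Int): Byte = v.toByte

  // ANSI-style store assignment: reject values outside the target type's range.
  def ansiToByte(v: Int): Byte = {
    if (v < Byte.MinValue || v > Byte.MaxValue) {
      throw new ArithmeticException(s"$v is out of range for a byte column")
    }
    v.toByte
  }

  // Three-valued exists: true if any element matches, otherwise
  // NULL (None) if any element was NULL, otherwise false.
  def existsThreeValued[A](xs: Seq[Option[A]])(p: A => Boolean): Option[Boolean] = {
    val results = xs.map(_.map(p))
    if (results.contains(Some(true))) Some(true)
    else if (results.contains(None)) None
    else Some(false)
  }

  // LAST_WIN dedup: a later entry replaces an earlier one with the same key.
  def lastWin[K, V](entries: Seq[(K, V)]): Map[K, V] = entries.toMap

  def main(args: Array[String]): Unit = {
    println(legacyToByte(257))
    println(existsThreeValued(Seq(Some(1), None, Some(3)))(_ % 2 == 0))
    println(lastWin(Seq(1 -> "a", 1 -> "b")))
  }
}
```

Modeling the rules this way makes it easier to predict what a query will return before deciding whether a legacy flag is actually needed.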


    package org.apache.spark.sql.internal
    import java.util.{Locale, NoSuchElementException, Properties, TimeZone}
    import java.util
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.atomic.AtomicReference
    import java.util.zip.Deflater
    import scala.collection.JavaConverters._
    import scala.collection.immutable
    import scala.util.Try
    import scala.util.control.NonFatal
    import scala.util.matching.Regex
    import org.apache.hadoop.fs.Path
    import org.apache.spark.{SparkConf, SparkContext, TaskContext}
    import org.apache.spark.internal.Logging
    import org.apache.spark.internal.config._
    import org.apache.spark.internal.config.{IGNORE_MISSING_FILES => SPARK_IGNORE_MISSING_FILES}
    import org.apache.spark.network.util.ByteUnit
    import org.apache.spark.sql.catalyst.ScalaReflection
    import org.apache.spark.sql.catalyst.analysis.{HintErrorLogger, Resolver}
    import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
    import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
    import org.apache.spark.sql.catalyst.plans.logical.HintErrorHandler
    import org.apache.spark.sql.catalyst.util.DateTimeUtils
    import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
    import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
    import org.apache.spark.sql.types.{AtomicType, TimestampNTZType, TimestampType}
    import org.apache.spark.unsafe.array.ByteArrayMethods
    import org.apache.spark.util.Utils
    // This file defines the configuration options for Spark SQL.
    object SQLConf {
      private[this] val sqlConfEntriesUpdateLock = new Object
      private[this] var sqlConfEntries: util.Map[String, ConfigEntry[_]] = util.Collections.emptyMap()
      private[this] val staticConfKeysUpdateLock = new Object
      private[this] var staticConfKeys: java.util.Set[String] = util.Collections.emptySet()
      private def register(entry: ConfigEntry[_]): Unit = sqlConfEntriesUpdateLock.synchronized {
        require(!sqlConfEntries.containsKey(entry.key),
          s"Duplicate SQLConfigEntry. ${entry.key} has been registered")
        // Copy-on-write: readers always see a complete, immutable snapshot of the map.
        val updatedMap = new java.util.HashMap[String, ConfigEntry[_]](sqlConfEntries)
        updatedMap.put(entry.key, entry)
        sqlConfEntries = updatedMap
      }

      // For testing only
      private[sql] def unregister(entry: ConfigEntry[_]): Unit = sqlConfEntriesUpdateLock.synchronized {
        val updatedMap = new java.util.HashMap[String, ConfigEntry[_]](sqlConfEntries)
        updatedMap.remove(entry.key)
        sqlConfEntries = updatedMap
      }

      private[internal] def getConfigEntry(key: String): ConfigEntry[_] = {
        sqlConfEntries.get(key)
      }

      private[internal] def getConfigEntries(): util.Collection[ConfigEntry[_]] = {
        sqlConfEntries.values()
      }

      private[internal] def containsConfigEntry(entry: ConfigEntry[_]): Boolean = {
        getConfigEntry(entry.key) == entry
      }

      private[sql] def containsConfigKey(key: String): Boolean = {
        sqlConfEntries.containsKey(key)
      }

      def registerStaticConfigKey(key: String): Unit = staticConfKeysUpdateLock.synchronized {
        val updated = new util.HashSet[String](staticConfKeys)
        updated.add(key)
        staticConfKeys = updated
      }

      def isStaticConfigKey(key: String): Boolean = staticConfKeys.contains(key)

      def buildConf(key: String): ConfigBuilder = ConfigBuilder(key).onCreate(register)

      def buildStaticConf(key: String): ConfigBuilder = {
        ConfigBuilder(key).onCreate { entry =>
          SQLConf.registerStaticConfigKey(entry.key)
          SQLConf.register(entry)
        }
      }

      /**
       * Merge all non-static configs to the SQLConf. For example, when the 1st [[SparkSession]] and
       * the global [[SharedState]] have been initialized, all static configs have taken effect and
       * should not be set to other values. Other later created sessions should respect all static
       * configs and only be able to change non-static configs.
       */
      private[sql] def mergeNonStaticSQLConfigs(
          sqlConf: SQLConf,
          configs: Map[String, String]): Unit = {
        for ((k, v) <- configs if !staticConfKeys.contains(k)) {
          sqlConf.setConfString(k, v)
        }
      }

      /**
       * Extract entries from `SparkConf` and put them in the `SQLConf`
       */
      private[sql] def mergeSparkConf(sqlConf: SQLConf, sparkConf: SparkConf): Unit = {
        sparkConf.getAll.foreach { case (k, v) =>
          sqlConf.setConfString(k, v)
        }
      }

      /**
       * Default config. Only used when there is no active SparkSession for the thread.
       * See [[get]] for more information.
       */
      private lazy val fallbackConf = new ThreadLocal[SQLConf] {
        override def initialValue: SQLConf = new SQLConf
      }

      /** See [[get]] for more information. */
      def getFallbackConf: SQLConf = fallbackConf.get()

      private lazy val existingConf = new ThreadLocal[SQLConf] {
        override def initialValue: SQLConf = null
      }

      def withExistingConf[T](conf: SQLConf)(f: => T): T = {
        val old = existingConf.get()
        existingConf.set(conf)
        try {
          f
        } finally {
          // Restore the previous thread-local value, or clear it if there was none.
          if (old != null) {
            existingConf.set(old)
          } else {
            existingConf.remove()
          }
        }
      }

      /**
       * Defines a getter that returns the SQLConf within scope.
       * See [[get]] for more information.
       */
      private val confGetter = new AtomicReference[() => SQLConf](() => fallbackConf.get())

      /**
       * Sets the active config object within the current scope.
       * See [[get]] for more information.
       */
      def setSQLConfGetter(getter: () => SQLConf): Unit = {
        confGetter.set(getter)
      }

      /**
       * Returns the active config object within the current scope. If there is an active SparkSession,
       * the proper SQLConf associated with the thread's active session is used. If it's called from
       * tasks in the executor side, a SQLConf will be created from job local properties, which are set
       * and propagated from the driver side, unless a `SQLConf` has been set in the scope by
       * `withExistingConf` as done for propagating SQLConf for operations performed on RDDs created
       * from DataFrames.
       *
       * The way this works is a little bit convoluted, due to the fact that config was added initially
       * only for physical plans (and as a result not in sql/catalyst module).
       *
       * The first time a SparkSession is instantiated, we set the [[confGetter]] to return the
       * active SparkSession's config. If there is no active SparkSession, it returns using the thread
       * local [[fallbackConf]]. The reason [[fallbackConf]] is a thread local (rather than just a conf)
       * is to support setting different config options for different threads so we can potentially
       * run tests in parallel. At the time this feature was implemented, this was a no-op since we
       * run unit tests (that does not involve SparkSession) in serial order.
       */
      def get: SQLConf = {
        if (TaskContext.get != null) {
          // Executor side: prefer a conf set via withExistingConf, else build one from task properties.
          val conf = existingConf.get()
          if (conf != null) {
            conf
          } else {
            new ReadOnlySQLConf(TaskContext.get())
          }
        } else {
          val isSchedulerEventLoopThread = SparkContext.getActive
            .flatMap { sc => Option(sc.dagScheduler) }
            .map(_.eventProcessLoop.eventThread)
            .exists(_.getId == Thread.currentThread().getId)
          if (isSchedulerEventLoopThread) {
            // DAGScheduler event loop thread does not have an active SparkSession, the `confGetter`
            // will return `fallbackConf` which is unexpected. Here we require the caller to get the
            // conf within `withExistingConf`, otherwise fail the query.
            val conf = existingConf.get()
            if (conf != null) {
              conf
            } else if (Utils.isTesting) {
              throw QueryExecutionErrors.cannotGetSQLConfInSchedulerEventLoopThreadError()
            } else {
              confGetter.get()()
            }
          } else {
            val conf = existingConf.get()
            if (conf != null) {
              conf
            } else {
              confGetter.get()()
            }
          }
        }
      }

      val ANALYZER_MAX_ITERATIONS = buildConf("spark.sql.analyzer.maxIterations")
        .doc("The max number of iterations the analyzer runs.")
      val OPTIMIZER_EXCLUDED_RULES = buildConf("spark.sql.optimizer.excludedRules")
        .doc("Configures a list of rules to be disabled in the optimizer, in which the rules are " +
          "specified by their rule names and separated by comma. It is not guaranteed that all the " +
          "rules in this configuration will eventually be excluded, as some rules are necessary " +
          "for correctness. The optimizer will log the rules that have indeed been excluded.")
      val OPTIMIZER_MAX_ITERATIONS = buildConf("spark.sql.optimizer.maxIterations")
        .doc("The max number of iterations the optimizer runs.")
          .doc("The threshold of set size for InSet conversion.")
          .doc("Configures the max set size in InSet for which Spark will generate code with " +
            "switch statements. This is applicable only to bytes, shorts, ints, dates.")
          .checkValue(threshold => threshold >= 0 && threshold <= 600, "The max set size " +
            "for using switch statements in InSet must be non-negative and less than or equal to 600")
      val PLAN_CHANGE_LOG_LEVEL = buildConf("spark.sql.planChangeLog.level")
        .doc("Configures the log level for logging the change from the original plan to the new " +
          "plan after a rule or batch is applied. The value can be 'trace', 'debug', 'info', " +
          "'warn', or 'error'. The default log level is 'trace'.")
        .checkValue(logLevel => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(logLevel),
          "Invalid value for 'spark.sql.planChangeLog.level'. Valid values are " +
            "'trace', 'debug', 'info', 'warn' and 'error'.")
      val PLAN_CHANGE_LOG_RULES = buildConf("spark.sql.planChangeLog.rules")
        .doc("Configures a list of rules for logging plan changes, in which the rules are " +
          "specified by their rule names and separated by comma.")
      val PLAN_CHANGE_LOG_BATCHES = buildConf("spark.sql.planChangeLog.batches")
        .doc("Configures a list of batches for logging plan changes, in which the batches " +
          "are specified by their batch names and separated by comma.")
          .doc("When true, we will generate predicate for partition column when it's used as join key")
          .doc("When true, distinct count statistics will be used for computing the data size of the " +
            "partitioned table after dynamic partition pruning, in order to evaluate if it is worth " +
            "adding an extra subquery as the pruning filter if broadcast reuse is not applicable.")
          .doc("When statistics are not available or configured not to be used, this config will be " +
            "used as the fallback filter ratio for computing the data size of the partitioned table " +
            "after dynamic partition pruning, in order to evaluate if it is worth adding an extra " +
            "subquery as the pruning filter if broadcast reuse is not applicable.")
          .doc("When true, dynamic partition pruning will only apply when the broadcast exchange of " +
            "a broadcast hash join operation can be reused as the dynamic pruning filter.")
      val COMPRESS_CACHED = buildConf("spark.sql.inMemoryColumnarStorage.compressed")
        .doc("When set to true Spark SQL will automatically select a compression codec for each " +
          "column based on statistics of the data.")
      val COLUMN_BATCH_SIZE = buildConf("spark.sql.inMemoryColumnarStorage.batchSize")
        .doc("Controls the size of batches for columnar caching.  Larger batch sizes can improve " +
          "memory utilization and compression, but risk OOMs when caching data.")
          .doc("When true, enable partition pruning for in-memory columnar tables.")
          .doc("When true, enable in-memory table scan accumulators.")
          .doc("Enables vectorized reader for columnar caching.")
          .doc("When true, use OffHeapColumnVector in ColumnarBatch.")
      val PREFER_SORTMERGEJOIN = buildConf("spark.sql.join.preferSortMergeJoin")
        .doc("When true, prefer sort merge join over shuffled hash join. " +
          "Sort merge join consumes less memory than shuffled hash join and it works efficiently " +
          "when both join tables are large. On the other hand, shuffled hash join can improve " +
          "performance (e.g., of full outer joins) when one of join tables is much smaller.")
      val RADIX_SORT_ENABLED = buildConf("spark.sql.sort.enableRadixSort")
        .doc("When true, enable use of radix sort when possible. Radix sort is much faster but " +
          "requires additional memory to be reserved up-front. The memory overhead may be " +
          "significant when sorting very small rows (up to 50% more in this case).")
      val AUTO_BROADCASTJOIN_THRESHOLD = buildConf("spark.sql.autoBroadcastJoinThreshold")
        .doc("Configures the maximum size in bytes for a table that will be broadcast to all worker " +
          "nodes when performing a join.  By setting this value to -1 broadcasting can be disabled. " +
          "Note that currently statistics are only supported for Hive Metastore tables where the " +
          "command `ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan` has been " +
          "run, and file-based data source tables where the statistics are computed directly on " +
          "the files of data.")
      val SHUFFLE_HASH_JOIN_FACTOR = buildConf("spark.sql.shuffledHashJoinFactor")
        .doc("The shuffle hash join can be selected if the data size of small" +
          " side multiplied by this factor is still smaller than the large side.")
        .checkValue(_ >= 1, "The shuffle hash join factor cannot be negative.")
      val LIMIT_SCALE_UP_FACTOR = buildConf("spark.sql.limit.scaleUpFactor")
        .doc("Minimal increase rate in number of partitions between attempts when executing a take " +
          "on a query. Higher values lead to more partitions read. Lower values might lead to " +
          "longer execution times as more jobs will be run.")
          .doc("When true, advanced partition predicate pushdown into Hive metastore is enabled.")
      val LEAF_NODE_DEFAULT_PARALLELISM = buildConf("spark.sql.leafNodeDefaultParallelism")
        .doc("The default parallelism of Spark SQL leaf nodes that produce data, such as the file " +
          "scan node, the local data scan node, the range node, etc. The default value of this " +
          "config is 'SparkContext#defaultParallelism'.")
        .checkValue(_ > 0, "The value of spark.sql.leafNodeDefaultParallelism must be positive.")
      val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")
        .doc("The default number of partitions to use when shuffling data for joins or aggregations. " +
          "Note: For structured streaming, this configuration cannot be changed between query " +
          "restarts from the same checkpoint location.")
        .checkValue(_ > 0, "The value of spark.sql.shuffle.partitions must be positive")
          .doc("(Deprecated since Spark 3.0)")
          .checkValue(_ > 0, "advisoryPartitionSizeInBytes must be positive")
      val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
        .doc("When true, enable adaptive query execution, which re-optimizes the query plan in the " +
          "middle of query execution, based on accurate runtime statistics.")
      val ADAPTIVE_EXECUTION_FORCE_APPLY = buildConf("spark.sql.adaptive.forceApply")
        .doc("Adaptive query execution is skipped when the query does not have exchanges or " +
          "sub-queries. By setting this config to true (together with " +
          s"'${ADAPTIVE_EXECUTION_ENABLED.key}' set to true), Spark will force apply adaptive query " +
          "execution for all supported queries.")
      val ADAPTIVE_EXECUTION_LOG_LEVEL = buildConf("spark.sql.adaptive.logLevel")
        .doc("Configures the log level for adaptive execution logging of plan changes. The value " +
          "can be 'trace', 'debug', 'info', 'warn', or 'error'. The default log level is 'debug'.")
        .checkValues(Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR"))
          .doc("The advisory size in bytes of the shuffle partition during adaptive optimization " +
            s"(when ${ADAPTIVE_EXECUTION_ENABLED.key} is true). It takes effect when Spark " +
            "coalesces small shuffle partitions or splits skewed shuffle partition.")
          .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark will coalesce " +
            "contiguous shuffle partitions according to the target size (specified by " +
            s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'), to avoid too many small tasks.")
          .doc("When true, Spark does not respect the target size specified by " +
            s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}' (default 64MB) when coalescing contiguous " +
            "shuffle partitions, but adaptively calculate the target size according to the default " +
            "parallelism of the Spark cluster. The calculated size is usually smaller than the " +
            "configured target size. This is to maximize the parallelism and avoid performance " +
            "regression when enabling adaptive query execution. It's recommended to set this config " +
            "to false and respect the configured target size.")
          .doc("The minimum size of shuffle partitions after coalescing. This is useful when the " +
            "adaptively calculated target size is too small during partition coalescing.")
          .checkValue(_ > 0, "minPartitionSize must be positive")
          .doc("(deprecated) The suggested (not guaranteed) minimum number of shuffle partitions " +
            "after coalescing. If not set, the default value is the default parallelism of the " +
            "Spark cluster. This configuration only has an effect when " +
            s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
            s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true.")
          .checkValue(_ > 0, "The minimum number of partitions must be positive.")
          .doc("The initial number of shuffle partitions before coalescing. If not set, it equals to " +
            s"${SHUFFLE_PARTITIONS.key}. This configuration only has an effect when " +
            "are both true.")
          .checkValue(_ > 0, "The initial number of partitions must be positive.")
          .doc("Whether to fetch the contiguous shuffle blocks in batch. Instead of fetching blocks " +
            "one by one, fetching contiguous shuffle blocks for the same map task in batch can " +
            "reduce IO and improve performance. Note that a single fetch request contains multiple " +
            s"contiguous blocks only when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
            s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true. This feature also requires " +
            "a relocatable serializer, a compression codec that supports concatenation, the new " +
            "version of the shuffle fetch protocol, and IO encryption to be disabled.")
          .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark tries to use local " +
            "shuffle reader to read the shuffle data when the shuffle partitioning is not needed, " +
            "for example, after converting sort-merge join to broadcast-hash join.")
          .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark dynamically " +
            "handles skew in shuffled join (sort-merge and shuffled hash) by splitting (and " +
            "replicating if needed) skewed partitions.")
          .doc("A partition is considered as skewed if its size is larger than this factor " +
            "multiplying the median partition size and also larger than " +
          .checkValue(_ >= 0, "The skew factor cannot be negative.")
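      // Illustrative sketch only, not part of SQLConf: the skew-join factor and the byte
      // threshold described next are combined with a logical AND, so a partition must exceed
      // both the factor times the median size and the absolute threshold. `isSkewedPartition`
      // and its parameters are hypothetical names; Spark's own check may differ in detail.
      def isSkewedPartition(
          sizeBytes: Long,
          medianBytes: Long,
          skewFactor: Double,
          thresholdBytes: Long): Boolean =
        sizeBytes > medianBytes * skewFactor && sizeBytes > thresholdBytes
      // Example values (in MB for brevity): with median = 100, skewFactor = 5 and threshold = 256,
      // a 600 MB partition counts as skewed, while a 400 MB partition does not (400 < 5 * 100).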
          .doc("A partition is considered as skewed if its size in bytes is larger than this " +
            s"threshold and also larger than '${SKEW_JOIN_SKEWED_PARTITION_FACTOR.key}' " +
            "multiplying the median partition size. Ideally this config should be set larger " +
            s"than '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'.")
          .doc("The relation with a non-empty partition ratio lower than this config will not be " +
            "considered as the build side of a broadcast-hash join in adaptive execution regardless " +
            "of its size. This configuration only has an effect when " +
            s"'${ADAPTIVE_EXECUTION_ENABLED.key}' is true.")
          .checkValue(_ >= 0, "The non-empty partition ratio must be positive number.")
          .doc("Configures a list of rules to be disabled in the adaptive optimizer, in which the " +
            "rules are specified by their rule names and separated by comma. The optimizer will log " +
            "the rules that have indeed been excluded.")
          .doc("Configures the maximum size in bytes for a table that will be broadcast to all " +
            "worker nodes when performing a join. By setting this value to -1 broadcasting can be " +
            s"disabled. The default value is the same as ${AUTO_BROADCASTJOIN_THRESHOLD.key}. " +
            "Note that this config is used only in the adaptive framework.")
          .doc("Configures the maximum size in bytes per partition that can be allowed to build " +
            "a local hash map. If this value is not smaller than " +
            s"${ADVISORY_PARTITION_SIZE_IN_BYTES.key} and all partition sizes are not larger " +
            "than this config, join selection prefers to use shuffled hash join instead of " +
            s"sort merge join regardless of the value of ${PREFER_SORTMERGEJOIN.key}.")
          .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark will optimize the " +
            "skewed shuffle partitions in RebalancePartitions and split them to smaller ones " +
            s"according to the target size (specified by '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'), " +
            "to avoid data skew.")
          .doc("When true, force enable OptimizeSkewedJoin even if it introduces extra shuffle.")
          .doc("The custom cost evaluator class to be used for adaptive execution. If not set," +
            " Spark will use its own SimpleCostEvaluator by default.")
          .doc("When true, common subexpressions will be eliminated.")
          .doc("The maximum entries of the cache used for interpreted subexpression elimination.")
          .checkValue(_ >= 0, "The maximum must not be negative")
      val CASE_SENSITIVE = buildConf("spark.sql.caseSensitive")
        .doc("Whether the query analyzer should be case sensitive or not. " +
          "Default to case insensitive. It is highly discouraged to turn on case sensitive mode.")
      val CONSTRAINT_PROPAGATION_ENABLED = buildConf("spark.sql.constraintPropagation.enabled")
        .doc("When true, the query optimizer will infer and propagate data constraints in the query " +
          "plan to optimize them. Constraint propagation can sometimes be computationally expensive " +
          "for certain kinds of query plans (such as those with a large number of predicates and " +
          "aliases) which might negatively impact overall runtime.")
      val ESCAPED_STRING_LITERALS = buildConf("spark.sql.parser.escapedStringLiterals")
        .doc("When true, string literals (including regex patterns) remain escaped in our SQL " +
          "parser. The default is false since Spark 2.0. Setting it to true can restore the behavior " +
          "prior to Spark 2.0.")
      val FILE_COMPRESSION_FACTOR = buildConf("spark.sql.sources.fileCompressionFactor")
        .doc("When estimating the output data size of a table scan, multiply the file size by this " +
          "factor as the estimated data size, in case the data is compressed in the file and leads to" +
          " a heavily underestimated result.")
        .checkValue(_ > 0, "the value of fileCompressionFactor must be greater than 0")
      val PARQUET_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.parquet.mergeSchema")
        .doc("When true, the Parquet data source merges schemas collected from all data files, " +
             "otherwise the schema is picked from the summary file or a random data file " +
             "if no summary file is available.")
      val PARQUET_SCHEMA_RESPECT_SUMMARIES = buildConf("spark.sql.parquet.respectSummaryFiles")
        .doc("When true, we assume that all Parquet part-files are consistent with the " +
             "summary files and we will ignore the part-files when merging schema. Otherwise, if " +
             "this is false, which is the default, we will merge all part-files. This should be " +
             "considered an expert-only option, and shouldn't be enabled before knowing what it " +
             "means exactly.")
      val PARQUET_BINARY_AS_STRING = buildConf("spark.sql.parquet.binaryAsString")
        .doc("Some other Parquet-producing systems, in particular Impala and older versions of " +
          "Spark SQL, do not differentiate between binary data and strings when writing out the " +
          "Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide " +
          "compatibility with these systems.")
      val PARQUET_INT96_AS_TIMESTAMP = buildConf("spark.sql.parquet.int96AsTimestamp")
        .doc("Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " +
          "Spark would also store Timestamp as INT96 because we need to avoid precision loss of the " +
          "nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to " +
          "provide compatibility with these systems.")
      val PARQUET_INT96_TIMESTAMP_CONVERSION = buildConf("spark.sql.parquet.int96TimestampConversion")
        .doc("This controls whether timestamp adjustments should be applied to INT96 data when " +
          "converting to timestamps, for data written by Impala.  This is necessary because Impala " +
          "stores INT96 data with a different timezone offset than Hive & Spark.")
      object ParquetOutputTimestampType extends Enumeration {
      val PARQUET_OUTPUT_TIMESTAMP_TYPE = buildConf("spark.sql.parquet.outputTimestampType")
        .doc("Sets which Parquet timestamp type to use when Spark writes data to Parquet files. " +
          "INT96 is a non-standard but commonly used timestamp type in Parquet. TIMESTAMP_MICROS " +
          "is a standard timestamp type in Parquet, which stores number of microseconds from the " +
          "Unix epoch. TIMESTAMP_MILLIS is also standard, but with millisecond precision, which " +
          "means Spark has to truncate the microsecond portion of its timestamp value.")
      val PARQUET_COMPRESSION = buildConf("spark.sql.parquet.compression.codec")
        .doc("Sets the compression codec used when writing Parquet files. If either `compression` or " +
          "`parquet.compression` is specified in the table-specific options/properties, the " +
          "precedence would be `compression`, `parquet.compression`, " +
          "`spark.sql.parquet.compression.codec`. Acceptable values include: none, uncompressed, " +
          "snappy, gzip, lzo, brotli, lz4, zstd.")
        .checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo", "lz4", "brotli", "zstd"))
      val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown")
        .doc("Enables Parquet filter push-down optimization when set to true.")
      val PARQUET_FILTER_PUSHDOWN_DATE_ENABLED = buildConf("spark.sql.parquet.filterPushdown.date")
        .doc("If true, enables Parquet filter push-down optimization for Date. " +
          s"This configuration only has an effect when '${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' is " +
          .doc("If true, enables Parquet filter push-down optimization for Timestamp. " +
            s"This configuration only has an effect when '${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' is " +
            "enabled and Timestamp stored as TIMESTAMP_MICROS or TIMESTAMP_MILLIS type.")
          .doc("If true, enables Parquet filter push-down optimization for Decimal. " +
            s"This configuration only has an effect when '${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' is " +
        .doc("If true, enables Parquet filter push-down optimization for string startsWith function. " +
          s"This configuration only has an effect when '${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' is " +
          .doc("For IN predicate, Parquet filter will push-down a set of OR clauses if its " +
            "number of values does not exceed this threshold. Otherwise, Parquet filter will push-down " +
            "a value greater than or equal to its minimum value and less than or equal to " +
            "its maximum value. By setting this value to 0 this feature can be disabled. " +
            s"This configuration only has an effect when '${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' is " +
          .checkValue(threshold => threshold >= 0, "The threshold must not be negative.")
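      // Illustrative sketch only, not part of SQLConf and not Spark's actual Parquet filter code:
      // it spells out the IN-predicate rule described in the doc above. Up to `threshold` values
      // become a chain of OR-ed equality checks, larger sets collapse to a [min, max] range, and
      // a threshold of 0 disables the push-down. `sketchInPushdown`, the Int-only value type and
      // the string output are hypothetical simplifications.
      def sketchInPushdown(column: String, values: Seq[Int], threshold: Int): Option[String] =
        if (threshold <= 0 || values.isEmpty) None // feature disabled, or nothing to push down
        else if (values.size <= threshold) Some(values.map(v => s"$column = $v").mkString(" OR "))
        else Some(s"$column >= ${values.min} AND $column <= ${values.max}")
      // sketchInPushdown("id", Seq(3, 1, 2), 10) gives Some("id = 3 OR id = 1 OR id = 2")
      // sketchInPushdown("id", Seq(3, 1, 2), 2)  gives Some("id >= 1 AND id <= 3")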
      val PARQUET_AGGREGATE_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.aggregatePushdown")
        .doc("If true, MAX/MIN/COUNT without filter and group by will be pushed" +
          " down to Parquet for optimization. MAX/MIN/COUNT for complex types and timestamp" +
          " can't be pushed down.")
      val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
        .doc("If true, data will be written in the format of Spark 1.4 and earlier. For example, decimal " +
          "values will be written in Apache Parquet's fixed-length byte array format, which other " +
          "systems such as Apache Hive and Apache Impala use. If false, the newer format in Parquet " +
          "will be used. For example, decimals will be written in int-based format. If Parquet " +
          "output is intended for use with systems that do not support this newer format, set to true.")
      val PARQUET_OUTPUT_COMMITTER_CLASS = buildConf("spark.sql.parquet.output.committer.class")
        .doc("The output committer class used by Parquet. The specified class needs to be a " +
          "subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
          "of org.apache.parquet.hadoop.ParquetOutputCommitter. If it is not, then metadata " +
          "summaries will never be created, irrespective of the value of " +
          .doc("Enables vectorized parquet decoding.")
      val PARQUET_RECORD_FILTER_ENABLED = buildConf("spark.sql.parquet.recordLevelFilter.enabled")
        .doc("If true, enables Parquet's native record-level filtering using the pushed down " +
          "filters. " +
          s"This configuration only has an effect when '${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' " +
          "is enabled and the vectorized reader is not used. You can ensure the vectorized reader " +
          s"is not used by setting '${PARQUET_VECTORIZED_READER_ENABLED.key}' to false.")
      val PARQUET_VECTORIZED_READER_BATCH_SIZE = buildConf("spark.sql.parquet.columnarReaderBatchSize")
        .doc("The number of rows to include in a parquet vectorized reader batch. The number should " +
          "be carefully chosen to minimize overhead and avoid OOMs in reading data.")
      val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
        .doc("Sets the compression codec used when writing ORC files. If either `compression` or " +
          "`orc.compress` is specified in the table-specific options/properties, the precedence " +
          "would be `compression`, `orc.compress`, `spark.sql.orc.compression.codec`. " +
          "Acceptable values include: none, uncompressed, snappy, zlib, lzo, zstd, lz4.")
        .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo", "zstd", "lz4"))
      val ORC_IMPLEMENTATION = buildConf("spark.sql.orc.impl")
        .doc("When native, use the native version of ORC support instead of the ORC library in Hive. " +
          "It is 'hive' by default prior to Spark 2.4.")
        .checkValues(Set("hive", "native"))
      val ORC_VECTORIZED_READER_ENABLED = buildConf("spark.sql.orc.enableVectorizedReader")
        .doc("Enables vectorized orc decoding.")
      val ORC_VECTORIZED_READER_BATCH_SIZE = buildConf("spark.sql.orc.columnarReaderBatchSize")
        .doc("The number of rows to include in an ORC vectorized reader batch. The number should " +
          "be carefully chosen to minimize overhead and avoid OOMs in reading data.")
          .doc("Enables vectorized orc decoding for nested column.")
      val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
        .doc("When true, enable filter pushdown for ORC files.")
      val ORC_AGGREGATE_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.aggregatePushdown")
        .doc("If true, aggregates will be pushed down to ORC for optimization. Support MIN, MAX and " +
          "COUNT as aggregate expression. For MIN/MAX, support boolean, integer, float and date " +
          "type. For COUNT, support all data types.")
      val ORC_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.orc.mergeSchema")
        .doc("When true, the Orc data source merges schemas collected from all data files, " +
          "otherwise the schema is picked from a random data file.")
      val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath")
        .doc("When true, check all the partition paths under the table's root directory " +
             "when reading data stored in HDFS. This configuration will be deprecated in future " +
             s"releases and replaced by ${SPARK_IGNORE_MISSING_FILES.key}.")
          .doc("When true, some predicates will be pushed down into the Hive metastore so that " +
               "unmatching partitions can be eliminated earlier.")
          .doc("The threshold of set size for InSet predicate when pruning partitions through Hive " +
            "Metastore. When the set size exceeds the threshold, we rewrite the InSet predicate " +
            "to be greater than or equal to the minimum value in set and less than or equal to the " +
            "maximum value in set. Larger values may cause Hive Metastore stack overflow. But for " +
            "InSet inside Not with values exceeding the threshold, we won't push it to Hive Metastore.")
          .checkValue(_ > 0, "The value of metastorePartitionPruningInSetThreshold must be positive")
          .doc("Whether to fallback to get all partitions from Hive metastore and perform partition " +
            "pruning on Spark client side, when encountering MetaException from the metastore. Note " +
            "that Spark query performance may degrade if this is enabled and there are many " +
            "partitions to be listed. If this is disabled, Spark will fail the query instead.")
          .doc("When this config is enabled, if the predicates are not supported by Hive or Spark " +
            "falls back due to encountering MetaException from the metastore, " +
            "Spark will instead prune partitions by getting the partition names first " +
            "and then evaluating the filter expressions on the client side. " +
            "Note that predicates with TimeZoneAwareExpression are not supported.")
          .doc("When true, enable metastore partition management for file source tables as well. " +
               "This includes both datasource and converted Hive tables. When partition management " +
               "is enabled, datasource tables store partition in the Hive metastore, and use the " +
               s"metastore to prune partitions during query planning when " +
               s"${HIVE_METASTORE_PARTITION_PRUNING.key} is set to true.")
          .doc("When nonzero, enable caching of partition file metadata in memory. All tables share " +
               "a cache that can use up to specified num bytes for file metadata. This conf only " +
               "has an effect when hive filesource partition management is enabled.")
          .createWithDefault(250 * 1024 * 1024)
      object HiveCaseSensitiveInferenceMode extends Enumeration {
      val HIVE_CASE_SENSITIVE_INFERENCE = buildConf("spark.sql.hive.caseSensitiveInferenceMode")
        .doc("Sets the action to take when a case-sensitive schema cannot be read from a Hive Serde " +
          "table's properties when reading the table with Spark native data sources. Valid options " +
          "include INFER_AND_SAVE (infer the case-sensitive schema from the underlying data files " +
          "and write it back to the table properties), INFER_ONLY (infer the schema but don't " +
          "attempt to write it to the table properties) and NEVER_INFER (the default mode-- fallback " +
          "to using the case-insensitive metastore schema instead of inferring).")
          .doc("The maximum length allowed in a single cell when storing Spark-specific information " +
            "in Hive's metastore as table properties. Currently it covers 2 things: the schema's " +
            "JSON string, the histogram of column statistics.")
      val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
        .doc("When true, enable the metadata-only query optimization that uses the table's metadata " +
          "to produce the partition columns instead of table scans. It applies when all the columns " +
          "scanned are partition columns and the query has an aggregate operator that satisfies " +
          "distinct semantics. By default the optimization is disabled, and deprecated as of Spark " +
          "3.0 since it may return incorrect results when the files are empty, see also SPARK-26709. " +
          "It will be removed in future releases. If you must use it, use 'SparkSessionExtensions' " +
          "instead to inject it as a custom rule.")
      val COLUMN_NAME_OF_CORRUPT_RECORD = buildConf("spark.sql.columnNameOfCorruptRecord")
        .doc("The name of internal column for storing raw/un-parsed JSON and CSV records that fail " +
          "to parse.")
      val BROADCAST_TIMEOUT = buildConf("spark.sql.broadcastTimeout")
        .doc("Timeout in seconds for the broadcast wait time in broadcast joins.")
        .createWithDefaultString(s"${5 * 60}")
      // This is only used for the thriftserver
      val THRIFTSERVER_POOL = buildConf("spark.sql.thriftserver.scheduler.pool")
        .doc("Set a Fair Scheduler pool for a JDBC client session.")
          .doc("When true, enable incremental collection for execution in Thrift Server.")
          .doc("When true, all running tasks will be interrupted if one cancels a query. " +
            "When false, all running tasks will remain until finished.")
          .doc("Set a query duration timeout in seconds in Thrift Server. If the timeout is set to " +
            "a positive value, a running query will be cancelled automatically when the timeout is " +
            "exceeded, otherwise the query continues to run till completion. If timeout values are " +
            "set for each statement via `java.sql.Statement.setQueryTimeout` and they are smaller " +
            "than this configuration value, they take precedence. If you set this timeout and prefer " +
            "to cancel the queries right away without waiting for tasks to finish, consider enabling " +
            s"${THRIFTSERVER_FORCE_CANCEL.key} together.")
          .doc("The number of SQL statements kept in the JDBC/ODBC web UI history.")
      val THRIFTSERVER_UI_SESSION_LIMIT = buildConf("spark.sql.thriftserver.ui.retainedSessions")
        .doc("The number of SQL client sessions kept in the JDBC/ODBC web UI history.")
      // This is used to set the default data source
      val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default")
        .doc("The default data source to use in input/output.")
      val CONVERT_CTAS = buildConf("spark.sql.hive.convertCTAS")
        .doc("When true, a table created by a Hive CTAS statement (no USING clause) " +
          "without specifying any storage property will be converted to a data source table, " +
          s"using the data source set by ${DEFAULT_DATA_SOURCE_NAME.key}.")
      val GATHER_FASTSTAT = buildConf("spark.sql.hive.gatherFastStats")
          .doc("When true, fast stats (number of files and total size of all files) will be gathered" +
            " in parallel while repairing table partitions to avoid the sequential listing in Hive" +
            " metastore.")
          .doc("When true, automatically infer the data types for partitioned columns.")
      val BUCKETING_ENABLED = buildConf("spark.sql.sources.bucketing.enabled")
        .doc("When false, bucketed tables are treated as normal tables.")
      val BUCKETING_MAX_BUCKETS = buildConf("spark.sql.sources.bucketing.maxBuckets")
        .doc("The maximum number of buckets allowed.")
        .checkValue(_ > 0, "the value of spark.sql.sources.bucketing.maxBuckets must be greater than 0")
          .doc("When true, decide whether to do bucketed scan on input tables based on query plan " +
            "automatically. Do not use bucketed scan if 1. query does not have operators to utilize " +
            "bucketing (e.g. join, group-by, etc), or 2. there's an exchange operator between these " +
            s"operators and table scan. Note when '${BUCKETING_ENABLED.key}' is set to " +
            "false, this configuration does not take any effect.")
          .doc("Whether to forcibly enable some optimization rules that can change the output " +
            "partitioning of a cached query when executing it for caching. If it is set to true, " +
            "queries may need an extra shuffle to read the cached data. This configuration is " +
            "disabled by default. Currently, the optimization rules enabled by this configuration " +
      val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled")
        .doc("When false, we will throw an error if a query contains a cartesian product without " +
            "explicit CROSS JOIN syntax.")
      val ORDER_BY_ORDINAL = buildConf("spark.sql.orderByOrdinal")
        .doc("When true, the ordinal numbers are treated as the position in the select list. " +
             "When false, the ordinal numbers in order/sort by clause are ignored.")
      val GROUP_BY_ORDINAL = buildConf("spark.sql.groupByOrdinal")
        .doc("When true, the ordinal numbers in group by clauses are treated as the position " +
          "in the select list. When false, the ordinal numbers are ignored.")
      val GROUP_BY_ALIASES = buildConf("spark.sql.groupByAliases")
        .doc("When true, aliases in a select list can be used in group by clauses. When false, " +
          "an analysis exception is thrown in the case.")
      // The output committer class used by data sources. The specified class needs to be a
      // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
      val OUTPUT_COMMITTER_CLASS = buildConf("spark.sql.sources.outputCommitterClass")
          .doc("The maximum number of paths allowed for listing files at driver side. If the number " +
            "of detected paths exceeds this value during partition discovery, it tries to list the " +
            "files with another Spark distributed job. This configuration is effective only when " +
            "using file-based sources such as Parquet, JSON and ORC.")
          .checkValue(parallel => parallel >= 0, "The maximum number of paths allowed for listing " +
            "files at driver side must not be negative")
          .doc("The degree of parallelism used to list a collection of paths recursively. Set this " +
            "number to prevent file listing from generating too many tasks.")
          .doc("If true, Spark will not fetch the block locations for each file on " +
            "listing files. This speeds up file listing, but the scheduler cannot " +
            "schedule tasks to take advantage of data locality. It can be particularly " +
            "useful if data is read from a remote cluster so the scheduler could never " +
            "take advantage of locality anyway.")
      // Whether to automatically resolve ambiguity in join conditions for self-joins.
      // See SPARK-6231.
          .doc("When true, fail the Dataset query if it contains ambiguous self-join.")
      // Whether to retain group by columns or not in GroupedData.agg.
      val DATAFRAME_RETAIN_GROUP_COLUMNS = buildConf("spark.sql.retainGroupColumns")
      val DATAFRAME_PIVOT_MAX_VALUES = buildConf("spark.sql.pivotMaxValues")
        .doc("When doing a pivot without specifying values for the pivot column this is the maximum " +
          "number of (distinct) values that will be collected without error.")
      val RUN_SQL_ON_FILES = buildConf("spark.sql.runSQLOnFiles")
        .doc("When true, `datasource`.`path` can be used as a table in SQL queries.")
      val WHOLESTAGE_CODEGEN_ENABLED = buildConf("spark.sql.codegen.wholeStage")
        .doc("When true, the whole stage (of multiple operators) will be compiled into a single Java" +
          " method.")
        .doc("When true, embed the (whole-stage) codegen stage ID into " +
          "the class name of the generated class as a suffix.")
      val WHOLESTAGE_MAX_NUM_FIELDS = buildConf("spark.sql.codegen.maxFields")
        .doc("The maximum number of fields (including nested fields) that will be supported before" +
          " deactivating whole-stage codegen.")
      val CODEGEN_FACTORY_MODE = buildConf("spark.sql.codegen.factoryMode")
        .doc("This config determines the fallback behavior of several codegen generators " +
          "during tests. `FALLBACK` means trying codegen first and then falling back to " +
          "interpreted if any compile error happens. `CODEGEN_ONLY` disables the fallback. " +
          "`NO_CODEGEN` skips codegen and always takes the interpreted path. Note that " +
          "this config works only for tests.")
      val CODEGEN_FALLBACK = buildConf("spark.sql.codegen.fallback")
        .doc("When true, (whole stage) codegen can be temporarily disabled for the part of the query " +
          "that fails to compile generated code")
      val CODEGEN_LOGGING_MAX_LINES = buildConf("spark.sql.codegen.logging.maxLines")
        .doc("The maximum number of codegen lines to log when errors occur. Use -1 for unlimited.")
        .checkValue(maxLines => maxLines >= -1, "The maximum must be a positive integer, 0 to " +
          "disable logging or -1 to apply no limit.")
      val WHOLESTAGE_HUGE_METHOD_LIMIT = buildConf("spark.sql.codegen.hugeMethodLimit")
        .doc("The maximum bytecode size of a single compiled Java function generated by whole-stage " +
          "codegen. When the compiled function exceeds this threshold, the whole-stage codegen is " +
          "deactivated for this subtree of the current query plan. The default value is 65535, which " +
          "is the largest bytecode size possible for a valid Java method. When running on HotSpot, " +
          s"it may be preferable to set the value to ${CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT} " +
          "to match HotSpot's implementation.")
      val CODEGEN_METHOD_SPLIT_THRESHOLD = buildConf("spark.sql.codegen.methodSplitThreshold")
        .doc("The threshold of source-code splitting in the codegen. When the number of characters " +
          "in a single Java function (without comment) exceeds the threshold, the function will be " +
          "automatically split into multiple smaller ones. We cannot know how much bytecode will be " +
          "generated, so the code length is used as the metric. When running on HotSpot, a function's " +
          "bytecode should not go beyond 8KB, otherwise it will not be JITted; it also should not " +
          "be too small, otherwise there will be many function calls.")
        .checkValue(threshold => threshold > 0, "The threshold must be a positive integer.")
          .doc("When true, whole stage codegen would put the logic of consuming rows of each " +
            "physical operator into individual methods, instead of a single big method. This can be " +
            "used to avoid an oversized function that can miss the opportunity of JIT optimization.")
      val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes")
        .doc("The maximum number of bytes to pack into a single partition when reading files. " +
          "This configuration is effective only when using file-based sources such as Parquet, JSON " +
          "and ORC.")
        .createWithDefaultString("128MB") // parquet.block.size
      val FILES_OPEN_COST_IN_BYTES = buildConf("spark.sql.files.openCostInBytes")
        .doc("The estimated cost to open a file, measured by the number of bytes that could be scanned " +
          "in the same time. This is used when putting multiple files into a partition. It's better " +
          "to overestimate; then the partitions with small files will be faster than partitions " +
          "with bigger files (which are scheduled first). This configuration is effective only when " +
          "using file-based sources such as Parquet, JSON and ORC.")
      val FILES_MIN_PARTITION_NUM = buildConf("spark.sql.files.minPartitionNum")
        .doc("The suggested (not guaranteed) minimum number of split file partitions. " +
          "If not set, the default value is `spark.default.parallelism`. This configuration is " +
          "effective only when using file-based sources such as Parquet, JSON and ORC.")
        .checkValue(v => v > 0, "The min partition number must be a positive integer.")
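To make the interplay of `spark.sql.files.maxPartitionBytes`, `spark.sql.files.openCostInBytes` and `spark.sql.files.minPartitionNum` concrete, here is a minimal sketch of how a target split size could be derived from them. It is an assumption modeled on the descriptions above, not a copy of Spark's planner code; `SplitSizeSketch`, `maxSplitBytes` and the sample values in `main` are hypothetical.

```scala
object SplitSizeSketch {
  // Hypothetical helper: derives a target split size from the three settings.
  // Assumption: every file is charged its length plus the estimated open cost.
  def maxSplitBytes(
      maxPartitionBytes: Long,
      openCostInBytes: Long,
      minPartitionNum: Int,
      fileSizes: Seq[Long]): Long = {
    require(minPartitionNum > 0, "The min partition number must be a positive integer.")
    val totalBytes = fileSizes.map(_ + openCostInBytes).sum
    val bytesPerPartition = totalBytes / minPartitionNum
    // Never exceed maxPartitionBytes, never go below the open cost.
    math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerPartition))
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    // Illustrative input: 100 files of 1 MB each, 4 MB open cost, 8 partitions wanted.
    val size = maxSplitBytes(128 * mb, 4 * mb, 8, Seq.fill(100)(1 * mb))
    println(s"target split size: $size bytes")
  }
}
```

Padding every file with the open cost is what makes overestimating that cost useful: many tiny files then count as heavier, so fewer of them are packed into one partition.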
      val IGNORE_CORRUPT_FILES = buildConf("spark.sql.files.ignoreCorruptFiles")
        .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
          "encountering corrupted files and the contents that have been read will still be returned. " +
          "This configuration is effective only when using file-based sources such as Parquet, JSON " +
          "and ORC.")
      val IGNORE_MISSING_FILES = buildConf("spark.sql.files.ignoreMissingFiles")
        .doc("Whether to ignore missing files. If true, the Spark jobs will continue to run when " +
          "encountering missing files and the contents that have been read will still be returned. " +
          "This configuration is effective only when using file-based sources such as Parquet, JSON " +
          "and ORC.")
      val MAX_RECORDS_PER_FILE = buildConf("spark.sql.files.maxRecordsPerFile")
        .doc("Maximum number of records to write out to a single file. " +
          "If this value is zero or negative, there is no limit.")
      val EXCHANGE_REUSE_ENABLED = buildConf("spark.sql.exchange.reuse")
        .doc("When true, the planner will try to find out duplicated exchanges and re-use them.")
      val SUBQUERY_REUSE_ENABLED = buildConf("spark.sql.execution.reuseSubquery")
        .doc("When true, the planner will try to find out duplicated subqueries and re-use them.")
      val REMOVE_REDUNDANT_PROJECTS_ENABLED = buildConf("spark.sql.execution.removeRedundantProjects")
        .doc("Whether to remove redundant project exec node based on children's output and " +
          "ordering requirement.")
      val REMOVE_REDUNDANT_SORTS_ENABLED = buildConf("spark.sql.execution.removeRedundantSorts")
        .doc("Whether to remove redundant physical sort node")
            "The class used to manage state data in stateful streaming queries. This class must " +
              "be a subclass of StateStoreProvider, and must have a zero-arg constructor. " +
              "Note: For structured streaming, this configuration cannot be changed between query " +
              "restarts from the same checkpoint location.")
          .doc("When true, Spark will validate the state schema against schema on existing state and " +
            "fail query if it's incompatible.")
          .doc("Minimum number of state store delta files that need to be generated before they " +
            "are consolidated into snapshots.")
          .doc("When true, check if the data from state store is valid or not when running streaming " +
            "queries. Invalid data can occur if the state store format has been changed. Note, the " +
            "feature is only effective in the built-in HDFS state store provider now.")
          .doc("State format version used by flatMapGroupsWithState operation in a streaming query")
          .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
      val CHECKPOINT_LOCATION = buildConf("spark.sql.streaming.checkpointLocation")
        .doc("The default location for storing checkpoint data for streaming queries.")
          .doc("When true, enable force deletion of temporary checkpoint locations.")
      val MIN_BATCHES_TO_RETAIN = buildConf("spark.sql.streaming.minBatchesToRetain")
        .doc("The minimum number of batches that must be retained and made recoverable.")
      val MAX_BATCHES_TO_RETAIN_IN_MEMORY = buildConf("spark.sql.streaming.maxBatchesToRetainInMemory")
        .doc("The maximum number of batches which will be retained in memory to avoid " +
          "loading from files. The value adjusts a trade-off between memory usage vs cache miss: " +
          "'2' covers both success and direct failure cases, '1' covers only success case, " +
          "and '0' covers extreme case - disable cache to maximize memory size of executors.")
          .doc("The interval in milliseconds between triggering maintenance tasks in StateStore. " +
            "The maintenance task executes background maintenance task in all the loaded store " +
            "providers if they are still the active instances according to the coordinator. If not, " +
            "inactive instances of store providers will be closed.")
          .createWithDefault(TimeUnit.MINUTES.toMillis(1)) // 1 minute
          .doc("The codec used to compress delta and snapshot files generated by StateStore. " +
            "By default, Spark provides four codecs: lz4, lzf, snappy, and zstd. You can also " +
            "use fully qualified class names to specify the codec. Default codec is lz4.")
       * Note: this is defined in `RocksDBConf.FORMAT_VERSION`. These two places should be updated
       * together.
          .doc("Set the RocksDB format version. This will be stored in the checkpoint when starting " +
            "a streaming query. The checkpoint will use this RocksDB format version in the entire " +
            "lifetime of the query.")
          .checkValue(_ >= 0, "Must not be negative")
          // 5 is the default table format version for RocksDB 6.20.3.
          .doc("State format version used by streaming aggregation operations in a streaming query. " +
            "State tends to be incompatible between versions, so the state format version shouldn't " +
            "be modified after running.")
          .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
        .doc("Running multiple runs of the same streaming query concurrently is not supported. " +
          "If we find a concurrent active run for a streaming query (in the same or different " +
          "SparkSessions on the same cluster) and this flag is true, we will stop the old streaming " +
          "query run to start the new one.")
          .doc("State format version used by streaming join operations in a streaming query. " +
            "State tends to be incompatible between versions, so the state format version shouldn't " +
            "be modified after running.")
          .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
          .doc("When true, streaming session window sorts and merges sessions in local partition " +
            "prior to shuffle. This is to reduce the rows to shuffle, but only beneficial when " +
            "there are lots of rows in a batch being assigned to the same sessions.")
          .doc("State format version used by streaming session window in a streaming query. " +
            "State tends to be incompatible between versions, so the state format version shouldn't " +
            "be modified after running.")
          .checkValue(v => Set(1).contains(v), "Valid version is 1")
          .doc("When true, the logical plan for streaming query will be checked for unsupported" +
            " operations.")
          .doc("When true, the deprecated Consumer based offset fetching is used, which could cause " +
            "infinite wait in Spark queries. In such cases, restarting the query is the only " +
            "workaround. For further details please see the Offset Fetching chapter of the " +
            "Structured Streaming Kafka Integration Guide.")
          .doc("When true, the stateful operators for streaming query will be checked for possible " +
            "correctness issues due to global watermark. The correctness issue comes from queries " +
            "containing stateful operation which can emit rows older than the current watermark " +
            "plus allowed late record delay, which are \"late rows\" in downstream stateful " +
            "operations and these rows can be discarded. Please refer to the programming guide doc " +
            "for more details. Once the issue is detected, Spark will throw an analysis exception. " +
            "When this config is disabled, Spark will just print a warning message for users. " +
            "Prior to Spark 3.1.0, the behavior is the same as disabling this config.")
          .doc("If this is enabled, when Spark reads from the results of a streaming query written " +
            "by `FileStreamSink`, Spark will ignore the metadata log and treat it as normal path to " +
            "read, e.g. listing files using HDFS APIs.")
          .doc("This enables substitution using syntax like `${var}`, `${system:var}`, " +
            "and `${env:var}`.")
          .doc("Enable two-level aggregate hash map. When enabled, records will first be " +
            "inserted/looked-up at a 1st-level, small, fast map, and then fallback to a " +
            "2nd-level, larger, slower map when 1st level is full or keys cannot be found. " +
            "When disabled, records go directly to the 2nd level.")
          .doc("Enable two-level aggregate hash map for partial aggregate only, " +
            "because final aggregate might get more distinct keys compared to partial aggregate. " +
            "Overhead of looking up 1st-level map might dominate when having a lot of distinct keys.")
          .doc("Enable vectorized aggregate hash map. This is for testing/benchmarking only.")
          .doc("When true, the code generator would split aggregate code into individual methods " +
            "instead of a single big method. This can be used to avoid an oversized function that " +
            "can miss the opportunity of JIT optimization.")
          .doc("When true, enable code-gen for FULL OUTER shuffled hash join.")
          .doc("When true, enable code-gen for FULL OUTER sort merge join.")
          .doc("When true, enable code-gen for Existence sort merge join.")
          .doc("The maximum depth of a view reference in a nested view. A nested view may reference " +
            "other nested views, the dependencies are organized in a directed acyclic graph (DAG). " +
            "However the DAG depth may become too large and cause unexpected behavior. This " +
            "configuration puts a limit on this: when the depth of a view exceeds this value during " +
            "analysis, we terminate the resolution to avoid potential errors.")
          .checkValue(depth => depth > 0, "The maximum depth of a view reference in a nested view " +
            "must be positive.")
          .doc("When true, the SQL function 'count' is allowed to take no parameters.")
          .doc("When false, CTAS with LOCATION throws an analysis exception if the " +
            "location is not empty.")
          .doc("When true, the SQL function 'count' is allowed to take single 'tblName.*' as parameter")
          .doc("When true, SQL Configs of the current active SparkSession instead of the captured " +
            "ones will be applied during the parsing and analysis phases of the view resolution.")
          .doc("When true, analyzed plan instead of SQL text will be stored when creating " +
            "temporary view")
          .doc("When true, it's allowed to use an input query without explicit alias when creating " +
            "a permanent view.")
          .doc("Policy to calculate the global watermark value when there are multiple watermark " +
            "operators in a streaming query. The default value is 'min' which chooses " +
            "the minimum watermark reported across multiple operators. The alternative value is " +
            "'max' which chooses the maximum across multiple operators. " +
            "Note: This configuration cannot be changed between query restarts from the same " +
            "checkpoint location.")
            str => Set("min", "max").contains(str),
            "Invalid value for 'spark.sql.streaming.multipleWatermarkPolicy'. " +
              "Valid values are 'min' and 'max'")
          .createWithDefault("min") // must be same as MultipleWatermarkPolicy.DEFAULT_POLICY_NAME
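The difference between the two values of `spark.sql.streaming.multipleWatermarkPolicy` can be illustrated with a small standalone sketch. This is not Spark's implementation; `WatermarkPolicySketch` and `globalWatermark` are hypothetical names, and the per-operator watermarks are assumed to be plain epoch milliseconds.

```scala
object WatermarkPolicySketch {
  // Hypothetical helper: picks the global watermark (ms) from per-operator watermarks.
  def globalWatermark(policy: String, operatorWatermarksMs: Seq[Long]): Long = {
    require(operatorWatermarksMs.nonEmpty, "need at least one operator watermark")
    policy match {
      case "min" => operatorWatermarksMs.min // safest: the slowest stream decides
      case "max" => operatorWatermarksMs.max // fastest stream decides; slower data may be dropped
      case other =>
        throw new IllegalArgumentException(
          s"Invalid policy '$other'. Valid values are 'min' and 'max'")
    }
  }

  def main(args: Array[String]): Unit = {
    val watermarks = Seq(1000L, 5000L, 3000L)
    println(s"min policy: ${globalWatermark("min", watermarks)}")
    println(s"max policy: ${globalWatermark("max", watermarks)}")
  }
}
```

With 'min', no operator's data is considered late earlier than its own watermark would allow; with 'max', results are produced sooner at the risk of discarding rows from slower inputs.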
          .doc("In the case of ObjectHashAggregateExec, when the size of the in-memory hash map " +
            "grows too large, we will fall back to sort-based aggregation. This option sets a row " +
            "count threshold for the size of the hash map.")
          // We are trying to be conservative and use a relatively small default count threshold here
          // since the state object of some TypedImperativeAggregate function can be quite large (e.g.
          // percentile_approx).
      val USE_OBJECT_HASH_AGG = buildConf("spark.sql.execution.useObjectHashAggregateExec")
        .doc("Decides if we use ObjectHashAggregateExec")
          .doc("Whether to ignore null fields when generating JSON objects in JSON data source and " +
            "JSON functions such as to_json. " +
            "If false, it generates null for null fields in JSON objects.")
          .doc("Whether to optimize JSON expressions in SQL optimizer. It includes pruning " +
            "unnecessary columns from from_json, simplifying from_json + to_json, to_json + " +
            "named_struct(from_json.col1, from_json.col2, ....).")
          .doc("Whether to optimize CSV expressions in SQL optimizer. It includes pruning " +
            "unnecessary columns from from_csv.")
      val COLLAPSE_PROJECT_ALWAYS_INLINE = buildConf("spark.sql.optimizer.collapseProjectAlwaysInline")
        .doc("Whether to always collapse two adjacent projections and inline expressions even if " +
          "it causes extra duplication.")
      val FILE_SINK_LOG_DELETION = buildConf("spark.sql.streaming.fileSink.log.deletion")
        .doc("Whether to delete the expired log files in file stream sink.")
          .doc("Number of log files after which all the previous files " +
            "are compacted into the next log file.")
          .doc("How long a file is guaranteed to be visible for all readers.")
          .createWithDefault(TimeUnit.MINUTES.toMillis(10)) // 10 minutes
      val FILE_SOURCE_LOG_DELETION = buildConf("spark.sql.streaming.fileSource.log.deletion")
        .doc("Whether to delete the expired log files in file stream source.")
          .doc("Number of log files after which all the previous files " +
            "are compacted into the next log file.")
          .doc("How long in milliseconds a file is guaranteed to be visible for all readers.")
          .createWithDefault(TimeUnit.MINUTES.toMillis(10)) // 10 minutes
          .doc("When true, force the schema of streaming file source to be nullable (including all " +
            "the fields). Otherwise, the schema might not be compatible with actual data, which " +
            "leads to corruptions.")
          .doc("Number of threads used in the file source completed file cleaner.")
          .doc("Whether file-based streaming sources will infer their own schema")
          .doc("How long to delay polling new data when no data is available")
          .doc("How long to wait in milliseconds for the streaming execution thread to stop when " +
            "calling the streaming query's stop() method. 0 or negative values wait indefinitely.")
          .doc("How long to wait between two progress events when there is no data")
            "Whether streaming micro-batch engine will execute batches without data " +
              "for eager state management for stateful streaming queries.")
          .doc("Whether Dropwizard/Codahale metrics will be reported for active streaming queries.")
          .doc("The number of progress updates to retain for a streaming query")
          .doc("The class used to write checkpoint files atomically. This class must be a subclass " +
            "of the interface CheckpointFileManager.")
          .doc("Whether to detect a streaming query may pick up an incorrect checkpoint path due " +
            "to SPARK-26824.")
          .doc("When true, SQL commands use parallel file listing, " +
            "as opposed to single thread listing. " +
            "This usually speeds up commands that need to list many directories.")
      val DEFAULT_SIZE_IN_BYTES = buildConf("spark.sql.defaultSizeInBytes")
        .doc("The default table size used in query planning. By default, it is set to Long.MaxValue " +
          s"which is larger than `${AUTO_BROADCASTJOIN_THRESHOLD.key}` to be more conservative. " +
          "That is to say by default the optimizer will not choose to broadcast a table unless it " +
          "knows for sure its size is small enough.")
      val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS = buildConf("spark.sql.statistics.fallBackToHdfs")
        .doc("When true, it will fall back to HDFS if the table statistics are not available from " +
          "table metadata. This is useful in determining if a table is small enough to use " +
          "broadcast joins. This flag is effective only for non-partitioned Hive tables. " +
          "For non-partitioned data source tables, it will be automatically recalculated if table " +
          "statistics are not available. For partitioned data source and partitioned Hive tables, " +
          s"it is '${DEFAULT_SIZE_IN_BYTES.key}' if table statistics are not available.")
      val NDV_MAX_ERROR =
          .doc("The maximum relative standard deviation allowed in HyperLogLog++ algorithm " +
            "when generating column level statistics.")
          .doc("Generates histograms when computing column statistics if enabled. Histograms can " +
            "provide better estimation accuracy. Currently, Spark only supports equi-height " +
            "histogram. Note that collecting histograms takes extra cost. For example, collecting " +
            "column statistics usually takes only one table scan, but generating equi-height " +
            "histogram will cause an extra table scan.")
          .doc("The number of bins when generating histograms.")
          .checkValue(num => num > 1, "The number of bins must be greater than 1.")
          .doc("Accuracy of percentile approximation when generating equi-height histograms. " +
            "Larger value means better accuracy. The relative error can be deduced by " +
            "1.0 / PERCENTILE_ACCURACY.")
          .doc("Enables automatic update for table size once table's data is changed. Note that if " +
            "the total number of files of the table is very large, this can be expensive and slow " +
            "down data change commands.")
      val CBO_ENABLED =
          .doc("Enables CBO for estimation of plan statistics when set true.")
          .doc("When true, the logical plan will fetch row counts and column statistics from catalog.")
          .doc("Enables join reorder in CBO.")
          .doc("The maximum number of joined nodes allowed in the dynamic programming algorithm.")
          .checkValue(number => number > 0, "The maximum number must be a positive integer.")
          .doc("The weight of the ratio of cardinalities (number of rows) " +
            "in the cost comparison function. The ratio of sizes in bytes has weight " +
            "1 - this value. The weighted geometric mean of these ratios is used to decide " +
            "which of the candidate plans will be chosen by the CBO.")
          .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].")
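The weighted geometric mean described above can be written out as a short standalone sketch. It assumes a plan's cost is just a row count and a size in bytes; `JoinCostSketch`, `PlanCost` and `betterThan` are hypothetical names chosen for illustration, not Spark's API.

```scala
object JoinCostSketch {
  // Hypothetical cost record: estimated cardinality and size of a candidate plan.
  final case class PlanCost(rows: BigInt, sizeInBytes: BigInt)

  // Returns true when plan `a` is cheaper than plan `b` under the given weight:
  // (rows_a / rows_b)^weight * (size_a / size_b)^(1 - weight) < 1
  def betterThan(a: PlanCost, b: PlanCost, weight: Double): Boolean = {
    require(weight >= 0 && weight <= 1, "The weight value must be in [0, 1].")
    require(b.rows > 0 && b.sizeInBytes > 0, "sketch assumes non-zero costs for plan b")
    val relativeRows = BigDecimal(a.rows) / BigDecimal(b.rows)
    val relativeSize = BigDecimal(a.sizeInBytes) / BigDecimal(b.sizeInBytes)
    math.pow(relativeRows.toDouble, weight) *
      math.pow(relativeSize.toDouble, 1 - weight) < 1
  }

  def main(args: Array[String]): Unit = {
    val a = PlanCost(rows = 100, sizeInBytes = 1000)
    val b = PlanCost(rows = 200, sizeInBytes = 1000)
    println(s"weight 0.7: a better than b = ${betterThan(a, b, 0.7)}")
    println(s"weight 0.0: a better than b = ${betterThan(a, b, 0.0)}")
  }
}
```

A weight close to 1 makes the comparison depend almost entirely on row counts, while a weight close to 0 makes it depend on size in bytes.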
          .doc("Applies star-join filter heuristics to cost based join enumeration.")
      val STARSCHEMA_DETECTION = buildConf("spark.sql.cbo.starSchemaDetection")
        .doc("When true, it enables join reordering based on star schema detection. ")
      val STARSCHEMA_FACT_TABLE_RATIO = buildConf("spark.sql.cbo.starJoinFTRatio")
        .doc("Specifies the upper limit of the ratio between the largest fact tables" +
          " for a star join to be considered. ")
      private def isValidTimezone(zone: String): Boolean = {
        Try { DateTimeUtils.getZoneId(zone) }.isSuccess
      }
      val SESSION_LOCAL_TIMEZONE = buildConf("spark.sql.session.timeZone")
        .doc("The ID of session local timezone in the format of either region-based zone IDs or " +
          "zone offsets. Region IDs must have the form 'area/city', such as 'America/Los_Angeles'. " +
          "Zone offsets must be in the format '(+|-)HH', '(+|-)HH:mm' or '(+|-)HH:mm:ss', e.g. '-08', " +
          "'+01:00' or '-13:33:33'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'. Other " +
          "short names are not recommended because they can be ambiguous.")
        .checkValue(isValidTimezone, s"Cannot resolve the given timezone with" +
          " ZoneId.of(_, ZoneId.SHORT_IDS)")
        .createWithDefaultFunction(() => TimeZone.getDefault.getID)
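The time zone validation above can be tried outside Spark with plain `java.time`. The sketch below assumes the check boils down to `ZoneId.of(_, ZoneId.SHORT_IDS)`, as the error message suggests; `DateTimeUtils.getZoneId` may normalize its input further, so treat `TimeZoneCheckSketch` as a hypothetical approximation.

```scala
import java.time.ZoneId
import scala.util.Try

object TimeZoneCheckSketch {
  // Assumption: an ID is valid if java.time can resolve it, including short IDs.
  def isValidTimezone(zone: String): Boolean =
    Try(ZoneId.of(zone, ZoneId.SHORT_IDS)).isSuccess

  def main(args: Array[String]): Unit = {
    // A region ID, two offsets, the UTC alias, and a made-up region.
    Seq("America/Los_Angeles", "-08", "+01:00", "UTC", "Mars/Olympus").foreach { zone =>
      println(s"$zone -> ${isValidTimezone(zone)}")
    }
  }
}
```

Running a candidate value through such a check before calling `spark.conf.set("spark.sql.session.timeZone", ...)` gives an early signal that the setting would be rejected.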
          .doc("Threshold for number of rows guaranteed to be held in memory by the window operator")
          .doc("Threshold for number of rows to be spilled by window operator")
          .doc("Threshold for number of windows guaranteed to be held in memory by the " +
            "session window operator. Note that the buffer is used only for the query Spark " +
            "cannot apply aggregations on determining session window.")
          .doc("Threshold for number of rows to be spilled by window operator. Note that " +
            "the buffer is used only for the query Spark cannot apply aggregations on determining " +
            "session window.")
          .doc("Threshold for number of rows guaranteed to be held in memory by the sort merge " +
            "join operator")
          .doc("Threshold for number of rows to be spilled by sort merge join operator")
          .doc("Threshold for number of rows guaranteed to be held in memory by the cartesian " +
            "product operator")
          .doc("Threshold for number of rows to be spilled by cartesian product operator")
      val SUPPORT_QUOTED_REGEX_COLUMN_NAME = buildConf("spark.sql.parser.quotedRegexColumnNames")
        .doc("When true, quoted Identifiers (using backticks) in SELECT statement are interpreted" +
          " as regular expressions.")
          .doc("Number of points to sample per partition in order to determine the range boundaries" +
              " for range partitioning, typically used in global sorting (without limit).")
          .doc("(Deprecated since Spark 3.0, please set 'spark.sql.execution.arrow.pyspark.enabled'.)")
          .doc("When true, make use of Apache Arrow for columnar data transfers in PySpark. " +
            "This optimization applies to: " +
            "1. pyspark.sql.DataFrame.toPandas " +
            "2. pyspark.sql.SparkSession.createDataFrame when its input is a Pandas DataFrame " +
            "The following data types are unsupported: " +
            "ArrayType of TimestampType, and nested StructType.")
          .doc("(Experimental) When true, make use of Apache Arrow's self-destruct and split-blocks " +
            "options for columnar data transfers in PySpark, when converting from Arrow to Pandas. " +
            "This reduces memory usage at the cost of some CPU time. " +
            "This optimization applies to: pyspark.sql.DataFrame.toPandas " +
            "when 'spark.sql.execution.arrow.pyspark.enabled' is set.")
          .doc("When true, it shows the JVM stacktrace in the user-facing PySpark exception " +
            "together with Python stacktrace. By default, it is disabled and hides JVM stacktrace " +
            "and shows a Python-friendly exception only.")
          .doc("When true, make use of Apache Arrow for columnar data transfers in SparkR. " +
            "This optimization applies to: " +
            "1. createDataFrame when its input is an R DataFrame " +
            "2. collect " +
            "3. dapply " +
            "4. gapply " +
            "The following data types are unsupported: " +
            "FloatType, BinaryType, ArrayType, StructType and MapType.")
          .doc("(Deprecated since Spark 3.0, please set " +
          .doc(s"When true, optimizations enabled by '${ARROW_PYSPARK_EXECUTION_ENABLED.key}' will " +
            "fallback automatically to non-optimized implementations if an error occurs.")
          .doc("When using Apache Arrow, limit the maximum number of records that can be written " +
            "to a single ArrowRecordBatch in memory. If set to zero or negative there is no limit.")
            s"Same as `${BUFFER_SIZE.key}` but only applies to Pandas UDF executions. If it is not " +
            s"set, the fallback is `${BUFFER_SIZE.key}`. Note that Pandas execution requires more " +
            "than 4 bytes. Lowering this value could make small Pandas UDF batch iterated and " +
            "pipelined; however, it might degrade performance. See SPARK-27870.")
            "When true, the traceback from Python UDFs is simplified. It hides " +
            "the Python worker, (de)serialization, etc from PySpark in tracebacks, and only " +
            "shows the exception messages from UDFs. Note that this works only with CPython 3.7+.")
          .doc("When true, columns will be looked up by name if labeled with a string and fallback " +
            "to use position if not. When false, a grouped map Pandas UDF will assign columns from " +
            "the returned Pandas DataFrame based on position, regardless of column label type. " +
            "This configuration will be deprecated in future releases.")
          .doc("When true, Arrow will perform safe type conversion when converting " +
            "Pandas.Series to Arrow array during serialization. Arrow will raise errors " +
            "when detecting unsafe type conversion like overflow. When false, Arrow's type check is " +
            "disabled and type conversions are done anyway. This config only works for Arrow 0.11.0+.")
      val REPLACE_EXCEPT_WITH_FILTER = buildConf("spark.sql.optimizer.replaceExceptWithFilter")
        .doc("When true, the apply function of the rule verifies whether the right node of the" +
          " except operation is of type Filter or Project followed by Filter. If yes, the rule" +
          " further verifies 1) Excluding the filter operations from the right (as well as the" +
          " left node, if any) on the top, whether both the nodes evaluate to the same result." +
          " 2) The left and right nodes don't contain any SubqueryExpressions. 3) The output" +
          " column names of the left node are distinct. If all the conditions are met, the" +
          " rule will replace the except operation with a Filter by flipping the filter" +
          " condition(s) of the right node.")
          .doc("When true (default), establishing the result type of an arithmetic operation " +
            "happens according to Hive behavior and SQL ANSI 2011 specification, i.e. rounding the " +
            "decimal part of the result if an exact representation is not possible. Otherwise, NULL " +
            "is returned in those cases, as previously.")
          .doc("When integral literal is used in decimal operations, pick a minimum precision " +
            "required by the literal if this config is true, to make the resulting precision and/or " +
            "scale smaller. This can reduce the possibility of precision loss and/or overflow.")
      val SQL_OPTIONS_REDACTION_PATTERN = buildConf("spark.sql.redaction.options.regex")
        .doc("Regex to decide which keys in a Spark SQL command's options map contain sensitive " +
          "information. The values of options whose names match this regex will be redacted " +
          "in the explain output. This redaction is applied on top of the global redaction " +
          s"configuration defined by ${SECRET_REDACTION_PATTERN.key}.")
          .doc("Regex to decide which parts of strings produced by Spark contain sensitive " +
            "information. When this regex matches a string part, that string part is replaced by a " +
            "dummy value. This is currently used to redact the output of SQL explain commands. " +
            "When this conf is not set, the value from `spark.redaction.string.regex` is used.")
      val CONCAT_BINARY_AS_STRING = buildConf("spark.sql.function.concatBinaryAsString")
        .doc("When this option is set to false and all inputs are binary, `functions.concat` returns " +
          "an output as binary. Otherwise, it returns as a string.")
      val ELT_OUTPUT_AS_STRING = buildConf("spark.sql.function.eltOutputAsString")
        .doc("When this option is set to false and all inputs are binary, `elt` returns " +
          "an output as binary. Otherwise, it returns as a string.")
          .doc("When this option is set to true, partition column values will be validated against " +
            "the user-specified schema. If the validation fails, a runtime exception is thrown. " +
            "When this option is set to false, the partition column value will be converted to null " +
            "if it cannot be cast to the corresponding user-specified schema.")
          .doc("The max number of entries to be stored in the queue to wait for late epochs. " +
            "If the size of the queue exceeds this parameter, the stream will stop with an error.")
          .doc("The size (measured in number of rows) of the queue used in continuous execution to" +
            " buffer the results of a ContinuousDataReader.")
          .doc("The interval at which continuous execution readers will poll to check whether" +
            " the epoch has advanced on the driver.")
      val USE_V1_SOURCE_LIST = buildConf("spark.sql.sources.useV1SourceList")
        .doc("A comma-separated list of data source short names or fully qualified data source " +
          "implementation class names for which Data Source V2 code path is disabled. These data " +
          "sources will fall back to Data Source V1 code path.")
      val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
        .doc("A comma-separated list of fully qualified data source register class names for which" +
          " StreamWriteSupport is disabled. Writes to these sources will fall back to the V1 Sinks.")
            "A comma-separated list of fully qualified data source register class names for which " +
              "MicroBatchReadSupport is disabled. Reads from these sources will fall back to the " +
              "V1 Sources.")
          .doc("Whether to fast fail task execution when writing output to FileFormat datasource. " +
            "If this is enabled, in `FileFormatWriter` we will catch `FileAlreadyExistsException` " +
            "and fast fail output task without further task retry. Only enable this if you know " +
            "the `FileAlreadyExistsException` of the output task is unrecoverable, i.e., further " +
            "task attempts won't be able to succeed. If the `FileAlreadyExistsException` might be " +
            "recoverable, you should keep this disabled and let Spark retry output tasks. " +
            "This is disabled by default.")
      object PartitionOverwriteMode extends Enumeration {
        val STATIC, DYNAMIC = Value
          .doc("When running INSERT OVERWRITE on a partitioned data source table, we currently " +
            "support 2 modes: static and dynamic. In static mode, Spark deletes all the partitions " +
            "that match the partition specification (e.g. PARTITION(a=1,b)) in the INSERT " +
            "statement before overwriting. In dynamic mode, Spark doesn't delete partitions ahead " +
            "of time, and only overwrites those partitions that have data written into them at " +
            "runtime. By default we use static " +
            "mode to keep the same behavior of Spark prior to 2.3. Note that this config doesn't " +
            "affect Hive serde tables, as they are always overwritten with dynamic mode. This can " +
            "also be set as an output option for a data source using key partitionOverwriteMode " +
            "(which takes precedence over this setting), e.g. " +
            "dataframe.write.option(\"partitionOverwriteMode\", \"dynamic\").save(path)."
      object StoreAssignmentPolicy extends Enumeration {
        val ANSI, LEGACY, STRICT = Value
          .doc("When inserting a value into a column with different data type, Spark will perform " +
            "type coercion. Currently, we support 3 policies for the type coercion rules: ANSI, " +
            "legacy and strict. With ANSI policy, Spark performs the type coercion as per ANSI SQL. " +
            "In practice, the behavior is mostly the same as PostgreSQL. " +
            "It disallows certain unreasonable type conversions such as converting " +
            "`string` to `int` or `double` to `boolean`. " +
            "With legacy policy, Spark allows the type coercion as long as it is a valid `Cast`, " +
            "which is very loose. e.g. converting `string` to `int` or `double` to `boolean` is " +
            "allowed. It is also the only behavior in Spark 2.x and it is compatible with Hive. " +
            "With strict policy, Spark doesn't allow any possible precision loss or data truncation " +
            "in type coercion, e.g. converting `double` to `int` or `decimal` to `double` is " +
            "not allowed."
      val ANSI_ENABLED = buildConf("spark.sql.ansi.enabled")
        .doc("When true, Spark SQL uses an ANSI compliant dialect instead of being Hive compliant. " +
          "For example, Spark will throw an exception at runtime instead of returning null results " +
          "when the inputs to a SQL operator/function are invalid. " +
          "For full details of this dialect, see the section \"ANSI Compliance\" of " +
          "Spark's documentation. Some ANSI dialect features may not come directly from the ANSI " +
          "SQL standard, but their behaviors align with ANSI SQL's style.")
      val ENFORCE_RESERVED_KEYWORDS = buildConf("spark.sql.ansi.enforceReservedKeywords")
        .doc(s"When true and '${ANSI_ENABLED.key}' is true, the Spark SQL parser enforces the ANSI " +
          "reserved keywords and forbids SQL queries that use reserved keywords as alias names " +
          "and/or identifiers for table, view, function, etc.")
          .doc("When true, the data type conversions between datetime types and numeric types are " +
            "allowed in ANSI SQL mode. This configuration is only effective when " +
            s"'${ANSI_ENABLED.key}' is true.")
          .doc("When performing a repartition following a shuffle, the output row ordering is " +
            "nondeterministic. If some downstream stages fail and some tasks of the repartition " +
            "stage retry, these tasks may generate different data, and that can lead to correctness " +
            "issues. Turn on this config to insert a local sort before actually doing repartition " +
            "to generate consistent repartition results. The performance of repartition() may go " +
            "down since we insert an extra local sort before it.")
          .doc("Prune nested fields from a logical relation's output which are unnecessary in " +
            "satisfying a query. This optimization allows columnar file format readers to avoid " +
            "reading unnecessary nested column data. Currently Parquet and ORC are the " +
            "data sources that implement this optimization.")
      val DISABLE_HINTS =
          .doc("When true, the optimizer will disable user-specified hints that are additional " +
            "directives for better planning of a query.")
          .doc("A comma-separated list of data source short names or fully qualified data source " +
            "implementation class names for which Spark tries to push down predicates for nested " +
            "columns and/or names containing `dots` to data sources. This configuration is only " +
            "effective with file-based data sources in DSv1. Currently, Parquet and ORC implement " +
            "both optimizations. The other data sources don't support this feature yet. So the " +
            "default value is 'parquet,orc'.")
          .doc("Prune nested fields from object serialization operator which are unnecessary in " +
            "satisfying a query. This optimization allows object serializers to avoid " +
            "executing unnecessary nested expressions.")
          .doc("Prune nested fields from expressions in an operator which are unnecessary in " +
            "satisfying a query. Note that this optimization doesn't prune nested fields from " +
            "physical data source scanning. For pruning nested fields from scanning, please use " +
            "`spark.sql.optimizer.nestedSchemaPruning.enabled` config.")
          .doc("Decorrelate inner query by eliminating correlated references and build domain joins.")
          .doc("When true, the optimizer will inline subqueries with OneRowRelation as leaf nodes.")
          .doc("In SQL queries with a SORT followed by a LIMIT like " +
              "'SELECT x FROM t ORDER BY y LIMIT m', if m is under this threshold, do a top-K sort" +
              " in memory, otherwise do a global sort which spills to disk if necessary.")
      object Deprecated {
        val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
      object Replaced {
        val MAPREDUCE_JOB_REDUCES = "mapreduce.job.reduces"
      val CSV_PARSER_COLUMN_PRUNING = buildConf("spark.sql.csv.parser.columnPruning.enabled")
        .doc("If it is set to true, column names of the requested schema are passed to CSV parser. " +
          "Other column values can be ignored during parsing even if they are malformed.")
      val CSV_INPUT_BUFFER_SIZE = buildConf("spark.sql.csv.parser.inputBufferSize")
        .doc("If it is set, it configures the buffer size of CSV input during parsing. " +
          "It is the same as inputBufferSize option in CSV which has a higher priority. " +
          "Note that this is a workaround for the parsing library's regression, and this " +
          "configuration is internal and supposed to be removed in the near future.")
      val REPL_EAGER_EVAL_ENABLED = buildConf("spark.sql.repl.eagerEval.enabled")
        .doc("Enables eager evaluation or not. When true, the top K rows of Dataset will be " +
          "displayed if and only if the REPL supports the eager evaluation. Currently, the " +
          "eager evaluation is supported in PySpark and SparkR. In PySpark, for the notebooks like " +
          "Jupyter, the HTML table (generated by _repr_html_) will be returned. For plain Python " +
          "REPL, the returned outputs are formatted like dataframe.show(). In SparkR, the returned " +
          "outputs are shown similarly to how an R data.frame would be.")
      val REPL_EAGER_EVAL_MAX_NUM_ROWS = buildConf("spark.sql.repl.eagerEval.maxNumRows")
        .doc("The max number of rows that are returned by eager evaluation. This only takes " +
          s"effect when ${REPL_EAGER_EVAL_ENABLED.key} is set to true. The valid range of this " +
          "config is from 0 to (Int.MaxValue - 1), so invalid values such as negative numbers and " +
          "numbers greater than (Int.MaxValue - 1) will be normalized to 0 and " +
          "(Int.MaxValue - 1), respectively.")
      val REPL_EAGER_EVAL_TRUNCATE = buildConf("spark.sql.repl.eagerEval.truncate")
        .doc("The max number of characters for each cell that is returned by eager evaluation. " +
          s"This only takes effect when ${REPL_EAGER_EVAL_ENABLED.key} is set to true.")
          .doc("Capacity for the max number of rows to be held in memory " +
            "by the fast hash aggregate product operator. The bit is not the actual value; " +
            "the actual numBuckets is determined by loadFactor " +
            "(e.g. with the default bit value 16, the actual numBuckets is ((1 << 16) / 0.5)).")
          .checkValue(bit => bit >= 10 && bit <= 30, "The bit value must be in [10, 30].")
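      // Worked example (not part of SQLConf.scala): the arithmetic from the doc string above,
      // spelled out. The names `bit`, `loadFactor` and `numBuckets` are illustrative only, and
      // the load factor 0.5 is taken from the formula quoted in the description.
      val bit = 16                                       // default bit value
      val loadFactor = 0.5                               // assumed, per the doc string
      val numBuckets = ((1 << bit) / loadFactor).toInt   // 65536 / 0.5 = 131072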
      val AVRO_COMPRESSION_CODEC = buildConf("spark.sql.avro.compression.codec")
        .doc("Compression codec used in writing of AVRO files. Supported codecs: " +
          "uncompressed, deflate, snappy, bzip2, xz and zstandard. Default codec is snappy.")
        .checkValues(Set("uncompressed", "deflate", "snappy", "bzip2", "xz", "zstandard"))
      val AVRO_DEFLATE_LEVEL = buildConf("spark.sql.avro.deflate.level")
        .doc("Compression level for the deflate codec used in writing of AVRO files. " +
          "Valid value must be in the range from 1 to 9 inclusive or -1. " +
          "The default value is -1, which corresponds to level 6 in the current implementation.")
        .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
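      // Usage sketch (not part of SQLConf.scala): one way the two Avro settings above might be
      // combined. Assumes an existing SparkSession named `spark` with the external Avro module
      // on the classpath; the DataFrame `df` and the output `path` are hypothetical.
      spark.conf.set("spark.sql.avro.compression.codec", "deflate")
      spark.conf.set("spark.sql.avro.deflate.level", "5")   // must be in 1..9, or -1 for the default
      df.write.format("avro").save(path)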
      val LEGACY_SIZE_OF_NULL = buildConf("spark.sql.legacy.sizeOfNull")
        .doc(s"If it is set to false, or ${ANSI_ENABLED.key} is true, then size of null returns " +
          "null. Otherwise, it returns -1, which was inherited from Hive.")
          .doc("If it is set to true, `PARTITION(col=null)` is parsed as a string literal of its " +
            "text representation, e.g., string 'null', when the partition column is string type. " +
            "Otherwise, it is always parsed as a null literal in the partition spec.")
          .doc("If it is set to true, the data source provider com.databricks.spark.avro is mapped " +
            "to the built-in but external Avro data source module for backward compatibility.")
          .doc("When set to true and the order of evaluation is not specified by parentheses, the " +
            "set operations are performed from left to right as they appear in the query. When set " +
            "to false and order of evaluation is not specified by parentheses, INTERSECT operations " +
            "are performed before any UNION, EXCEPT and MINUS operations.")
          .doc("When set to true, a literal with an exponent (e.g. 1E-30) would be parsed " +
            "as Decimal rather than Double.")
          .doc("When set to true, negative scale of Decimal type is allowed. For example, " +
            "the type of number 1E10BD under legacy mode is DecimalType(2, -9), but is " +
            "Decimal(11, 0) in non-legacy mode.")
          .doc("When true, the bucketed table scan will list files during planning to figure out the " +
            "output ordering, which is expensive and may make the planning quite slow.")
          .doc("If it is set to true, the parser will treat HAVING without GROUP BY as a normal " +
            "WHERE, which does not follow SQL standard.")
          .doc("When set to true, the parser of JSON data source treats empty strings as null for " +
            "some data types such as `IntegerType`.")
          .doc("When set to true, Spark returns an empty collection with `StringType` as element " +
            "type if the `array`/`map` function is called without any parameters. Otherwise, Spark " +
            "returns an empty collection with `NullType` as element type.")
          .doc("When set to true, user is allowed to use org.apache.spark.sql.functions." +
            "udf(f: AnyRef, dataType: DataType). Otherwise, an exception will be thrown at runtime.")
          .doc("When set to true, statistical aggregate function returns Double.NaN " +
            "if divide by zero occurred during expression evaluation, otherwise, it returns null. " +
            "Before version 3.1.0, it returns NaN in divideByZero case by default.")
          .doc("When set to true, TRUNCATE TABLE command will not try to set back original " +
            "permission and ACLs when re-creating the table/partition paths.")
          .doc("When set to true, the key attribute resulted from running `Dataset.groupByKey` " +
            "for non-struct key type, will be named as `value`, following the behavior of Spark " +
            "version 2.4 and earlier.")
      val MAX_TO_STRING_FIELDS = buildConf("spark.sql.debug.maxToStringFields")
        .doc("Maximum number of fields of sequence-like entries that can be converted to strings " +
          "in debug output. Any elements beyond the limit will be dropped and replaced by a" +
          """ "... N more fields" placeholder.""")
      val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.maxPlanStringLength")
        .doc("Maximum number of characters to output for a plan string.  If the plan is " +
          "longer, further output will be truncated.  The default setting always generates a full " +
          "plan.  Set this to a lower value such as 8k if plan strings are taking up too much " +
          "memory or are causing OutOfMemory errors in the driver or UI processes.")
        .checkValue(i => i >= 0 && i <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH, "Invalid " +
          "value for 'spark.sql.maxPlanStringLength'.  Length must be a valid string length " +
          "(nonnegative and shorter than the maximum size).")
      val MAX_METADATA_STRING_LENGTH = buildConf("spark.sql.maxMetadataStringLength")
        .doc("Maximum number of characters to output for a metadata string, e.g. the " +
          "file location in `DataSourceScanExec`. Every value is abbreviated if it exceeds " +
          "this length.")
        .checkValue(_ > 3, "This value must be bigger than 3.")
          .doc("If it is set to true, SET command will fail when the key is registered as " +
            "a SparkConf entry.")
      object TimestampTypes extends Enumeration {
      val TIMESTAMP_TYPE =
          .doc("Configures the default timestamp type of Spark SQL, including SQL DDL, Cast clause " +
            s"and type literal. Setting the configuration as ${TimestampTypes.TIMESTAMP_NTZ} will " +
            "use TIMESTAMP WITHOUT TIME ZONE as the default type while setting it to " +
            s"${TimestampTypes.TIMESTAMP_LTZ} will use TIMESTAMP WITH LOCAL TIME ZONE. " +
            "Before the 3.3.0 release, Spark only supported the TIMESTAMP WITH " +
            "LOCAL TIME ZONE type.")
      val DATETIME_JAVA8API_ENABLED = buildConf("spark.sql.datetime.java8API.enabled")
        .doc("If the configuration property is set to true, java.time.Instant and " +
          "java.time.LocalDate classes of Java 8 API are used as external types for " +
          "Catalyst's TimestampType and DateType. If it is set to false, java.sql.Timestamp " +
          "and java.sql.Date are used for the same purpose.")
      val UI_EXPLAIN_MODE = buildConf("spark.sql.ui.explainMode")
        .doc("Configures the query explain mode used in the Spark SQL UI. The value can be 'simple', " +
          "'extended', 'codegen', 'cost', or 'formatted'. The default value is 'formatted'.")
        .checkValue(mode => Set("SIMPLE", "EXTENDED", "CODEGEN", "COST", "FORMATTED").contains(mode),
          "Invalid value for 'spark.sql.ui.explainMode'. Valid values are 'simple', 'extended', " +
          "'codegen', 'cost' and 'formatted'.")
      val SOURCES_BINARY_FILE_MAX_LENGTH = buildConf("spark.sql.sources.binaryFile.maxLength")
        .doc("The max length of a file that can be read by the binary file data source. " +
          "Spark will fail fast and not attempt to read the file if its length exceeds this value. " +
          "The theoretical max is Int.MaxValue, though VMs might implement a smaller max.")
          .doc("If it is set to true, date/timestamp will be cast to string in binary comparisons " +
            s"with String when ${ANSI_ENABLED.key} is false.")
      val DEFAULT_CATALOG = buildConf("spark.sql.defaultCatalog")
        .doc("Name of the default catalog. This will be the current catalog if users have not " +
          "explicitly set the current catalog yet.")
          .doc("A catalog implementation that will be used as the v2 interface to Spark's built-in " +
            s"v1 catalog: $SESSION_CATALOG_NAME. This catalog shares its identifier namespace with " +
            s"the $SESSION_CATALOG_NAME and must be consistent with it; for example, if a table can " +
            s"be loaded by the $SESSION_CATALOG_NAME, this catalog must also return the table " +
            s"metadata. To delegate operations to the $SESSION_CATALOG_NAME, implementations can " +
            "extend 'CatalogExtension'.")
      object MapKeyDedupPolicy extends Enumeration {
        val EXCEPTION, LAST_WIN = Value
      val MAP_KEY_DEDUP_POLICY = buildConf("spark.sql.mapKeyDedupPolicy")
        .doc("The policy to deduplicate map keys in built-in functions: CreateMap, MapFromArrays, " +
          "MapFromEntries, StringToMap, MapConcat and TransformKeys. When EXCEPTION, the query " +
          "fails if duplicated map keys are detected. When LAST_WIN, the map key that is inserted " +
          "last takes precedence.")
      val LEGACY_LOOSE_UPCAST = buildConf("spark.sql.legacy.doLooseUpcast")
        .doc("When true, the upcast will be loose and allow string to be upcast to atomic types.")
      object LegacyBehaviorPolicy extends Enumeration {
      val LEGACY_CTE_PRECEDENCE_POLICY = buildConf("spark.sql.legacy.ctePrecedencePolicy")
        .doc("When LEGACY, outer CTE definitions take precedence over inner definitions. If set to " +
          "CORRECTED, inner CTE definitions take precedence. The default value is EXCEPTION: an " +
          "AnalysisException is thrown when a name conflict is detected in nested CTEs. This config " +
          "will be removed in future versions and CORRECTED will be the only behavior.")
      val LEGACY_TIME_PARSER_POLICY = buildConf("spark.sql.legacy.timeParserPolicy")
        .doc("When LEGACY, java.text.SimpleDateFormat is used for formatting and parsing " +
          "dates/timestamps in a locale-sensitive manner, which is the approach before Spark 3.0. " +
          "When set to CORRECTED, classes from java.time.* packages are used for the same purpose. " +
          "The default value is EXCEPTION, RuntimeException is thrown when we will get different " +
          .doc("When true, the ArrayExists will follow the three-valued boolean logic.")
          .doc("A comma-delimited string config of the optional additional remote Maven mirror " +
            "repositories. This is only used for downloading Hive jars in IsolatedClientLoader " +
            "if the default Maven Central repo is unreachable.")
          .doc("When true, the `from` bound is not taken into account in conversion of " +
            "a day-time string to an interval, and the `to` bound is used to skip " +
            "all interval units out of the specified range. If it is set to `false`, " +
            "`ParseException` is thrown if the input does not match the pattern " +
            "defined by `from` and `to`.")
          .doc("When true, all database and table properties are not reserved and available for " +
            "create/alter syntaxes. But please be aware that the reserved properties will be " +
            "silently removed.")
          .doc("When true, only a single file can be added using ADD FILE. If false, then users " +
            "can add directory by passing directory path to ADD FILE.")
          .doc("When true, use legacy Microsoft SQL Server SMALLINT and REAL type mapping.")
      val CSV_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.csv.filterPushdown.enabled")
        .doc("When true, enable filter pushdown to CSV datasource.")
      val JSON_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.json.filterPushdown.enabled")
        .doc("When true, enable filter pushdown to JSON datasource.")
      val AVRO_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.avro.filterPushdown.enabled")
        .doc("When true, enable filter pushdown to Avro datasource.")
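      // Usage sketch (not part of SQLConf.scala): toggling one of the pushdown flags above at
      // runtime. Assumes an existing SparkSession named `spark`; the input `path` and the
      // column `id` are hypothetical.
      spark.conf.set("spark.sql.csv.filterPushdown.enabled", "true")
      val rows = spark.read.option("header", "true").option("inferSchema", "true").csv(path)
      // With the flag enabled, this filter is a candidate for pushdown into the CSV datasource.
      rows.filter("id > 10").show()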
          .doc("The number of partitions to be handled in one batch when using " +
            "`AlterTableAddPartitionCommand` or `RepairTableCommand` to add partitions to a table. " +
            "The smaller the batch size is, the less memory is required for the real handler, e.g. " +
            "Hive Metastore.")
          .checkValue(_ > 0, "The value of spark.sql.addPartitionInBatch.size must be positive")
      val LEGACY_ALLOW_HASH_ON_MAPTYPE = buildConf("spark.sql.legacy.allowHashOnMapType")
        .doc("When set to true, hash expressions can be applied on elements of MapType. Otherwise, " +
          "an analysis exception will be thrown.")
          .doc("When true, grouping_id() returns int values instead of long values.")
          .doc("When LEGACY, Spark will rebase INT96 timestamps from Proleptic Gregorian calendar to " +
            "the legacy hybrid (Julian + Gregorian) calendar when writing Parquet files. " +
            "When CORRECTED, Spark will not do rebase and write the timestamps as they are. " +
            "When EXCEPTION, which is the default, Spark will fail the writing if it sees ancient " +
            "timestamps that are ambiguous between the two calendars.")
          .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " +
            "to the legacy hybrid (Julian + Gregorian) calendar when writing Parquet files. " +
            "When CORRECTED, Spark will not do rebase and write the dates/timestamps as they are. " +
            "When EXCEPTION, which is the default, Spark will fail the writing if it sees " +
            "ancient dates/timestamps that are ambiguous between the two calendars. " +
            "This config influences writes of the following parquet logical types: DATE, " +
            "TIMESTAMP_MILLIS, TIMESTAMP_MICROS. The INT96 type has a separate config: " +
          .doc("When LEGACY, Spark will rebase INT96 timestamps from the legacy hybrid (Julian + " +
            "Gregorian) calendar to Proleptic Gregorian calendar when reading Parquet files. " +
            "When CORRECTED, Spark will not do rebase and read the timestamps as they are. " +
            "When EXCEPTION, which is the default, Spark will fail the reading if it sees ancient " +
            "timestamps that are ambiguous between the two calendars. This config is only effective " +
            "if the writer info (like Spark, Hive) of the Parquet files is unknown.")
          .doc("When LEGACY, Spark will rebase dates/timestamps from the legacy hybrid (Julian + " +
            "Gregorian) calendar to Proleptic Gregorian calendar when reading Parquet files. " +
            "When CORRECTED, Spark will not do rebase and read the dates/timestamps as they are. " +
            "When EXCEPTION, which is the default, Spark will fail the reading if it sees " +
            "ancient dates/timestamps that are ambiguous between the two calendars. This config is " +
            "only effective if the writer info (like Spark, Hive) of the Parquet files is unknown. " +
            "This config influences reads of the following parquet logical types: DATE, " +
            "TIMESTAMP_MILLIS, TIMESTAMP_MICROS. The INT96 type has a separate config: " +
          .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " +
            "to the legacy hybrid (Julian + Gregorian) calendar when writing Avro files. " +
            "When CORRECTED, Spark will not do rebase and write the dates/timestamps as they are. " +
            "When EXCEPTION, which is the default, Spark will fail the writing if it sees " +
            "ancient dates/timestamps that are ambiguous between the two calendars.")
          .doc("When LEGACY, Spark will rebase dates/timestamps from the legacy hybrid (Julian + " +
            "Gregorian) calendar to Proleptic Gregorian calendar when reading Avro files. " +
            "When CORRECTED, Spark will not do rebase and read the dates/timestamps as they are. " +
            "When EXCEPTION, which is the default, Spark will fail the reading if it sees " +
            "ancient dates/timestamps that are ambiguous between the two calendars. This config is " +
            "only effective if the writer info (like Spark, Hive) of the Avro files is unknown.")
          .doc("Timeout for the executor to wait for the transformation script to terminate at EOF.")
          .checkValue(_ > 0, "The timeout value must be positive")
          .doc("When true, if two bucketed tables with different numbers of buckets are joined, " +
            "the side with the bigger number of buckets will be coalesced to have the same number " +
            "of buckets as the other side. The bigger number of buckets must be divisible by the " +
            "smaller number of buckets. Bucket coalescing is applied to sort-merge joins and " +
            "shuffled hash joins. Note: Coalescing bucketed tables can avoid unnecessary shuffling " +
            "in joins, but it also reduces parallelism and could possibly cause OOM for " +
            "shuffled hash joins.")
          .doc("The ratio of the number of two buckets being coalesced should be less than or " +
            "equal to this value for bucket coalescing to be applied. This configuration only " +
            s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.")
          .checkValue(_ > 0, "The difference must be positive.")
          .doc("The maximum number of partitionings that a HashPartitioning can be expanded to. " +
            "This configuration is applicable only for BroadcastHashJoin inner joins and can be " +
            "set to '0' to disable this feature.")
          .checkValue(_ >= 0, "The value must be non-negative.")
          .doc("When true, NULL-aware anti join execution will be planned into " +
            "BroadcastHashJoinExec with flag isNullAwareAntiJoin enabled, " +
            "optimized from O(M*N) calculation into O(M) calculation " +
            "using hash lookup instead of looping lookup. " +
            "Only single-column NAAJ is supported for now.")
          .doc("When true, maps and structs are wrapped by [] in casting to strings, and " +
            "NULL elements of structs/maps/arrays will be omitted while converting to strings. " +
            "Otherwise, if this is false, which is the default, maps and structs are wrapped by {}, " +
            "and NULL elements will be converted to \"null\".")
          .doc("When true, \"path\" option is overwritten if one path parameter is passed to " +
            "DataFrameReader.load(), DataFrameWriter.save(), DataStreamReader.load(), or " +
            "DataStreamWriter.start(). Also, \"path\" option is added to the overall paths if " +
            "multiple path parameters are passed to DataFrameReader.load()")
          .doc("When true, the extra options will be ignored for DataFrameReader.table(). If it is " +
            "set to false, which is the default, Spark will check whether any extra option has the " +
            "same key as a table serde property but a different value. If the check passes, " +
            "the extra options will be merged with the serde properties as the scan options. " +
            "Otherwise, an exception will be thrown.")
          .doc("When set to true, CREATE TABLE syntax without USING or STORED AS will use Hive " +
            s"instead of the value of ${DEFAULT_DATA_SOURCE_NAME.key} as the table provider.")
          .doc("When true, Spark treats CHAR/VARCHAR type the same as STRING type, which is the " +
            "behavior of Spark 3.0 and earlier. This means no length check for CHAR/VARCHAR type and " +
            "no padding for CHAR type when writing data to the table.")
      val CHAR_AS_VARCHAR = buildConf("spark.sql.charAsVarchar")
        .doc("When true, Spark replaces CHAR type with VARCHAR type in CREATE/REPLACE/ALTER TABLE " +
          "commands, so that newly created/updated tables will not have CHAR type columns/fields. " +
          "Existing tables with CHAR type columns/fields are not affected by this config.")
      val CLI_PRINT_HEADER =
         .doc("When set to true, spark-sql CLI prints the names of the columns in query output.")
          .doc("When true, Spark will keep the output schema of commands such as SHOW DATABASES " +
      val LEGACY_INTERVAL_ENABLED = buildConf("spark.sql.legacy.interval.enabled")
        .doc("When set to true, Spark SQL uses the mixed legacy interval type `CalendarIntervalType` " +
          "instead of the ANSI compliant interval types `YearMonthIntervalType` and " +
          "`DayTimeIntervalType`. For instance, the date subtraction expression returns " +
          "`CalendarIntervalType` when the SQL config is set to `true`, and otherwise an ANSI interval.")
      val MAX_CONCURRENT_OUTPUT_FILE_WRITERS = buildConf("spark.sql.maxConcurrentOutputFileWriters")
        .doc("Maximum number of output file writers to use concurrently. If the number of writers " +
          "needed reaches this limit, the task will sort the rest of the output and then write it.")
      val INFER_NESTED_DICT_AS_STRUCT = buildConf("spark.sql.pyspark.inferNestedDictAsStruct.enabled")
        .doc("PySpark's SparkSession.createDataFrame infers the nested dict as a map by default. " +
          "When it is set to true, it infers the nested dict as a struct.")
          .doc("When true, Spark will use legacy V1 SQL commands.")
      /**
       * Holds information about keys that have been deprecated.
       *
       * @param key The deprecated key.
       * @param version Version of Spark where key was deprecated.
       * @param comment Additional info regarding the deprecated config. For example,
       *                reasons of config deprecation, what users should use instead of it.
       */
      case class DeprecatedConfig(key: String, version: String, comment: String)
      /**
       * Maps deprecated SQL config keys to information about the deprecation.
       *
       * The extra information is logged as a warning when the SQL config is present
       * in the user's configuration.
       */
      val deprecatedSQLConfigs: Map[String, DeprecatedConfig] = {
        val configs = Seq(
            "The config allows to switch to the behaviour before Spark 2.4 " +
              "and will be removed in the future releases."),
          DeprecatedConfig(HIVE_VERIFY_PARTITION_PATH.key, "3.0",
            s"This config is replaced by '${SPARK_IGNORE_MISSING_FILES.key}'."),
          DeprecatedConfig(ARROW_EXECUTION_ENABLED.key, "3.0",
            s"Use '${ARROW_PYSPARK_EXECUTION_ENABLED.key}' instead of it."),
          DeprecatedConfig(ARROW_FALLBACK_ENABLED.key, "3.0",
            s"Use '${ARROW_PYSPARK_FALLBACK_ENABLED.key}' instead of it."),
          DeprecatedConfig(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "3.0",
            s"Use '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}' instead of it."),
          DeprecatedConfig(OPTIMIZER_METADATA_ONLY.key, "3.0",
            "Avoid depending on this optimization to prevent a potential correctness issue. " +
              "If you must use it, use 'SparkSessionExtensions' instead to inject it as a custom rule."),
          DeprecatedConfig(CONVERT_CTAS.key, "3.1",
            s"Set '${LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key}' to false instead."),
          DeprecatedConfig("spark.sql.sources.schemaStringLengthThreshold", "3.2",
            s"Use '${HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD.key}' instead."),
          DeprecatedConfig(PARQUET_INT96_REBASE_MODE_IN_WRITE.alternatives.head, "3.2",
            s"Use '${PARQUET_INT96_REBASE_MODE_IN_WRITE.key}' instead."),
          DeprecatedConfig(PARQUET_INT96_REBASE_MODE_IN_READ.alternatives.head, "3.2",
            s"Use '${PARQUET_INT96_REBASE_MODE_IN_READ.key}' instead."),
          DeprecatedConfig(PARQUET_REBASE_MODE_IN_WRITE.alternatives.head, "3.2",
            s"Use '${PARQUET_REBASE_MODE_IN_WRITE.key}' instead."),
          DeprecatedConfig(PARQUET_REBASE_MODE_IN_READ.alternatives.head, "3.2",
            s"Use '${PARQUET_REBASE_MODE_IN_READ.key}' instead."),
          DeprecatedConfig(AVRO_REBASE_MODE_IN_WRITE.alternatives.head, "3.2",
            s"Use '${AVRO_REBASE_MODE_IN_WRITE.key}' instead."),
          DeprecatedConfig(AVRO_REBASE_MODE_IN_READ.alternatives.head, "3.2",
            s"Use '${AVRO_REBASE_MODE_IN_READ.key}' instead."),
          DeprecatedConfig(LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED.key, "3.2",
            """Use `.format("avro")` in `DataFrameWriter` or `DataFrameReader` instead."""),
          DeprecatedConfig(COALESCE_PARTITIONS_MIN_PARTITION_NUM.key, "3.2",
            s"Use '${COALESCE_PARTITIONS_MIN_PARTITION_SIZE.key}' instead.")
        )
        Map(configs.map { cfg => cfg.key -> cfg } : _*)
      }
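The table above is keyed by config name, so a single `get` is enough to find the deprecation record for a key. The following is a minimal sketch of that pattern outside Spark. `DeprecationSketch`, the sample key `example.old.key`, and the `warningFor` helper are illustrative names and are not part of `SQLConf`; the message wording follows the warning text that appears later in this file.

```scala
object DeprecationSketch {
  // Same shape as the case class above, redefined so the sketch is self-contained.
  final case class DeprecatedConfig(key: String, version: String, comment: String)

  // Hypothetical sample entry; the key and the comment are made up for illustration.
  private val configs = Seq(
    DeprecatedConfig("example.old.key", "3.0", "Use 'example.new.key' instead.")
  )

  // Same construction as in the source: turn the Seq into a Map keyed by config name.
  val deprecated: Map[String, DeprecatedConfig] =
    Map(configs.map { cfg => cfg.key -> cfg }: _*)

  // Returns the warning text for a deprecated key, or None when the key is not deprecated.
  def warningFor(key: String): Option[String] =
    deprecated.get(key).map { case DeprecatedConfig(name, version, comment) =>
      s"The SQL config '$name' has been deprecated in Spark v$version " +
        s"and may be removed in the future. $comment"
    }
}
```

Returning an `Option[String]` instead of logging directly is a choice made only for this sketch, so the result can be inspected without a logger.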
      /**
       * Holds information about keys that have been removed.
       *
       * @param key The removed config key.
       * @param version Version of Spark where key was removed.
       * @param defaultValue The default config value. It can be used to notify
       *                     users that they set a non-default value for an already removed config.
       * @param comment Additional info regarding the removed config.
       */
      case class RemovedConfig(key: String, version: String, defaultValue: String, comment: String)
      /**
       * The map contains info about removed SQL configs. Keys are SQL config names,
       * map values contain extra information like the version in which the config was removed,
       * config's default value and a comment.
       *
       * Please add a removed SQL configuration property here only when it affects behaviours.
       * For example, `spark.sql.variable.substitute.depth` was not added as it virtually
       * became a no-op later. This keeps migrations to new Spark versions painless.
       */
      val removedSQLConfigs: Map[String, RemovedConfig] = {
        val configs = Seq(
          RemovedConfig("spark.sql.fromJsonForceNullableSchema", "3.0.0", "true",
            "It was removed to prevent errors like SPARK-23173 for non-default value."),
          RemovedConfig(
            "spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation", "3.0.0", "false",
            "It was removed to prevent loss of user data for non-default value."),
          RemovedConfig("spark.sql.legacy.compareDateTimestampInTimestamp", "3.0.0", "true",
            "It was removed to prevent errors like SPARK-23549 for non-default value."),
          RemovedConfig("spark.sql.parquet.int64AsTimestampMillis", "3.0.0", "false",
            "The config was deprecated since Spark 2.3. " +
            s"Use '${PARQUET_OUTPUT_TIMESTAMP_TYPE.key}' instead of it."),
          RemovedConfig("spark.sql.execution.pandas.respectSessionTimeZone", "3.0.0", "true",
            "The non-default behavior is considered as a bug, see SPARK-22395. " +
            "The config was deprecated since Spark 2.3."),
          RemovedConfig("spark.sql.optimizer.planChangeLog.level", "3.1.0", "trace",
            s"Please use `${PLAN_CHANGE_LOG_LEVEL.key}` instead."),
          RemovedConfig("spark.sql.optimizer.planChangeLog.rules", "3.1.0", "",
            s"Please use `${PLAN_CHANGE_LOG_RULES.key}` instead."),
          RemovedConfig("spark.sql.optimizer.planChangeLog.batches", "3.1.0", "",
            s"Please use `${PLAN_CHANGE_LOG_BATCHES.key}` instead.")
        )
        Map(configs.map { cfg => cfg.key -> cfg } : _*)
      }
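Each removed-config record carries the old default so that a leftover setting equal to that default can be tolerated, while a non-default value is rejected. A minimal sketch of that rule follows. `RemovedConfigSketch`, the sample key, and the `check` helper are hypothetical, and the sketch returns an `Either` where the code in this file throws an exception.

```scala
object RemovedConfigSketch {
  // Same shape as the case class above, redefined so the sketch is self-contained.
  final case class RemovedConfig(
      key: String, version: String, defaultValue: String, comment: String)

  // Hypothetical entry, for illustration only.
  private val removed: Map[String, RemovedConfig] = Seq(
    RemovedConfig("example.removed.key", "3.0.0", "true", "It no longer has any effect.")
  ).map(cfg => cfg.key -> cfg).toMap

  // Left(message) when a removed key is set to a non-default value, Right(()) otherwise.
  def check(key: String, value: String): Either[String, Unit] =
    removed.get(key) match {
      case Some(cfg) if value != cfg.defaultValue =>
        Left(s"The SQL config '${cfg.key}' was removed in version ${cfg.version}. ${cfg.comment}")
      case _ =>
        Right(())
    }
}
```

Under these assumptions, `check("example.removed.key", "true")` passes because the value equals the old default, while `"false"` is rejected.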
    /**
     * A class that enables the setting and getting of mutable config parameters/hints.
     *
     * In the presence of a SQLContext, these can be set and queried by passing SET commands
     * into Spark SQL's query functions (i.e. sql()). Otherwise, users of this class can
     * modify the hints by programmatically calling the setters and getters of this class.
     *
     * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
     */
    class SQLConf extends Serializable with Logging {
      import SQLConf._
      /** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
      @transient protected[spark] val settings = java.util.Collections.synchronizedMap(
        new java.util.HashMap[String, String]())
      @transient protected val reader = new ConfigReader(settings)
      /** ************************ Spark SQL Params/Hints ******************* */
      def analyzerMaxIterations: Int = getConf(ANALYZER_MAX_ITERATIONS)
      def optimizerExcludedRules: Option[String] = getConf(OPTIMIZER_EXCLUDED_RULES)
      def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS)
      def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)
      def optimizerInSetSwitchThreshold: Int = getConf(OPTIMIZER_INSET_SWITCH_THRESHOLD)
      def planChangeLogLevel: String = getConf(PLAN_CHANGE_LOG_LEVEL)
      def planChangeRules: Option[String] = getConf(PLAN_CHANGE_LOG_RULES)
      def planChangeBatches: Option[String] = getConf(PLAN_CHANGE_LOG_BATCHES)
      def dynamicPartitionPruningEnabled: Boolean = getConf(DYNAMIC_PARTITION_PRUNING_ENABLED)
      def dynamicPartitionPruningUseStats: Boolean = getConf(DYNAMIC_PARTITION_PRUNING_USE_STATS)
      def dynamicPartitionPruningFallbackFilterRatio: Double =
      def dynamicPartitionPruningReuseBroadcastOnly: Boolean =
      def stateStoreProviderClass: String = getConf(STATE_STORE_PROVIDER_CLASS)
      def isStateSchemaCheckEnabled: Boolean = getConf(STATE_SCHEMA_CHECK_ENABLED)
      def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
      def stateStoreFormatValidationEnabled: Boolean = getConf(STATE_STORE_FORMAT_VALIDATION_ENABLED)
      def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)
      def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
      def useDeprecatedKafkaOffsetFetching: Boolean = getConf(USE_DEPRECATED_KAFKA_OFFSET_FETCHING)
      def statefulOperatorCorrectnessCheckEnabled: Boolean =
      def fileStreamSinkMetadataIgnored: Boolean = getConf(FILESTREAM_SINK_METADATA_IGNORED)
      def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS)
      def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
      def fileSinkLogCompactInterval: Int = getConf(FILE_SINK_LOG_COMPACT_INTERVAL)
      def fileSinkLogCleanupDelay: Long = getConf(FILE_SINK_LOG_CLEANUP_DELAY)
      def fileSourceLogDeletion: Boolean = getConf(FILE_SOURCE_LOG_DELETION)
      def fileSourceLogCompactInterval: Int = getConf(FILE_SOURCE_LOG_COMPACT_INTERVAL)
      def fileSourceLogCleanupDelay: Long = getConf(FILE_SOURCE_LOG_CLEANUP_DELAY)
      def streamingSchemaInference: Boolean = getConf(STREAMING_SCHEMA_INFERENCE)
      def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY)
      def streamingNoDataProgressEventInterval: Long =
      def streamingNoDataMicroBatchesEnabled: Boolean =
      def streamingMetricsEnabled: Boolean = getConf(STREAMING_METRICS_ENABLED)
      def streamingProgressRetention: Int = getConf(STREAMING_PROGRESS_RETENTION)
      def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
      def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)
      def filesMinPartitionNum: Option[Int] = getConf(FILES_MIN_PARTITION_NUM)
      def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)
      def ignoreMissingFiles: Boolean = getConf(IGNORE_MISSING_FILES)
      def maxRecordsPerFile: Long = getConf(MAX_RECORDS_PER_FILE)
      def useCompression: Boolean = getConf(COMPRESS_CACHED)
      def orcCompressionCodec: String = getConf(ORC_COMPRESSION)
      def orcVectorizedReaderEnabled: Boolean = getConf(ORC_VECTORIZED_READER_ENABLED)
      def orcVectorizedReaderBatchSize: Int = getConf(ORC_VECTORIZED_READER_BATCH_SIZE)
      def orcVectorizedReaderNestedColumnEnabled: Boolean =
      def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)
      def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED)
      def parquetVectorizedReaderBatchSize: Int = getConf(PARQUET_VECTORIZED_READER_BATCH_SIZE)
      def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
      def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED)
      def defaultNumShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
      def numShufflePartitions: Int = {
        if (adaptiveExecutionEnabled && coalesceShufflePartitionsEnabled) {
        } else {
      def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
      def adaptiveExecutionLogLevel: String = getConf(ADAPTIVE_EXECUTION_LOG_LEVEL)
      def fetchShuffleBlocksInBatch: Boolean = getConf(FETCH_SHUFFLE_BLOCKS_IN_BATCH)
      def nonEmptyPartitionRatioForBroadcastJoin: Double =
      def coalesceShufflePartitionsEnabled: Boolean = getConf(COALESCE_PARTITIONS_ENABLED)
      def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
      def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)
      def streamingMaintenanceInterval: Long = getConf(STREAMING_MAINTENANCE_INTERVAL)
      def stateStoreCompressionCodec: String = getConf(STATE_STORE_COMPRESSION_CODEC)
      def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
      def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED)
      def parquetFilterPushDownTimestamp: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED)
      def parquetFilterPushDownDecimal: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED)
      def parquetFilterPushDownStringStartWith: Boolean =
      def parquetFilterPushDownInFilterThreshold: Int =
      def parquetAggregatePushDown: Boolean = getConf(PARQUET_AGGREGATE_PUSHDOWN_ENABLED)
      def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
      def orcAggregatePushDown: Boolean = getConf(ORC_AGGREGATE_PUSHDOWN_ENABLED)
      def isOrcSchemaMergingEnabled: Boolean = getConf(ORC_SCHEMA_MERGING_ENABLED)
      def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
      def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
      def metastorePartitionPruningInSetThreshold: Int =
      def metastorePartitionPruningFallbackOnException: Boolean =
      def metastorePartitionPruningFastFallback: Boolean =
      def manageFilesourcePartitions: Boolean = getConf(HIVE_MANAGE_FILESOURCE_PARTITIONS)
      def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)
      def caseSensitiveInferenceMode: HiveCaseSensitiveInferenceMode.Value =
      def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
      def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
      def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)
      def wholeStageUseIdInClassName: Boolean = getConf(WHOLESTAGE_CODEGEN_USE_ID_IN_CLASS_NAME)
      def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
      def codegenFallback: Boolean = getConf(CODEGEN_FALLBACK)
      def codegenComments: Boolean = getConf(StaticSQLConf.CODEGEN_COMMENTS)
      def loggingMaxLinesForCodegen: Int = getConf(CODEGEN_LOGGING_MAX_LINES)
      def hugeMethodLimit: Int = getConf(WHOLESTAGE_HUGE_METHOD_LIMIT)
      def methodSplitThreshold: Int = getConf(CODEGEN_METHOD_SPLIT_THRESHOLD)
      def wholeStageSplitConsumeFuncByOperator: Boolean =
      def tableRelationCacheSize: Int =
      def codegenCacheMaxEntries: Int = getConf(StaticSQLConf.CODEGEN_CACHE_MAX_ENTRIES)
      def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
      def subqueryReuseEnabled: Boolean = getConf(SUBQUERY_REUSE_ENABLED)
      def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
      def constraintPropagationEnabled: Boolean = getConf(CONSTRAINT_PROPAGATION_ENABLED)
      def escapedStringLiterals: Boolean = getConf(ESCAPED_STRING_LITERALS)
      def fileCompressionFactor: Double = getConf(FILE_COMPRESSION_FACTOR)
      def stringRedactionPattern: Option[Regex] = getConf(SQL_STRING_REDACTION_PATTERN)
      def sortBeforeRepartition: Boolean = getConf(SORT_BEFORE_REPARTITION)
      def topKSortFallbackThreshold: Int = getConf(TOP_K_SORT_FALLBACK_THRESHOLD)
      def fastHashAggregateRowMaxCapacityBit: Int = getConf(FAST_HASH_AGGREGATE_MAX_ROWS_CAPACITY_BIT)
      def streamingSessionWindowMergeSessionInLocalPartition: Boolean =
      def datetimeJava8ApiEnabled: Boolean = getConf(DATETIME_JAVA8API_ENABLED)
      def uiExplainMode: String = getConf(UI_EXPLAIN_MODE)
      def addSingleFileInAddFile: Boolean = getConf(LEGACY_ADD_SINGLE_FILE_IN_ADD_FILE)
      def legacyMsSqlServerNumericMappingEnabled: Boolean =
      def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = {
      def broadcastHashJoinOutputPartitioningExpandLimit: Int =
      /**
       * Returns the [[Resolver]] for the current configuration, which can be used to determine if two
       * identifiers are equal.
       */
      def resolver: Resolver = {
        if (caseSensitiveAnalysis) {
        } else {
      /**
       * Returns the error handler for handling hint errors.
       */
      def hintErrorHandler: HintErrorHandler = HintErrorLogger
      def subexpressionEliminationEnabled: Boolean =
      def subexpressionEliminationCacheMaxEntries: Int =
      def autoBroadcastJoinThreshold: Long = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
      def limitScaleUpFactor: Int = getConf(LIMIT_SCALE_UP_FACTOR)
      def advancedPartitionPredicatePushdownEnabled: Boolean =
      def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)
      def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)
      def isParquetSchemaMergingEnabled: Boolean = getConf(PARQUET_SCHEMA_MERGING_ENABLED)
      def isParquetSchemaRespectSummaries: Boolean = getConf(PARQUET_SCHEMA_RESPECT_SUMMARIES)
      def parquetOutputCommitterClass: String = getConf(PARQUET_OUTPUT_COMMITTER_CLASS)
      def isParquetBinaryAsString: Boolean = getConf(PARQUET_BINARY_AS_STRING)
      def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)
      def isParquetINT96TimestampConversion: Boolean = getConf(PARQUET_INT96_TIMESTAMP_CONVERSION)
      def parquetOutputTimestampType: ParquetOutputTimestampType.Value = {
      def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT)
      def parquetRecordFilterEnabled: Boolean = getConf(PARQUET_RECORD_FILTER_ENABLED)
      def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)
      def inMemoryTableScanStatisticsEnabled: Boolean = getConf(IN_MEMORY_TABLE_SCAN_STATISTICS_ENABLED)
      def offHeapColumnVectorEnabled: Boolean = getConf(COLUMN_VECTOR_OFFHEAP_ENABLED)
      def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD)
      def broadcastTimeout: Long = {
        val timeoutValue = getConf(BROADCAST_TIMEOUT)
        if (timeoutValue < 0) Long.MaxValue else timeoutValue
      }
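The getter above maps any negative configured value to `Long.MaxValue`, which in practice means "no timeout". A tiny standalone sketch of the same normalization is shown here; `BroadcastTimeoutSketch` and `effectiveTimeout` are illustrative names only.

```scala
object BroadcastTimeoutSketch {
  // A negative configured timeout is treated as "wait indefinitely" by mapping it
  // to Long.MaxValue; zero and positive values pass through unchanged.
  def effectiveTimeout(configured: Long): Long =
    if (configured < 0) Long.MaxValue else configured
}
```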
      def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)
      def convertCTAS: Boolean = getConf(CONVERT_CTAS)
      def partitionColumnTypeInferenceEnabled: Boolean =
      def fileCommitProtocolClass: String = getConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS)
      def parallelPartitionDiscoveryThreshold: Int =
      def parallelPartitionDiscoveryParallelism: Int =
      def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)
      def bucketingMaxBuckets: Int = getConf(SQLConf.BUCKETING_MAX_BUCKETS)
      def autoBucketedScanEnabled: Boolean = getConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED)
      def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
      def dataFrameRetainGroupColumns: Boolean = getConf(DATAFRAME_RETAIN_GROUP_COLUMNS)
      def dataFramePivotMaxValues: Int = getConf(DATAFRAME_PIVOT_MAX_VALUES)
      def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)
      def enableTwoLevelAggMap: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP)
      def enableVectorizedHashMap: Boolean = getConf(ENABLE_VECTORIZED_HASH_MAP)
      def useObjectHashAggregation: Boolean = getConf(USE_OBJECT_HASH_AGG)
      def objectAggSortBasedFallbackThreshold: Int = getConf(OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD)
      def variableSubstituteEnabled: Boolean = getConf(VARIABLE_SUBSTITUTE_ENABLED)
      def warehousePath: String = new Path(getConf(StaticSQLConf.WAREHOUSE_PATH)).toString
      def hiveThriftServerSingleSession: Boolean =
      def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
      def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
      def groupByAliases: Boolean = getConf(GROUP_BY_ALIASES)
      def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
      def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE)
      def jsonGeneratorIgnoreNullFields: Boolean = getConf(SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS)
      def jsonExpressionOptimization: Boolean = getConf(SQLConf.JSON_EXPRESSION_OPTIMIZATION)
      def csvExpressionOptimization: Boolean = getConf(SQLConf.CSV_EXPRESSION_OPTIMIZATION)
      def parallelFileListingInStatsComputation: Boolean =
      def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)
      def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES)
      def ndvMaxError: Double = getConf(NDV_MAX_ERROR)
      def histogramEnabled: Boolean = getConf(HISTOGRAM_ENABLED)
      def histogramNumBins: Int = getConf(HISTOGRAM_NUM_BINS)
      def percentileAccuracy: Int = getConf(PERCENTILE_ACCURACY)
      def cboEnabled: Boolean = getConf(SQLConf.CBO_ENABLED)
      def planStatsEnabled: Boolean = getConf(SQLConf.PLAN_STATS_ENABLED)
      def autoSizeUpdateEnabled: Boolean = getConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED)
      def joinReorderEnabled: Boolean = getConf(SQLConf.JOIN_REORDER_ENABLED)
      def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD)
      def joinReorderCardWeight: Double = getConf(SQLConf.JOIN_REORDER_CARD_WEIGHT)
      def joinReorderDPStarFilter: Boolean = getConf(SQLConf.JOIN_REORDER_DP_STAR_FILTER)
      def windowExecBufferInMemoryThreshold: Int = getConf(WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
      def windowExecBufferSpillThreshold: Int = getConf(WINDOW_EXEC_BUFFER_SPILL_THRESHOLD)
      def sessionWindowBufferInMemoryThreshold: Int = getConf(SESSION_WINDOW_BUFFER_IN_MEMORY_THRESHOLD)
      def sessionWindowBufferSpillThreshold: Int = getConf(SESSION_WINDOW_BUFFER_SPILL_THRESHOLD)
      def sortMergeJoinExecBufferInMemoryThreshold: Int =
      def sortMergeJoinExecBufferSpillThreshold: Int =
      def cartesianProductExecBufferInMemoryThreshold: Int =
      def cartesianProductExecBufferSpillThreshold: Int =
      def codegenSplitAggregateFunc: Boolean = getConf(SQLConf.CODEGEN_SPLIT_AGGREGATE_FUNC)
      def maxNestedViewDepth: Int = getConf(SQLConf.MAX_NESTED_VIEW_DEPTH)
      def useCurrentSQLConfigsForView: Boolean = getConf(SQLConf.USE_CURRENT_SQL_CONFIGS_FOR_VIEW)
      def storeAnalyzedPlanForView: Boolean = getConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW)
      def allowAutoGeneratedAliasForView: Boolean = getConf(SQLConf.ALLOW_AUTO_GENERATED_ALIAS_FOR_VEW)
      def allowStarWithSingleTableIdentifierInCount: Boolean =
      def allowNonEmptyLocationInCTAS: Boolean =
      def starSchemaDetection: Boolean = getConf(STARSCHEMA_DETECTION)
      def starSchemaFTRatio: Double = getConf(STARSCHEMA_FACT_TABLE_RATIO)
      def supportQuotedRegexColumnName: Boolean = getConf(SUPPORT_QUOTED_REGEX_COLUMN_NAME)
      def rangeExchangeSampleSizePerPartition: Int = getConf(RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION)
      def arrowPySparkEnabled: Boolean = getConf(ARROW_PYSPARK_EXECUTION_ENABLED)
      def arrowPySparkSelfDestructEnabled: Boolean = getConf(ARROW_PYSPARK_SELF_DESTRUCT_ENABLED)
      def pysparkJVMStacktraceEnabled: Boolean = getConf(PYSPARK_JVM_STACKTRACE_ENABLED)
      def arrowSparkREnabled: Boolean = getConf(ARROW_SPARKR_EXECUTION_ENABLED)
      def arrowPySparkFallbackEnabled: Boolean = getConf(ARROW_PYSPARK_FALLBACK_ENABLED)
      def arrowMaxRecordsPerBatch: Int = getConf(ARROW_EXECUTION_MAX_RECORDS_PER_BATCH)
      def pandasUDFBufferSize: Int = getConf(PANDAS_UDF_BUFFER_SIZE)
      def pysparkSimplifiedTraceback: Boolean = getConf(PYSPARK_SIMPLIFIEID_TRACEBACK)
      def pandasGroupedMapAssignColumnsByName: Boolean =
      def arrowSafeTypeConversion: Boolean = getConf(SQLConf.PANDAS_ARROW_SAFE_TYPE_CONVERSION)
      def replaceExceptWithFilter: Boolean = getConf(REPLACE_EXCEPT_WITH_FILTER)
      def decimalOperationsAllowPrecisionLoss: Boolean = getConf(DECIMAL_OPERATIONS_ALLOW_PREC_LOSS)
      def literalPickMinimumPrecision: Boolean = getConf(LITERAL_PICK_MINIMUM_PRECISION)
      def continuousStreamingEpochBacklogQueueSize: Int =
      def continuousStreamingExecutorQueueSize: Int = getConf(CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE)
      def continuousStreamingExecutorPollIntervalMs: Long =
      def disabledV2StreamingWriters: String = getConf(DISABLED_V2_STREAMING_WRITERS)
      def disabledV2StreamingMicroBatchReaders: String =
      def fastFailFileFormatOutput: Boolean = getConf(FASTFAIL_ON_FILEFORMAT_OUTPUT)
      def concatBinaryAsString: Boolean = getConf(CONCAT_BINARY_AS_STRING)
      def eltOutputAsString: Boolean = getConf(ELT_OUTPUT_AS_STRING)
      def validatePartitionColumns: Boolean = getConf(VALIDATE_PARTITION_COLUMNS)
      def partitionOverwriteMode: PartitionOverwriteMode.Value =
      def storeAssignmentPolicy: StoreAssignmentPolicy.Value =
      def ansiEnabled: Boolean = getConf(ANSI_ENABLED)
      def enforceReservedKeywords: Boolean = ansiEnabled && getConf(ENFORCE_RESERVED_KEYWORDS)
      def allowCastBetweenDatetimeAndNumericInAnsi: Boolean =
      def timestampType: AtomicType = getConf(TIMESTAMP_TYPE) match {
        case "TIMESTAMP_LTZ" =>
          // For historical reason, the TimestampType maps to TIMESTAMP WITH LOCAL TIME ZONE
        case "TIMESTAMP_NTZ" =>
      def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED)
      def serializerNestedSchemaPruningEnabled: Boolean =
      def nestedPruningOnExpressions: Boolean = getConf(NESTED_PRUNING_ON_EXPRESSIONS)
      def csvColumnPruning: Boolean = getConf(SQLConf.CSV_PARSER_COLUMN_PRUNING)
      def legacySizeOfNull: Boolean = {
        // size(null) should return null under ansi mode.
        getConf(SQLConf.LEGACY_SIZE_OF_NULL) && !getConf(ANSI_ENABLED)
      def isReplEagerEvalEnabled: Boolean = getConf(SQLConf.REPL_EAGER_EVAL_ENABLED)
      def replEagerEvalMaxNumRows: Int = getConf(SQLConf.REPL_EAGER_EVAL_MAX_NUM_ROWS)
      def replEagerEvalTruncate: Int = getConf(SQLConf.REPL_EAGER_EVAL_TRUNCATE)
      def avroCompressionCodec: String = getConf(SQLConf.AVRO_COMPRESSION_CODEC)
      def avroDeflateLevel: Int = getConf(SQLConf.AVRO_DEFLATE_LEVEL)
      def replaceDatabricksSparkAvroEnabled: Boolean =
      def setOpsPrecedenceEnforced: Boolean = getConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED)
      def exponentLiteralAsDecimalEnabled: Boolean =
      def allowNegativeScaleOfDecimalEnabled: Boolean =
      def legacyStatisticalAggregate: Boolean = getConf(SQLConf.LEGACY_STATISTICAL_AGGREGATE)
      def truncateTableIgnorePermissionAcl: Boolean =
      def nameNonStructGroupingKeyAsValue: Boolean =
      def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS)
      def maxPlanStringLength: Int = getConf(SQLConf.MAX_PLAN_STRING_LENGTH).toInt
      def maxMetadataStringLength: Int = getConf(SQLConf.MAX_METADATA_STRING_LENGTH)
      def setCommandRejectsSparkCoreConfs: Boolean =
      def castDatetimeToString: Boolean = getConf(SQLConf.LEGACY_CAST_DATETIME_TO_STRING)
      def ignoreDataLocality: Boolean = getConf(SQLConf.IGNORE_DATA_LOCALITY)
      def csvFilterPushDown: Boolean = getConf(CSV_FILTER_PUSHDOWN_ENABLED)
      def jsonFilterPushDown: Boolean = getConf(JSON_FILTER_PUSHDOWN_ENABLED)
      def avroFilterPushDown: Boolean = getConf(AVRO_FILTER_PUSHDOWN_ENABLED)
      def integerGroupingIdEnabled: Boolean = getConf(SQLConf.LEGACY_INTEGER_GROUPING_ID)
      def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)
      def coalesceBucketsInJoinEnabled: Boolean = getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED)
      def coalesceBucketsInJoinMaxBucketRatio: Int =
      def optimizeNullAwareAntiJoin: Boolean =
      def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR)
      def disabledJdbcConnectionProviders: String = getConf(
      def charVarcharAsString: Boolean = getConf(SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING)
      def cliPrintHeader: Boolean = getConf(SQLConf.CLI_PRINT_HEADER)
      def legacyIntervalEnabled: Boolean = getConf(LEGACY_INTERVAL_ENABLED)
      def decorrelateInnerQueryEnabled: Boolean = getConf(SQLConf.DECORRELATE_INNER_QUERY_ENABLED)
      def maxConcurrentOutputFileWriters: Int = getConf(SQLConf.MAX_CONCURRENT_OUTPUT_FILE_WRITERS)
      def inferDictAsStruct: Boolean = getConf(SQLConf.INFER_NESTED_DICT_AS_STRUCT)
      def useV1Command: Boolean = getConf(SQLConf.LEGACY_USE_V1_COMMAND)
      /** ********************** SQLConf functionality methods ************ */
      /** Set Spark SQL configuration properties. */
      def setConf(props: Properties): Unit = settings.synchronized {
        props.asScala.foreach { case (k, v) => setConfString(k, v) }
      /** Set the given Spark SQL configuration property using a `string` value. */
      def setConfString(key: String, value: String): Unit = {
        require(key != null, "key cannot be null")
        require(value != null, s"value cannot be null for key: $key")
        val entry = getConfigEntry(key)
        if (entry != null) {
          // Only verify configs in the SQLConf object
        setConfWithCheck(key, value)
      /** Set the given Spark SQL configuration property. */
      def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
        require(entry != null, "entry cannot be null")
        require(value != null, s"value cannot be null for key: ${entry.key}")
        require(containsConfigEntry(entry), s"$entry is not registered")
        setConfWithCheck(entry.key, entry.stringConverter(value))
      /** Return the value of Spark SQL configuration property for the given key. */
      @throws[NoSuchElementException]("if key is not set")
      def getConfString(key: String): String = {
        Option(settings.get(key)).
          orElse {
            // Try to use the default value
            Option(getConfigEntry(key)).map { e => e.stringConverter(e.readFrom(reader)) }
          }.
          getOrElse(throw QueryExecutionErrors.noSuchElementExceptionError(key))
      }
      /**
       * Return the value of Spark SQL configuration property for the given key. If the key is not set
       * yet, return `defaultValue`. This is useful when `defaultValue` in ConfigEntry is not the
       * desired one.
       */
      def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = {
        require(containsConfigEntry(entry), s"$entry is not registered")
      /**
       * Return the value of Spark SQL configuration property for the given key. If the key is not set
       * yet, return `defaultValue` in [[ConfigEntry]].
       */
      def getConf[T](entry: ConfigEntry[T]): T = {
        require(containsConfigEntry(entry), s"$entry is not registered")
      /**
       * Return the value of an optional Spark SQL configuration property for the given key. If the key
       * is not set yet, returns None.
       */
      def getConf[T](entry: OptionalConfigEntry[T]): Option[T] = {
        require(containsConfigEntry(entry), s"$entry is not registered")
      /**
       * Return the `string` value of Spark SQL configuration property for the given key. If the key is
       * not set yet, return `defaultValue`.
       */
      def getConfString(key: String, defaultValue: String): String = {
        Option(settings.get(key)).getOrElse {
          // If the key is not set, need to check whether the config entry is registered and is
          // a fallback conf, so that we can check its parent.
          getConfigEntry(key) match {
            case e: FallbackConfigEntry[_] =>
              getConfString(e.fallback.key, defaultValue)
            case e: ConfigEntry[_] if defaultValue != null && defaultValue != ConfigEntry.UNDEFINED =>
              // Only verify configs in the SQLConf object
            case _ =>
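The lookup order in `getConfString(key, defaultValue)` is: the explicitly set value, then the parent of a fallback entry, then the caller's default. The sketch below reproduces only that order. `Entry`, `PlainEntry`, and `FallbackEntry` are simplified stand-ins, not Spark's `ConfigEntry` classes, and the validation of `defaultValue` mentioned in the comment above is left out.

```scala
object FallbackLookupSketch {
  // Simplified stand-ins for config entry types; these are not the real Spark classes.
  sealed trait Entry { def key: String }
  final case class PlainEntry(key: String) extends Entry
  final case class FallbackEntry(key: String, fallback: Entry) extends Entry

  // Resolution order: explicit setting, then the parent of a fallback entry,
  // then the caller-supplied default.
  def getConfString(
      settings: Map[String, String],
      registry: Map[String, Entry],
      key: String,
      defaultValue: String): String =
    settings.getOrElse(key, registry.get(key) match {
      case Some(FallbackEntry(_, parent)) =>
        // The key itself is unset, so retry with the parent entry's key.
        getConfString(settings, registry, parent.key, defaultValue)
      case _ =>
        defaultValue
    })
}
```

With a registry in which `child.key` falls back to `parent.key`, setting only `parent.key` makes a lookup of `child.key` return the parent's value.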
      private var definedConfsLoaded = false
      /**
       * Init [[StaticSQLConf]] and [[org.apache.spark.sql.hive.HiveUtils]] so that all the defined
       * SQL Configurations will be registered to SQLConf.
       */
      private def loadDefinedConfs(): Unit = {
        if (!definedConfsLoaded) {
          definedConfsLoaded = true
          // Force to register static SQL configurations
          try {
            // Force to register SQL configurations from Hive module
            val symbol = ScalaReflection.mirror.staticModule("org.apache.spark.sql.hive.HiveUtils")
          } catch {
            case NonFatal(e) =>
              logWarning("SQL configurations from Hive module are not loaded", e)
      /**
       * Return all the configuration properties that have been set (i.e. not the default).
       * This creates a new copy of the config properties in the form of a Map.
       */
      def getAllConfs: immutable.Map[String, String] =
        settings.synchronized { settings.asScala.toMap }

      /**
       * Return all the configuration definitions that have been defined in [[SQLConf]]. Each
       * definition contains key, defaultValue, doc and version.
       */
      def getAllDefinedConfs: Seq[(String, String, String, String)] = {
        loadDefinedConfs()
        getConfigEntries().asScala.filter(_.isPublic).map { entry =>
          val displayValue = Option(getConfString(entry.key, null)).getOrElse(entry.defaultValueString)
          (entry.key, displayValue, entry.doc, entry.version)
        }.toSeq
      }

      /**
       * Redacts the given option map according to the description of SQL_OPTIONS_REDACTION_PATTERN.
       */
      def redactOptions[K, V](options: Map[K, V]): Map[K, V] = {
        redactOptions(options.toSeq).toMap
      }

      /**
       * Redacts the given option sequence according to the description of
       * SQL_OPTIONS_REDACTION_PATTERN.
       */
      def redactOptions[K, V](options: Seq[(K, V)]): Seq[(K, V)] = {
        val regexes = Seq(
          getConf(SQL_OPTIONS_REDACTION_PATTERN),
          SECRET_REDACTION_PATTERN.readFrom(reader))

        regexes.foldLeft(options) { case (opts, r) => Utils.redact(Some(r), opts) }
      }

      /**
       * Return whether a given key is set in this [[SQLConf]].
       */
      def contains(key: String): Boolean = {
        settings.containsKey(key)
      }

      /**
       * Logs a warning message if the given config key is deprecated.
       */
      private def logDeprecationWarning(key: String): Unit = {
        SQLConf.deprecatedSQLConfigs.get(key).foreach {
          case DeprecatedConfig(configName, version, comment) =>
            logWarning(
              s"The SQL config '$configName' has been deprecated in Spark v$version " +
              s"and may be removed in the future. $comment")
        }
      }

      private def requireDefaultValueOfRemovedConf(key: String, value: String): Unit = {
        SQLConf.removedSQLConfigs.get(key).foreach {
          case RemovedConfig(configName, version, defaultValue, comment) =>
            if (value != defaultValue) {
              throw QueryCompilationErrors.configRemovedInVersionError(configName, version, comment)
            }
        }
      }

      protected def setConfWithCheck(key: String, value: String): Unit = {
        logDeprecationWarning(key)
        requireDefaultValueOfRemovedConf(key, value)
        settings.put(key, value)
      }

      def unsetConf(key: String): Unit = {
        logDeprecationWarning(key)
        settings.remove(key)
      }

      def unsetConf(entry: ConfigEntry[_]): Unit = {
        unsetConf(entry.key)
      }

      def clear(): Unit = {
        settings.clear()
      }

      override def clone(): SQLConf = {
        val result = new SQLConf
        getAllConfs.foreach {
          case (k, v) => if (v ne null) result.setConfString(k, v)
        }
        result
      }

      // For test only
      def copy(entries: (ConfigEntry[_], Any)*): SQLConf = {
        val cloned = clone()
        entries.foreach {
          case (entry, value) => cloned.setConfString(entry.key, value.toString)
        }
        cloned
      }

      def isModifiable(key: String): Boolean = {
        containsConfigKey(key) && !isStaticConfigKey(key)
      }
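
The lookup order in `getConfString` above (explicitly set value first, then the parent of a fallback entry, then the caller's default) can be sketched in isolation. This is a minimal model under stated assumptions, not Spark's implementation: `Entry`, `PlainEntry`, `FallbackEntry`, `FallbackLookupSketch`, `register` and the `demo.*` keys are hypothetical stand-ins, and the sketch omits the validation of the default value that the real method performs.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark's ConfigEntry hierarchy.
sealed trait Entry { def key: String }
final case class PlainEntry(key: String) extends Entry
final case class FallbackEntry(key: String, fallback: Entry) extends Entry

object FallbackLookupSketch {
  // Explicitly set values, playing the role of `settings` in the listing.
  val settings: mutable.Map[String, String] = mutable.Map.empty
  // Registered definitions, playing the role of what getConfigEntry(key) consults.
  val registry: mutable.Map[String, Entry] = mutable.Map.empty

  def register(e: Entry): Unit = registry(e.key) = e

  // 1. An explicitly set value wins.
  // 2. Otherwise, a registered fallback entry defers to its parent key.
  // 3. Otherwise, the caller-supplied default is returned.
  def getConfString(key: String, defaultValue: String): String =
    settings.get(key) match {
      case Some(value) => value
      case None =>
        registry.get(key) match {
          case Some(FallbackEntry(_, parent)) => getConfString(parent.key, defaultValue)
          case _ => defaultValue
        }
    }
}

object FallbackLookupDemo {
  def main(args: Array[String]): Unit = {
    import FallbackLookupSketch._
    val parent = PlainEntry("demo.parent")
    register(parent)
    register(FallbackEntry("demo.child", parent))

    settings("demo.parent") = "42"
    // The child key is not set, so the lookup follows the fallback to the parent.
    println(getConfString("demo.child", "unset")) // prints 42
  }
}
```

Setting `demo.child` directly would take precedence over the parent, and a key with no registered entry simply yields the supplied default, which mirrors the three `case` branches in the listing.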


