  • Spark Properties

    Application Properties

    Property Name / Default / Meaning
    spark.app.name (none) The name of your application. This will appear in the UI and in log data.
    spark.master (none) The cluster manager to connect to. See the list of allowed master URL’s.
    spark.executor.memory 512m Amount of memory to use per executor process, in the same format as JVM memory strings (e.g. 512m, 2g).
    spark.serializer org.apache.spark.serializer.JavaSerializer Class to use for serializing objects that will be sent over the network or need to be cached in serialized form. The default of Java serialization works with any Serializable Java object but is quite slow, so we recommend using org.apache.spark.serializer.KryoSerializer and configuring Kryo serialization when speed is necessary. Can be any subclass of org.apache.spark.Serializer.
    spark.kryo.registrator (none) If you use Kryo serialization, set this class to register your custom classes with Kryo. It should be set to a class that extends KryoRegistrator. See the tuning guide for more details.
    spark.local.dir /tmp Directory to use for “scratch” space in Spark, including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories on different disks. NOTE: In Spark 1.0 and later this will be overridden by SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN) environment variables set by the cluster manager.
    spark.logConf false Logs the effective SparkConf as INFO when a SparkContext is started.
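
    These core properties are normally set on a SparkConf before the SparkContext is created. A minimal sketch in Scala; the application name, master URL and memory value below are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    // Values set programmatically on SparkConf take precedence over
    // spark-defaults.conf and flags passed to spark-submit.
    val conf = new SparkConf()
      .setAppName("MyApp")                  // spark.app.name
      .setMaster("spark://master:7077")     // spark.master (placeholder URL)
      .set("spark.executor.memory", "2g")   // per-executor memory
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)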

    Apart from these, the following properties are also available and may need to be set in some situations:

    Runtime Environment

    Property Name / Default / Meaning
    spark.executor.extraJavaOptions (none) A string of extra JVM options to pass to executors. For instance, GC settings or other logging. Note that it is illegal to set Spark properties or heap size settings with this option. Spark properties should be set using a SparkConf object or the spark-defaults.conf file used with the spark-submit script. Heap size settings can be set with spark.executor.memory.
    spark.executor.extraClassPath (none) Extra classpath entries to append to the classpath of executors. This exists primarily for backwards-compatibility with older versions of Spark. Users typically should not need to set this option.
    spark.executor.extraLibraryPath (none) Set a special library path to use when launching executor JVM’s.
    spark.files.userClassPathFirst false (Experimental) Whether to give user-added jars precedence over Spark’s own jars when loading classes in Executors. This feature can be used to mitigate conflicts between Spark’s dependencies and user dependencies. It is currently an experimental feature.
    spark.python.worker.memory 512m Amount of memory to use per Python worker process during aggregation, in the same format as JVM memory strings (e.g. 512m, 2g). If the memory used during aggregation goes above this amount, it will spill the data to disk.
    spark.executorEnv.[EnvironmentVariableName] (none) Add the environment variable specified by EnvironmentVariableName to the Executor process. The user can specify multiple of these to set multiple environment variables.
    spark.mesos.executor.home driver side SPARK_HOME Set the directory in which Spark is installed on the executors in Mesos. By default, the executors will simply use the driver’s Spark home directory, which may not be visible to them. Note that this is only relevant if a Spark binary package is not specified through spark.executor.uri.
    spark.mesos.executor.memoryOverhead executor memory * 0.07, with minimum of 384 This value is an additive for spark.executor.memory, specified in MiB, which is used to calculate the total Mesos task memory. A value of 384 implies a 384 MiB overhead. Additionally, there is a hard-coded 7% minimum overhead. The final overhead will be the larger of either spark.mesos.executor.memoryOverhead or 7% of spark.executor.memory.
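
    As a sketch of how the executor-environment properties above can be combined (the environment-variable name, paths and GC flags are illustrative, not required values):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // spark.executorEnv.[EnvironmentVariableName]: exported into each executor process
      .setExecutorEnv("DATA_HOME", "/mnt/data")                    // hypothetical variable
      // extra JVM options -- never put Spark properties or heap size here
      .set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
      .set("spark.executor.extraLibraryPath", "/opt/native/lib")   // hypothetical path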

    Shuffle Behavior

    Property Name / Default / Meaning
    spark.shuffle.consolidateFiles false If set to “true”, consolidates intermediate files created during a shuffle. Creating fewer files can improve filesystem performance for shuffles with large numbers of reduce tasks. It is recommended to set this to “true” when using ext4 or xfs filesystems. On ext3, this option might degrade performance on machines with many (>8) cores due to filesystem limitations.
    spark.shuffle.spill true If set to “true”, limits the amount of memory used during reduces by spilling data out to disk. This spilling threshold is specified by spark.shuffle.memoryFraction.
    spark.shuffle.spill.compress true Whether to compress data spilled during shuffles. Compression will use spark.io.compression.codec.
    spark.shuffle.memoryFraction 0.2 Fraction of Java heap to use for aggregation and cogroups during shuffles, if spark.shuffle.spill is true. At any given time, the collective size of all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will begin to spill to disk. If spills happen often, consider increasing this value at the expense of spark.storage.memoryFraction.
    spark.shuffle.compress true Whether to compress map output files. Generally a good idea. Compression will use spark.io.compression.codec.
    spark.shuffle.file.buffer.kb 32 Size of the in-memory buffer for each shuffle file output stream, in kilobytes. These buffers reduce the number of disk seeks and system calls made in creating intermediate shuffle files.
    spark.reducer.maxMbInFlight 48 Maximum size (in megabytes) of map outputs to fetch simultaneously from each reduce task. Since each output requires us to create a buffer to receive it, this represents a fixed memory overhead per reduce task, so keep it small unless you have a large amount of memory.
    spark.shuffle.manager HASH Implementation to use for shuffling data. A hash-based shuffle manager is the default, but starting in Spark 1.1 there is an experimental sort-based shuffle manager that is more memory-efficient in environments with small executors, such as YARN. To use that, change this value to SORT.
    spark.shuffle.sort.bypassMergeThreshold 200 (Advanced) In the sort-based shuffle manager, avoid merge-sorting data if there is no map-side aggregation and there are at most this many reduce partitions.
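
    For instance, the sort-based shuffle manager and file consolidation described above might be enabled as in this sketch; whether they help depends on the workload and the filesystem:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.shuffle.manager", "SORT")            // experimental sort-based shuffle (Spark 1.1+)
      .set("spark.shuffle.consolidateFiles", "true")   // recommended on ext4/xfs
      .set("spark.shuffle.memoryFraction", "0.3")      // trade storage memory for shuffle memory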

    Spark UI

    Property Name / Default / Meaning
    spark.ui.port 4040 Port for your application’s dashboard, which shows memory and workload data.
    spark.ui.retainedStages 1000 How many stages the Spark UI remembers before garbage collecting.
    spark.ui.killEnabled true Allows stages and corresponding jobs to be killed from the web ui.
    spark.eventLog.enabled false Whether to log Spark events, useful for reconstructing the Web UI after the application has finished.
    spark.eventLog.compress false Whether to compress logged events, if spark.eventLog.enabled is true.
    spark.eventLog.dir file:///tmp/spark-events Base directory in which Spark events are logged, if spark.eventLog.enabled is true. Within this base directory, Spark creates a sub-directory for each application, and logs the events specific to the application in this directory. Users may want to set this to a unified location like an HDFS directory so history files can be read by the history server.
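
    A sketch of turning event logging on so the history server can rebuild the UI later; the HDFS location is a placeholder and should exist before the application starts:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.compress", "true")
      .set("spark.eventLog.dir", "hdfs://namenode:8020/spark-events")   // placeholder directory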

    Compression and Serialization

    Property Name / Default / Meaning
    spark.broadcast.compress true Whether to compress broadcast variables before sending them. Generally a good idea.
    spark.rdd.compress false Whether to compress serialized RDD partitions (e.g. for StorageLevel.MEMORY_ONLY_SER). Can save substantial space at the cost of some extra CPU time.
    spark.io.compression.codec snappy The codec used to compress internal data such as RDD partitions and shuffle outputs. By default, Spark provides three codecs: lz4, lzf, and snappy. You can also use fully qualified class names to specify the codec, e.g. org.apache.spark.io.LZ4CompressionCodec, org.apache.spark.io.LZFCompressionCodec, and org.apache.spark.io.SnappyCompressionCodec.
    spark.io.compression.snappy.block.size 32768 Block size (in bytes) used in Snappy compression, in the case when Snappy compression codec is used. Lowering this block size will also lower shuffle memory usage when Snappy is used.
    spark.io.compression.lz4.block.size 32768 Block size (in bytes) used in LZ4 compression, in the case when LZ4 compression codec is used. Lowering this block size will also lower shuffle memory usage when LZ4 is used.
    spark.closure.serializer org.apache.spark.serializer.JavaSerializer Serializer class to use for closures. Currently only the Java serializer is supported.
    spark.serializer.objectStreamReset 100 When serializing using org.apache.spark.serializer.JavaSerializer, the serializer caches objects to prevent writing redundant data, however that stops garbage collection of those objects. By calling ‘reset’ you flush that info from the serializer, and allow old objects to be collected. To turn off this periodic reset set it to -1. By default it will reset the serializer every 100 objects.
    spark.kryo.referenceTracking true Whether to track references to the same object when serializing data with Kryo, which is necessary if your object graphs have loops and useful for efficiency if they contain multiple copies of the same object. Can be disabled to improve performance if you know this is not the case.
    spark.kryo.registrationRequired false Whether to require registration with Kryo. If set to ‘true’, Kryo will throw an exception if an unregistered class is serialized. If set to false (the default), Kryo will write unregistered class names along with each object. Writing class names can cause significant performance overhead, so enabling this option can enforce strictly that a user has not omitted classes from registration.
    spark.kryoserializer.buffer.mb 0.064 Initial size of Kryo’s serialization buffer, in megabytes. Note that there will be one buffer per core on each worker. This buffer will grow up to spark.kryoserializer.buffer.max.mb if needed.
    spark.kryoserializer.buffer.max.mb 64 Maximum allowable size of Kryo serialization buffer, in megabytes. This must be larger than any object you attempt to serialize. Increase this if you get a “buffer limit exceeded” exception inside Kryo.
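
    To make the Kryo settings above concrete, a registrator might look like the following sketch; Point and MyRegistrator are hypothetical user classes (in a real application, pass the fully qualified class name):

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoRegistrator

    case class Point(x: Double, y: Double)          // hypothetical user class

    class MyRegistrator extends KryoRegistrator {
      // Register every class that is serialized frequently.
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[Point])
      }
    }

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "MyRegistrator")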

    Execution Behavior

    Property Name / Default / Meaning
    spark.default.parallelism Local mode: number of cores on the local machine; Mesos fine grained mode: 8; Others: total number of cores on all executor nodes or 2, whichever is larger. Default number of tasks to use across the cluster for distributed shuffle operations (groupByKey, reduceByKey, etc) when not set by user.
    spark.broadcast.factory org.apache.spark.broadcast.TorrentBroadcastFactory Which broadcast implementation to use.
    spark.broadcast.blockSize 4096 Size of each piece of a block in kilobytes for TorrentBroadcastFactory. Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, BlockManager might take a performance hit.
    spark.files.overwrite false Whether to overwrite files added through SparkContext.addFile() when the target file exists and its contents do not match those of the source.
    spark.files.fetchTimeout false Communication timeout to use when fetching files added through SparkContext.addFile() from the driver.
    spark.storage.memoryFraction 0.6 Fraction of Java heap to use for Spark’s memory cache. This should not be larger than the “old” generation of objects in the JVM, which by default is given 0.6 of the heap, but you can increase it if you configure your own old generation size.
    spark.storage.unrollFraction 0.2 Fraction of spark.storage.memoryFraction to use for unrolling blocks in memory. This is dynamically allocated by dropping existing blocks when there is not enough free storage space to unroll the new block in its entirety.
    spark.tachyonStore.baseDir System.getProperty(“java.io.tmpdir”) Directories of the Tachyon File System that store RDDs. The Tachyon file system’s URL is set by spark.tachyonStore.url. It can also be a comma-separated list of multiple directories on the Tachyon file system.
    spark.storage.memoryMapThreshold 8192 Size of a block, in bytes, above which Spark memory maps when reading a block from disk. This prevents Spark from memory mapping very small blocks. In general, memory mapping has high overhead for blocks close to or below the page size of the operating system.
    spark.tachyonStore.url tachyon://localhost:19998 The URL of the underlying Tachyon file system in the TachyonStore.
    spark.cleaner.ttl (infinite) Duration (seconds) of how long Spark will remember any metadata (stages generated, tasks generated, etc.). Periodic cleanups will ensure that metadata older than this duration will be forgotten. This is useful for running Spark for many hours / days (for example, running 24/7 in case of Spark Streaming applications). Note that any RDD that persists in memory for more than this duration will be cleared as well.
    spark.hadoop.validateOutputSpecs true If set to true, validates the output specification (e.g. checking if the output directory already exists) used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing output directories. We recommend that users do not disable this except if trying to achieve compatibility with previous versions of Spark. Simply use Hadoop’s FileSystem API to delete output directories by hand.
    spark.hadoop.cloneConf false If set to true, clones a new Hadoop Configuration object for each task. This option should be enabled to work around Configuration thread-safety issues (see SPARK-2546 for more details). This is disabled by default in order to avoid unexpected performance regressions for jobs that are not affected by these issues.
    spark.executor.heartbeatInterval 10000 Interval (milliseconds) between each executor’s heartbeats to the driver. Heartbeats let the driver know that the executor is still alive and update it with metrics for in-progress tasks.
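
    As an illustration of tuning these execution defaults from application code (the values are arbitrary and workload-dependent):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.default.parallelism", "200")     // tasks per shuffle when no partition count is given
      .set("spark.storage.memoryFraction", "0.5")  // shrink the cache to leave more heap for execution
      .set("spark.cleaner.ttl", "3600")            // forget metadata older than one hour (long-running apps)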

    Networking

    Property Name / Default / Meaning
    spark.driver.host (local hostname) Hostname or IP address for the driver to listen on. This is used for communicating with the executors and the standalone Master.
    spark.driver.port (random) Port for the driver to listen on. This is used for communicating with the executors and the standalone Master.
    spark.fileserver.port (random) Port for the driver’s HTTP file server to listen on.
    spark.broadcast.port (random) Port for the driver’s HTTP broadcast server to listen on. This is not relevant for torrent broadcast.
    spark.replClassServer.port (random) Port for the driver’s HTTP class server to listen on. This is only relevant for the Spark shell.
    spark.blockManager.port (random) Port for all block managers to listen on. These exist on both the driver and the executors.
    spark.executor.port (random) Port for the executor to listen on. This is used for communicating with the driver.
    spark.port.maxRetries 16 Default maximum number of retries when binding to a port before giving up.
    spark.akka.frameSize 10 Maximum message size to allow in “control plane” communication (for serialized tasks and task results), in MB. Increase this if your tasks need to send back large results to the driver (e.g. using collect() on a large dataset).
    spark.akka.threads 4 Number of actor threads to use for communication. Can be useful to increase on large clusters when the driver has a lot of CPU cores.
    spark.akka.timeout 100 Communication timeout between Spark nodes, in seconds.
    spark.akka.heartbeat.pauses 600 Acceptable heartbeat pause, in seconds, for Akka’s built-in failure detector. The large default effectively disables the detector; it can be enabled again if you plan to use this feature (not recommended). This can be used to control sensitivity to GC pauses. Tune it in combination with spark.akka.heartbeat.interval and spark.akka.failure-detector.threshold if you need to.
    spark.akka.failure-detector.threshold 300.0 Maps to Akka’s akka.remote.transport-failure-detector.threshold. The large default effectively disables Akka’s built-in failure detector; it can be enabled again if you plan to use this feature (not recommended). Tune it in combination with spark.akka.heartbeat.pauses and spark.akka.heartbeat.interval if you need to.
    spark.akka.heartbeat.interval 1000 Heartbeat interval, in seconds, for Akka’s built-in failure detector. The large default effectively disables the detector; it can be enabled again if you plan to use this feature (not recommended). A larger interval reduces network overhead, while a smaller value (~1 s) is more informative for the failure detector. Tune it in combination with spark.akka.heartbeat.pauses and spark.akka.failure-detector.threshold if you need to. The only positive use case for the failure detector is that a sensitive detector can evict rogue executors quickly; however, GC pauses and network lags are expected in a real Spark cluster, and enabling it floods the network with heartbeat exchanges between nodes.
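
    For example, if a job collects large results back to the driver, the control-plane frame size might be raised as in this sketch (64 MB is an arbitrary example value):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.akka.frameSize", "64")   // MB; the default of 10 is too small for large task results
      .set("spark.akka.timeout", "200")    // seconds; relax node-to-node timeouts on a congested network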

    Scheduling

    Property Name / Default / Meaning
    spark.task.cpus 1 Number of cores to allocate for each task.
    spark.task.maxFailures 4 Number of individual task failures before giving up on the job. Should be greater than or equal to 1. Number of allowed retries = this value - 1.
    spark.scheduler.mode FIFO The scheduling mode between jobs submitted to the same SparkContext. Can be set to FAIR to use fair sharing instead of queueing jobs one after another. Useful for multi-user services.
    spark.cores.max (not set) When running on a standalone deploy cluster or a Mesos cluster in “coarse-grained” sharing mode, the maximum amount of CPU cores to request for the application from across the cluster (not from each machine). If not set, the default will be spark.deploy.defaultCores on Spark’s standalone cluster manager, or infinite (all available cores) on Mesos.
    spark.mesos.coarse false If set to “true”, runs over Mesos clusters in “coarse-grained” sharing mode, where Spark acquires one long-lived Mesos task on each machine instead of one Mesos task per Spark task. This gives lower-latency scheduling for short queries, but leaves resources in use for the whole duration of the Spark job.
    spark.speculation false If set to “true”, performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched.
    spark.speculation.interval 100 How often Spark will check for tasks to speculate, in milliseconds.
    spark.speculation.quantile 0.75 Percentage of tasks which must be complete before speculation is enabled for a particular stage.
    spark.speculation.multiplier 1.5 How many times slower a task is than the median to be considered for speculation.
    spark.locality.wait 3000 Number of milliseconds to wait to launch a data-local task before giving up and launching it on a less-local node. The same wait will be used to step through multiple locality levels (process-local, node-local, rack-local and then any). It is also possible to customize the waiting time for each level by setting spark.locality.wait.node, etc. You should increase this setting if your tasks are long and you see poor locality, but the default usually works well.
    spark.locality.wait.process spark.locality.wait Customize the locality wait for process locality. This affects tasks that attempt to access cached data in a particular executor process.
    spark.locality.wait.node spark.locality.wait Customize the locality wait for node locality. For example, you can set this to 0 to skip node locality and search immediately for rack locality (if your cluster has rack information).
    spark.locality.wait.rack spark.locality.wait Customize the locality wait for rack locality.
    spark.scheduler.revive.interval 1000 The interval length for the scheduler to revive the worker resource offers to run tasks (in milliseconds).
    spark.scheduler.minRegisteredResourcesRatio 0 The minimum ratio of registered resources (registered resources / total expected resources) (resources are executors in yarn mode, CPU cores in standalone mode) to wait for before scheduling begins. Specified as a double between 0 and 1. Regardless of whether the minimum ratio of resources has been reached, the maximum amount of time it will wait before scheduling begins is controlled by config spark.scheduler.maxRegisteredResourcesWaitingTime.
    spark.scheduler.maxRegisteredResourcesWaitingTime 30000 Maximum amount of time to wait for resources to register before scheduling begins (in milliseconds).
    spark.localExecution.enabled false Enables Spark to run certain jobs, such as first() or take() on the driver, without sending tasks to the cluster. This can make certain jobs execute very quickly, but may require shipping a whole partition of data to the driver.
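
    A sketch of a multi-user-friendly scheduling setup using the properties above; the pool name is hypothetical and would normally be declared in a fair-scheduler XML file:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .set("spark.scheduler.mode", "FAIR")        // share the SparkContext fairly between jobs
      .set("spark.speculation", "true")           // re-launch straggler tasks
      .set("spark.speculation.quantile", "0.9")   // wait until 90% of tasks finish before speculating
    val sc = new SparkContext(conf)

    // Jobs submitted from this thread go to a named pool (hypothetical pool name).
    sc.setLocalProperty("spark.scheduler.pool", "production")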

    Security

    Property Name / Default / Meaning
    spark.authenticate false Whether Spark authenticates its internal connections. See spark.authenticate.secret if not running on YARN.
    spark.authenticate.secret None Set the secret key used for Spark to authenticate between components. This needs to be set if not running on YARN and authentication is enabled.
    spark.core.connection.auth.wait.timeout 30 Number of seconds for the connection to wait for authentication to occur before timing out and giving up.
    spark.core.connection.ack.wait.timeout 60 Number of seconds for the connection to wait for an ack to occur before timing out and giving up. To avoid unwanted timeouts caused by long pauses such as GC, you can set a larger value.
    spark.ui.filters None Comma separated list of filter class names to apply to the Spark web UI. The filter should be a standard javax servlet Filter. Parameters to each filter can also be specified by setting a java system property of:
    spark.<class name of filter>.params=’param1=value1,param2=value2’
    For example:
    -Dspark.ui.filters=com.test.filter1
    -Dspark.com.test.filter1.params=’param1=foo,param2=testing’
    spark.acls.enable false Whether Spark ACLs should be enabled. If enabled, this checks to see if the user has access permissions to view or modify the job. Note this requires the user to be known, so if the user comes across as null no checks are done. Filters can be used with the UI to authenticate and set the user.
    spark.ui.view.acls Empty Comma separated list of users that have view access to the Spark web ui. By default only the user that started the Spark job has view access.
    spark.modify.acls Empty Comma separated list of users that have modify access to the Spark job. By default only the user that started the Spark job has access to modify it (kill it for example).
    spark.admin.acls Empty Comma separated list of users/administrators that have view and modify access to all Spark jobs. This can be used if you run on a shared cluster and have a set of administrators or devs who help debug when things do not work.
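
    A sketch of enabling authentication and ACLs outside of YARN; the secret and user names are placeholders, and in practice the secret should come from a secure source rather than being hard-coded:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.authenticate", "true")
      .set("spark.authenticate.secret", "REPLACE_WITH_SHARED_SECRET")   // placeholder secret
      .set("spark.ui.view.acls", "alice,bob")                           // placeholder users
      .set("spark.admin.acls", "admin")                                 // placeholder administrator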

    Spark Streaming

    Property Name / Default / Meaning
    spark.streaming.blockInterval 200 Interval (milliseconds) at which data received by Spark Streaming receivers is coalesced into blocks of data before storing them in Spark.
    spark.streaming.receiver.maxRate infinite Maximum rate (per second) at which each receiver will push data into blocks. Effectively, each stream will consume at most this number of records per second. Setting this configuration to 0 or a negative number will put no limit on the rate.
    spark.streaming.unpersist true Force RDDs generated and persisted by Spark Streaming to be automatically unpersisted from Spark’s memory. The raw input data received by Spark Streaming is also automatically cleared. Setting this to false will allow the raw data and persisted RDDs to be accessible outside the streaming application as they will not be cleared automatically. But it comes at the cost of higher memory usage in Spark.
    spark.executor.logs.rolling.strategy (none) Set the strategy of rolling of executor logs. By default it is disabled. It can be set to “time” (time-based rolling) or “size” (size-based rolling). For “time”, use spark.executor.logs.rolling.time.interval to set the rolling interval. For “size”, use spark.executor.logs.rolling.size.maxBytes to set the maximum file size for rolling.
    spark.executor.logs.rolling.time.interval daily Set the time interval by which the executor logs will be rolled over. Rolling is disabled by default. Valid values are daily, hourly, minutely or any interval in seconds. See spark.executor.logs.rolling.maxRetainedFiles for automatic cleaning of old logs.
    spark.executor.logs.rolling.size.maxBytes (none) Set the max size of the file by which the executor logs will be rolled over. Rolling is disabled by default. Value is set in terms of bytes. See spark.executor.logs.rolling.maxRetainedFiles for automatic cleaning of old logs.
    spark.executor.logs.rolling.maxRetainedFiles (none) Sets the number of latest rolling log files that are going to be retained by the system. Older log files will be deleted. Disabled by default.
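
    For example, a receiver-based streaming application might combine these properties as in this sketch; the batch interval and rates are illustrative only:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("RateLimitedStream")
      .set("spark.streaming.blockInterval", "100")       // ms; smaller blocks give more tasks per batch
      .set("spark.streaming.receiver.maxRate", "10000")  // records per second per receiver
    val ssc = new StreamingContext(conf, Seconds(2))     // 2-second batches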

    Most of the configurable settings have reasonable built-in defaults. However, you should set at least the following five properties yourself:

    Property Name / Default / Meaning
    spark.executor.memory 512m Amount of memory each executor can use, in the same string format as JVM memory settings (e.g. '512m', '2g').
    spark.serializer spark.JavaSerializer Class name used to serialize objects that are sent over the network or cached in serialized form. The default Java serialization can handle any object that implements the Serializable interface, but it is quite slow, so when you care about speed we recommend using spark.KryoSerializer and configuring Kryo serialization. Can be any subclass of spark.Serializer.
    spark.kryo.registrator (none) If you use Kryo serialization, set this class to register your custom classes with Kryo. It needs to extend spark.KryoRegistrator. See the tuning guide for more details.
    spark.local.dir /tmp Spark's scratch directory, holding map output files and RDDs that get stored on disk. The faster this local disk is, the better. It can also be a comma-separated list of multiple directories.
    spark.cores.max (infinite) When running on a standalone deploy cluster or a Mesos cluster in coarse-grained sharing mode, the maximum number of CPU cores to request. By default all available cores are used.
    In addition to the five above, the following properties are also listed; in some situations you may need to configure them yourself.
    Property Name / Default / Meaning
    spark.mesos.coarse false If set to "true", runs over Mesos clusters in coarse-grained sharing mode, where Spark acquires one long-running Mesos task on each machine instead of one Mesos task per Spark task. This gives lower-latency scheduling for short queries, but keeps the resources occupied for the whole duration of the Spark job.
    spark.default.parallelism 8 Default number of tasks to use for distributed shuffle operations (groupByKey, reduceByKey, etc.) when not set by the user.
    spark.storage.memoryFraction 0.66 Fraction of the Java heap used for Spark's memory cache. This should not be larger than the old generation in the JVM, which by default is 2/3 of the heap, but you can increase the fraction if you configure a larger old generation yourself.
    spark.ui.port (random) Port for your application's dashboard, which shows the memory usage of each RDD.
    spark.shuffle.compress true Whether to compress map output files; setting this to true is usually a good choice.
    spark.broadcast.compress true Whether to compress broadcast variables before sending them; setting this to true is usually a good choice.
    spark.rdd.compress false Whether to compress serialized RDD partitions (e.g. StorageLevel.MEMORY_ONLY_SER). Can greatly reduce space usage at the cost of a little extra CPU time.
    spark.reducer.maxMbInFlight 48 Maximum size (in megabytes) of map outputs to fetch simultaneously for each reduce task. Since a buffer has to be created to receive each output, this value is an upper bound on the memory used per reduce task, so keep it small unless your machines have a lot of memory.
    spark.closure.serializer spark.JavaSerializer Serializer class to use for closures. Java is usually sufficient, unless the distributed functions (such as map functions) in your driver program reference a large number of objects.
    spark.kryoserializer.buffer.mb 32 Maximum object size allowed in Kryo (the Kryo library needs to create a buffer no smaller than the largest single serialized object). Increase this value if you get a "buffer limit exceeded" exception in Kryo. Note that there is one buffer per core on each worker.
    spark.broadcast.factory spark.broadcast.HttpBroadcastFactory Which broadcast implementation to use.
    spark.locality.wait 3000 Number of milliseconds to wait to launch a data-local task before giving up and launching it somewhere non-local. You should increase this if your tasks are long-running and you see poor locality, but the default usually works well.
    spark.worker.timeout 60 If the standalone deploy master has not received a heartbeat from a worker within this many seconds, it considers the worker lost.
    spark.akka.frameSize 10 Maximum message size for control-plane communication (serialized tasks and task results), in MB. Increase this if you need to send large results back to the driver (for example when using collect() on a large dataset).
    spark.akka.threads 4 Number of actor threads used for communication. Can be increased on large clusters when the driver has many CPU cores.
    spark.akka.timeout 20 Communication timeout between Spark nodes, in seconds.
    spark.driver.host (local hostname) Hostname or IP address the driver listens on.
    spark.driver.port (random) Port the driver listens on.
    spark.cleaner.ttl (disable) Duration (seconds) for which Spark remembers any metadata (stages generated, tasks generated, etc.). Periodic cleanup ensures that metadata older than this is forgotten. This is useful when running Spark for many hours or days (for example 24/7 Spark Streaming applications). Note: any RDD kept in memory beyond this duration will be cleared as well.
    spark.streaming.blockInterval 200 Duration (milliseconds) over which objects received from the network are batched into blocks.

    Logging Configuration

    Spark uses log4j for its logging. You can configure logging by adding a log4j.properties file in the conf directory. To get started, copy the existing log4j.properties.template in the conf directory and rename it to log4j.properties.
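
    Log verbosity can also be adjusted programmatically from the driver, as in this sketch that uses the log4j API directly (the logger names and levels are just examples):

    import org.apache.log4j.{Level, Logger}

    // Quiet the noisier Spark and Akka loggers while keeping other logs at INFO.
    Logger.getRootLogger.setLevel(Level.INFO)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)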

    A: Application Properties

    Property Name / Default / Meaning
    spark.app.name (none) The name of the application.
    spark.master (none) The cluster manager to connect to.
    spark.executor.memory 512m Total amount of memory used by each executor.
    spark.serializer org.apache.spark.serializer.JavaSerializer Serializer used when sending data over the network or caching it in serialized form. The default is the Java serializer, which works with any Java object and has good compatibility, but is quite slow; if you want speed, use org.apache.spark.serializer.KryoSerializer instead. It can also be any user-defined subclass of org.apache.spark.Serializer.
    spark.kryo.registrator (none) To use the Kryo serializer, create a class that extends KryoRegistrator and set the spark.kryo.registrator property to point at it.
    spark.local.dir /tmp Directory used for scratch space, which holds map output files and spilled RDDs. It should be on a fast local disk, or a comma-separated list of directories on different disks. Note: in Spark 1.0 and later this property is overridden by the SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN) environment variables set by the cluster manager.
    spark.logConf false Log the effective SparkConf when the SparkContext starts.
       
    B: Runtime Environment
    Property Name / Default / Meaning
    spark.executor.memory 512m Total memory allocated to each executor process (in a format such as 512m or 2g).
    spark.executor.extraJavaOptions (none) Extra JVM options to pass to executors. Note that this cannot be used to set Spark properties or heap size.
    spark.executor.extraClassPath (none) Extra classpath entries appended to the executor classpath, mainly for backward compatibility with older versions of Spark; users normally do not need to set this.
    spark.executor.extraLibraryPath (none) Special library path used when launching the executor JVM.
    spark.files.userClassPathFirst false Whether executors give user-added jars precedence over Spark's own jars when loading classes. This can be used to resolve conflicts between Spark's dependencies and user dependencies. Currently an experimental feature.

    C: Shuffle Behavior

    Property Name / Default / Meaning
    spark.shuffle.consolidateFiles false If set to true, intermediate files created during a shuffle are consolidated. For shuffles with a large number of reduce tasks, consolidating files can improve filesystem performance. On ext4 or xfs filesystems it is recommended to set this to true; on ext3, due to filesystem limitations, setting it to true can instead degrade performance on machines with more than 8 cores.
    spark.shuffle.spill true If set to true, reduces total memory usage during shuffles by spilling data to disk; the spill threshold is specified by spark.shuffle.memoryFraction.
    spark.shuffle.spill.compress true Whether to compress data spilled during shuffles; if so, spark.io.compression.codec is used.
    spark.shuffle.compress true Whether to compress map output files; compression uses spark.io.compression.codec.
    spark.shuffle.file.buffer.kb 100 Size of the in-memory buffer for each shuffle file output stream, in KB. These buffers reduce the number of disk seeks and system calls made while creating intermediate shuffle files.
    spark.reducer.maxMbInFlight 48 Maximum size (in megabytes) of map outputs that each reduce task fetches simultaneously. Since each map output needs a buffer to receive it, this represents a fixed memory overhead per reduce task, so keep it small unless you have a large amount of memory.
       
    D: Spark UI
    Property Name / Default / Meaning
    spark.ui.port 4040 Port for the application's web UI.
    spark.ui.retainedStages 1000 Number of stages the web UI retains before garbage collecting.
    spark.ui.killEnabled true Allows stages and the corresponding jobs to be killed from the web UI.
    spark.eventLog.enabled false Whether to log Spark events, used to reconstruct the web UI after the application has finished.
    spark.eventLog.compress false Whether to compress logged Spark events, provided spark.eventLog.enabled is true.
    spark.eventLog.dir file:///tmp/spark-events If spark.eventLog.enabled is true, the base directory in which Spark events are logged. Within this base directory Spark creates a sub-directory per application and records that application's events there. Users can set this to an HDFS directory so the history server can read the history files.
       
    E: Compression and Serialization
    Property Name / Default / Meaning
    spark.broadcast.compress true Whether to compress broadcast variables before sending them.
    spark.rdd.compress false Whether to compress serialized RDD partitions. Can save a lot of space at the cost of some extra CPU time.
    spark.io.compression.codec org.apache.spark.io.LZFCompressionCodec The codec used to compress internal data such as RDD partitions and shuffle output. Spark provides two codecs: org.apache.spark.io.LZFCompressionCodec and org.apache.spark.io.SnappyCompressionCodec. Snappy offers faster compression and decompression, while LZF offers a better compression ratio.
    spark.io.compression.snappy.block.size 32768 Block size (in bytes) used by the Snappy codec when Snappy compression is in use.
    spark.closure.serializer org.apache.spark.serializer.JavaSerializer Serializer to use for closures; currently only the Java serializer is supported.
    spark.serializer.objectStreamReset 10000 When serializing with org.apache.spark.serializer.JavaSerializer, the serializer caches objects to avoid writing redundant data, which prevents those objects from being garbage collected. Resetting the serializer flushes that information so old objects can be collected. To turn off this periodic reset, set the value to <= 0. By default the serializer is reset every 10000 objects.
    spark.kryo.referenceTracking true Whether to track references to the same object when serializing data with Kryo. Set to true if your object graph contains cycles or multiple copies of the same object; otherwise it can be disabled to improve performance.
    spark.kryoserializer.buffer.mb 2 Maximum object size allowed in Kryo (Kryo creates a buffer at least as large as the largest single object to be serialized). Increase this value if Kryo reports a buffer-limit-exceeded error. Note that there is one buffer per core on each worker.
       
    F: Execution Behavior
    Property Name / Default / Meaning
    spark.default.parallelism Local mode: number of cores on the local machine; Mesos fine-grained mode: 8; Otherwise: total number of cores on all executors, or 2, whichever is larger. Default number of tasks used across the cluster for shuffle operations (groupByKey, reduceByKey, etc.) when not set by the user.
    spark.broadcast.factory org.apache.spark.broadcast.HttpBroadcastFactory Which broadcast implementation to use.
    spark.broadcast.blockSize 4096 Block size (in KB) for TorrentBroadcastFactory. Too large a value decreases parallelism during broadcast (making it slower); too small a value may hurt BlockManager performance.
    spark.files.overwrite false Whether to overwrite a file added through SparkContext.addFile() when the target file already exists and its contents do not match.
    spark.files.fetchTimeout false Whether to use a communication timeout when fetching files added by the driver through SparkContext.addFile().
    spark.storage.memoryFraction 0.6 Fraction of the Java heap used for Spark's cache.
    spark.tachyonStore.baseDir System.getProperty("java.io.tmpdir") Tachyon directories in which RDDs are stored; the Tachyon file system URL is set by spark.tachyonStore.url. Can also be a comma-separated list of multiple Tachyon directories.
    spark.storage.memoryMapThreshold 8192 Block size, in bytes, above which Spark memory-maps blocks when reading them from disk. This prevents Spark from memory-mapping very small blocks; in general, memory mapping has overhead close to or above the operating system's page size for small blocks.
    spark.tachyonStore.url tachyon://localhost:19998 URL of the underlying Tachyon file system.
    spark.cleaner.ttl (infinite) Duration (seconds) for which Spark remembers any metadata (generated stages, generated tasks, etc.). Periodic cleanup ensures that out-of-date metadata is forgotten, which is useful for long-running jobs such as 24/7 Spark Streaming applications. Note that RDD data persisted in memory beyond this duration is also cleared.
     
    G: Networking
    Property Name / Default / Meaning
    spark.driver.host (local hostname) Hostname or IP address the driver runs on.
    spark.driver.port (random) Port the driver listens on.
    spark.akka.frameSize 10 Size, in MB, of messages exchanged between the driver and executors; a larger value lets the driver receive larger computation results.
    spark.akka.threads 4 Number of actor threads used for communication; this can be increased on large clusters where the driver has many CPU cores.
    spark.akka.timeout 100 Communication timeout between Spark nodes, in seconds.
    spark.akka.heartbeat.pauses 600 This and the next two parameters configure Akka's built-in failure detector. Setting them to large values effectively disables the detector; to enable it, set all three (in seconds) as needed. The detector is normally only enabled in special cases: a sensitive failure detector can help locate misbehaving executors, but it is not needed for situations caused by GC pauses or network lag, and enabling it floods the network with frequent heartbeat exchanges. This parameter sets the acceptable heartbeat pause.
    spark.akka.failure-detector.threshold 300.0 Corresponds to Akka's akka.remote.transport-failure-detector.threshold.
    spark.akka.heartbeat.interval 1000 Heartbeat interval.
      
    H: Scheduling
    Property Name / Default / Meaning
    spark.task.cpus 1 Number of cores allocated to each task.
    spark.task.maxFailures 4 Number of failures of a single task before the job gives up on it; must be >= 1.
    spark.scheduler.mode FIFO Scheduling mode the SparkContext uses for jobs. FAIR mode can be used for multi-user scenarios.
    spark.cores.max (not set) When the application runs on a Standalone cluster or a coarse-grained Mesos cluster, the maximum total number of CPU cores the application requests from the cluster (for the whole cluster, not per machine). If not set, a Standalone cluster uses the value of spark.deploy.defaultCores, and Mesos uses all available cores.
    spark.mesos.coarse false If set to true, runs on Mesos clusters in coarse-grained sharing mode.
    spark.speculation false This and the following parameters control Spark's speculative execution. If set to true, Spark uses speculative execution: tasks lagging behind in a stage are re-launched on other nodes, and the result of whichever copy finishes first is used as the final result.
    spark.speculation.interval 100 How often Spark checks task status for speculation, in milliseconds.
    spark.speculation.quantile 0.75 Fraction of a stage's tasks that must be complete before speculation is enabled.
    spark.speculation.multiplier 1.5 How many times slower than the median of completed tasks a task must be before speculation is triggered for it.
    spark.locality.wait 3000 This and the following parameters control data locality. This parameter is the time, in milliseconds, to wait to launch a data-local task before falling back to the next locality level. The same wait applies between successive locality levels (process-local -> node-local -> rack-local -> any); the wait for a specific level can be customized with spark.locality.wait.node and similar parameters.
    spark.locality.wait.process spark.locality.wait Locality wait at the process level.
    spark.locality.wait.node spark.locality.wait Locality wait at the node level.
    spark.locality.wait.rack spark.locality.wait Locality wait at the rack level.
    spark.scheduler.revive.interval 1000 Interval, in milliseconds, at which the scheduler revives resource offers so that tasks waiting because resources were insufficient get another chance to acquire resources and run.
     
    I: Security
    Property Name / Default / Meaning
    spark.authenticate false Whether Spark authenticates its internal connections.
    spark.authenticate.secret (none) The secret key Spark uses to authenticate between components. Must be set when not running on YARN and spark.authenticate is true.
    spark.core.connection.auth.wait.timeout 30 Number of seconds a connection waits for authentication before timing out.
    spark.ui.filters (none) Comma-separated list of filter class names to apply to the Spark web UI. Filters must be standard javax servlet Filters; parameters for each filter can be specified with a Java system property:
    spark.<class name of filter>.params='param1=value1,param2=value2'
    For example:
    -Dspark.ui.filters=com.test.filter1
    -Dspark.com.test.filter1.params='param1=foo,param2=testing'
    spark.ui.acls.enable false Whether web UI access control is enabled. If enabled, access permissions are checked when a user views the web interface.
    spark.ui.view.acls (empty) Comma-separated list of users with view access to the Spark web UI. By default, only the user that started the Spark job has view access.
       

    J: Spark Streaming

    Property Name / Default / Meaning
    spark.streaming.blockInterval 200 Interval (milliseconds) at which data received by Spark Streaming receivers is coalesced into blocks before being stored in Spark.
    spark.streaming.unpersist true If set to true, RDDs persisted by Spark Streaming are forcibly unpersisted from Spark's memory, and the raw input data received by Spark Streaming is automatically cleared as well. If set to false, the raw input data and the persisted RDDs remain accessible to external streaming applications, since they are not cleared automatically, at the cost of higher memory usage in Spark.
       

    3: Cluster-Specific Properties

    A: Standalone-Specific Properties

    Standalone mode can also be configured through the environment file conf/spark-env.sh. The relevant settings are:

    • SPARK_MASTER_OPTS  properties applied to the master
    • SPARK_WORKER_OPTS  properties applied to the workers
    • SPARK_DAEMON_JAVA_OPTS  properties applied to both the master and the workers

    They are set with a statement such as:

    export SPARK_MASTER_OPTS="-Dx1=y1 -Dx2=y2"

    # - where each x is a property name and each y is its value

    The properties supported by SPARK_MASTER_OPTS are:

    Property Name / Default / Meaning
    spark.deploy.spreadOut true Whether the Standalone cluster manager spreads applications across nodes or consolidates them onto as few nodes as possible. Spreading out usually gives better data locality, while consolidating is more efficient for compute-intensive workloads.
    spark.deploy.defaultCores (infinite) If spark.cores.max is not set, this parameter sets the maximum number of cores the Standalone cluster assigns to an application; if this is not set either, the application gets all available cores. Note that on a shared cluster, setting a low value prevents one application from grabbing all cores and affecting other users.
    spark.worker.timeout 60 Number of seconds after which the master considers a worker lost because no heartbeat was received.
       

    The properties supported by SPARK_WORKER_OPTS are:

    Property Name / Default / Meaning
    spark.worker.cleanup.enabled false Whether to periodically clean up the worker's application working directories. This applies only to Standalone mode, not YARN. Cleanup happens regardless of whether the application is still running.
    spark.worker.cleanup.interval 1800 Interval, in seconds, at which the worker cleans up expired application working directories on the local machine.
    spark.worker.cleanup.appDataTtl 7*24*3600 How long, in seconds, the worker retains application working directories. Choose this according to the available disk space, the size of application logs and jars, and how frequently applications are submitted.
       

    The properties supported by SPARK_DAEMON_JAVA_OPTS are:

    Property Name / Meaning
    spark.deploy.recoveryMode This and the next two parameters configure ZooKeeper-based master HA. Set it to ZOOKEEPER to enable standby-master recovery; the default is NONE.
    spark.deploy.zookeeper.url URL of the ZooKeeper cluster.
    spark.deploy.zookeeper.dir ZooKeeper directory in which recovery state is stored; defaults to /spark.
    spark.deploy.recoveryMode Set to FILESYSTEM to enable single-node master recovery; the default is NONE.
    spark.deploy.recoveryDirectory Directory in which Spark stores recovery state.
     

    B: YARN-Specific Properties

    YARN-specific properties can be set either through SparkConf or through the conf/spark-defaults.conf file.

    Property Name / Default / Meaning
    spark.yarn.applicationMaster.waitTries 10 Number of times the ResourceManager waits for the Spark ApplicationMaster to start, i.e. for the SparkContext to be initialized. The launch fails once this count is exceeded.
    spark.yarn.submit.file.replication 3 HDFS replication factor for files the application uploads to HDFS.
    spark.yarn.preserve.staging.files false If set to true, staging files are preserved rather than deleted when the job ends.
    spark.yarn.scheduler.heartbeat.interval-ms 5000 Interval at which the Spark ApplicationMaster sends heartbeats to the YARN ResourceManager.
    spark.yarn.max.executor.failures 2 x number of executors Maximum number of executor failures before the application is declared failed.
    spark.yarn.historyServer.address (none) Address of the Spark history server (including http://). The address is handed to the YARN ResourceManager when the application finishes, so the ResourceManager UI can link to the history server UI.
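
    As noted above, these properties can be set through SparkConf as well as spark-defaults.conf. A sketch; the history server host and port are placeholders:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.yarn.submit.file.replication", "3")
      .set("spark.yarn.preserve.staging.files", "false")
      .set("spark.yarn.historyServer.address", "http://historyserver:18080")   // placeholder host:port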
      