Components
Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program (called the driver program).
Specifically, to run on a cluster,
1/ the SparkContext can connect to several types of cluster managers (either Spark’s own standalone cluster manager, Mesos or YARN), which allocate resources across applications.
2/ Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application.
3/ Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors.
4/ Finally, SparkContext sends tasks to the executors to run.
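To make the flow above concrete, here is a minimal driver-program sketch. The master URL, application name, and the toy computation are placeholder assumptions, not part of the original text; the SparkContext it creates is what connects to the cluster manager, acquires executors, ships the application code, and sends tasks to those executors.
import org.apache.spark.{SparkConf, SparkContext}

object MinimalDriver {
  def main(args: Array[String]): Unit = {
    // The driver program: creating the SparkContext connects to the cluster manager.
    val conf = new SparkConf()
      .setAppName("minimal-driver")            // placeholder application name
      .setMaster("spark://master-host:7077")   // placeholder standalone master URL
    val sc = new SparkContext(conf)

    // Each action is broken into tasks that the SparkContext sends to the executors.
    val sum = sc.parallelize(1 to 100).reduce(_ + _)
    println(s"sum = $sum")

    sc.stop()
  }
}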
Spark Standalone Mode
1/ Starting a Cluster Manually
You can start a standalone master server by executing:
./sbin/start-master.sh
Similarly, you can start one or more workers and connect them to the master via:
./sbin/start-slave.sh <master-spark-URL>
2/ Cluster Launch Scripts
To launch a Spark standalone cluster with the launch scripts, you should create a file called conf/slaves in your Spark directory, which must contain the hostnames of all the machines where you intend to start Spark workers, one per line. If conf/slaves does not exist, the launch scripts default to a single machine (localhost), which is useful for testing. Note that the master machine accesses each of the worker machines via ssh. By default, ssh is run in parallel and requires password-less (using a private key) access to be set up. If you do not have a password-less setup, you can set the environment variable SPARK_SSH_FOREGROUND and serially provide a password for each worker.
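For illustration, a conf/slaves file for a three-worker cluster might look like the sketch below; the hostnames are placeholders for your own machines.
worker-node-01
worker-node-02
worker-node-03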
Once you’ve set up this file, you can launch or stop your cluster with the following shell scripts, based on Hadoop’s deploy scripts, and available in SPARK_HOME/sbin:
sbin/start-master.sh - Starts a master instance on the machine the script is executed on.
sbin/start-slaves.sh - Starts a slave instance on each machine specified in the conf/slaves file.
sbin/start-slave.sh - Starts a slave instance on the machine the script is executed on.
sbin/start-all.sh - Starts both a master and a number of slaves as described above.
sbin/stop-master.sh - Stops the master that was started via the sbin/start-master.sh script.
sbin/stop-slaves.sh - Stops all slave instances on the machines specified in the conf/slaves file.
sbin/stop-all.sh - Stops both the master and the slaves as described above.
Note that these scripts must be executed on the machine you want to run the Spark master on, not your local machine.
You can optionally configure the cluster further by setting environment variables in conf/spark-env.sh. Create this file by starting with the conf/spark-env.sh.template, and copy it to all your worker machines for the settings to take effect.
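As an illustrative sketch, a conf/spark-env.sh might set a couple of the worker variables like this; the values are placeholder assumptions, not recommendations.
export SPARK_WORKER_CORES=8      # total cores a worker offers to Spark applications (placeholder value)
export SPARK_WORKER_MEMORY=16g   # total memory a worker offers to Spark applications (placeholder value)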
3/ Connecting an Application to the Cluster
To run an application on the Spark cluster, simply pass the spark://IP:PORT URL of the master to the SparkContext constructor.
To run an interactive Spark shell against the cluster, run the following command:
./bin/spark-shell --master spark://IP:PORT
You can also pass an option --total-executor-cores <numCores> to control the number of cores that spark-shell uses on the cluster.
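For example (the core count here is an arbitrary placeholder):
./bin/spark-shell --master spark://IP:PORT --total-executor-cores 4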
4/ Launching Spark Applications
The spark-submit script provides the most straightforward way to submit a compiled Spark application to the cluster. For standalone clusters, Spark currently supports two deploy modes. In client mode, the driver is launched in the same process as the client that submits the application. In cluster mode, however, the driver is launched from one of the Worker processes inside the cluster, and the client process exits as soon as it fulfills its responsibility of submitting the application without waiting for the application to finish.
If your application is launched through Spark submit, then the application jar is automatically distributed to all worker nodes. For any additional jars that your application depends on, you should specify them through the --jars flag using comma as a delimiter (e.g. --jars jar1,jar2). To control the application’s configuration or execution environment, see Spark Configuration.
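As a sketch of such a submission against a standalone cluster (the class, application jar, and dependency jar names are placeholders):
./bin/spark-submit --class my.main.Class \
    --master spark://IP:PORT \
    --deploy-mode cluster \
    --jars dependency-one.jar,dependency-two.jar \
    my-application.jar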
Additionally, standalone cluster mode supports restarting your application automatically if it exited with non-zero exit code. To use this feature, you may pass in the --supervise flag to spark-submit when launching your application. Then, if you wish to kill an application that is failing repeatedly, you may do so through:
./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>
You can find the driver ID through the standalone Master web UI at http://<master url>:8080.
5/ Resource Scheduling
The standalone cluster mode currently only supports a simple FIFO scheduler across applications. However, to allow multiple concurrent users, you can control the maximum number of resources each application will use. By default, it will acquire all cores in the cluster, which only makes sense if you just run one application at a time. You can cap the number of cores by setting spark.cores.max in your SparkConf. For example:
val conf = new SparkConf()
.setMaster(...)
.setAppName(...)
.set("spark.cores.max", "10")
val sc = new SparkContext(conf)
In addition, you can configure spark.deploy.defaultCores on the cluster master process to change the default for applications that don’t set spark.cores.max to something less than infinite. Do this by adding the following to conf/spark-env.sh:
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"
This is useful on shared clusters where users might not have configured a maximum number of cores individually.
6/ Monitoring and Logging
Spark’s standalone mode offers a web-based user interface to monitor the cluster. The master and each worker has its own web UI that shows cluster and job statistics. By default you can access the web UI for the master at port 8080. The port can be changed either in the configuration file or via command-line options.
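For instance, either of the following should move the master web UI to port 8081 (the port number is an arbitrary placeholder); the first line goes in conf/spark-env.sh, the second is a launch-time option:
export SPARK_MASTER_WEBUI_PORT=8081
./sbin/start-master.sh --webui-port 8081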
In addition, detailed log output for each job is also written to the work directory of each slave node (SPARK_HOME/work by default). You will see two files for each job, stdout and stderr, with all output it wrote to its console.
7/ Running Alongside Hadoop
You can run Spark alongside your existing Hadoop cluster by just launching it as a separate service on the same machines. To access Hadoop data from Spark, just use a hdfs:// URL (typically hdfs://<namenode>:9000/path, but you can find the right URL on your Hadoop Namenode’s web UI). Alternatively, you can set up a separate cluster for Spark, and still have it access HDFS over the network; this will be slower than disk-local access, but may not be a concern if you are still running in the same local area network (e.g. you place a few Spark machines on each rack that you have Hadoop on).
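As a small sketch of reading Hadoop data from such a setup, assuming an existing SparkContext sc (the HDFS URL and path are placeholders; use the URL shown on your Namenode’s web UI):
// Reads a text file from HDFS; the URL below is a placeholder.
val lines = sc.textFile("hdfs://namenode:9000/path/to/data")
println(lines.count())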
8/ Configuring Ports for Network Security
Spark makes heavy use of the network, and some environments have strict requirements for using tight firewall settings. For a complete list of ports to configure, see the security page.
9/ High Availability
Running Spark on YARN
1/ Launching Spark on YARN
Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the Hadoop cluster. These configs are used to write to HDFS and connect to the YARN ResourceManager. The configuration contained in this directory will be distributed to the YARN cluster so that all containers used by the application use the same configuration. If the configuration references Java system properties or environment variables not managed by YARN, they should also be set in the Spark application’s configuration (driver, executors, and the AM when running in client mode).
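For example, before submitting you might point Spark at the Hadoop client configuration directory; the path below is a placeholder for wherever those files live on your gateway machine:
export HADOOP_CONF_DIR=/etc/hadoop/conf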
There are two deploy modes that can be used to launch Spark applications on YARN. In cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
Unlike Spark standalone and Mesos modes, in which the master’s address is specified in the --master parameter, in YARN mode the ResourceManager’s address is picked up from the Hadoop configuration. Thus, the --master parameter is yarn.
To launch a Spark application in cluster mode:
$ ./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]
For example:
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 4g \
    --executor-memory 2g \
    --executor-cores 1 \
    --queue thequeue \
    lib/spark-examples*.jar \
    10
The above starts a YARN client program which starts the default Application Master. Then SparkPi will be run as a child thread of Application Master. The client will periodically poll the Application Master for status updates and display them in the console. The client will exit once your application has finished running. Refer to the “Debugging your Application” section below for how to see driver and executor logs.
To launch a Spark application in client mode, do the same, but replace cluster with client. The following shows how you can run spark-shell in client mode:
$ ./bin/spark-shell --master yarn --deploy-mode client
2/ Adding Other JARs
In cluster mode, the driver runs on a different machine than the client, so SparkContext.addJar won’t work out of the box with files that are local to the client. To make files on the client available to SparkContext.addJar, include them with the --jars option in the launch command.
$ ./bin/spark-submit --class my.main.Class \
    --master yarn \
    --deploy-mode cluster \
    --jars my-other-jar.jar,my-other-other-jar.jar \
    my-main-jar.jar \
    app_arg1 app_arg2
3/ Preparations
Running Spark on YARN requires a binary distribution of Spark which is built with YARN support. Binary distributions can be downloaded from the downloads page of the project website. To build Spark yourself, refer to Building Spark.
To make Spark runtime jars accessible from YARN side, you can specify spark.yarn.archive or spark.yarn.jars. For details please refer to Spark Properties. If neither spark.yarn.archive nor spark.yarn.jars is specified, Spark will create a zip file with all jars under $SPARK_HOME/jars and upload it to the distributed cache.
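As a sketch, either property can be set in conf/spark-defaults.conf; the HDFS locations below are placeholders:
# conf/spark-defaults.conf -- set one of the two properties
spark.yarn.archive   hdfs://namenode:9000/spark/spark-libs.zip
# spark.yarn.jars    hdfs://namenode:9000/spark/jars/*.jar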
4/ Configuration
Most of the configs are the same for Spark on YARN as for other deployment modes. See the configuration page for more information on those. Configs that are specific to Spark on YARN are listed on the Running Spark on YARN page.
5/ Debugging your Application
In YARN terminology, executors and application masters run inside “containers”. YARN has two modes for handling container logs after an application has completed. If log aggregation is turned on (with the yarn.log-aggregation-enable config), container logs are copied to HDFS and deleted on the local machine. These logs can be viewed from anywhere on the cluster with the yarn logs command.
yarn logs -applicationId <app ID>
will print out the contents of all log files from all containers from the given application. You can also view the container log files directly in HDFS using the HDFS shell or API. The directory where they are located can be found by looking at your YARN configs (yarn.nodemanager.remote-app-log-dir and yarn.nodemanager.remote-app-log-dir-suffix). The logs are also available on the Spark Web UI under the Executors Tab. You need to have both the Spark history server and the MapReduce history server running and configure yarn.log.server.url in yarn-site.xml properly. The log URL on the Spark history server UI will redirect you to the MapReduce history server to show the aggregated logs.
When log aggregation isn’t turned on, logs are retained locally on each machine under YARN_APP_LOGS_DIR, which is usually configured to /tmp/logs or $HADOOP_HOME/logs/userlogs depending on the Hadoop version and installation. Viewing logs for a container requires going to the host that contains them and looking in this directory. Subdirectories organize log files by application ID and container ID. The logs are also available on the Spark Web UI under the Executors Tab and doesn’t require running the MapReduce history server.
To review per-container launch environment, increase yarn.nodemanager.delete.debug-delay-sec to a large value (e.g. 36000), and then access the application cache through yarn.nodemanager.local-dirs on the nodes on which containers are launched. This directory contains the launch script, JARs, and all environment variables used for launching each container. This process is useful for debugging classpath problems in particular. (Note that enabling this requires admin privileges on cluster settings and a restart of all node managers. Thus, this is not applicable to hosted clusters).
To use a custom log4j configuration for the application master or executors, here are the options:
upload a custom log4j.properties using spark-submit, by adding it to the --files list of files to be uploaded with the application.
add -Dlog4j.configuration=<location of configuration file> to spark.driver.extraJavaOptions (for the driver) or spark.executor.extraJavaOptions (for executors). Note that if using a file, the file: protocol should be explicitly provided, and the file needs to exist locally on all the nodes.
update the $SPARK_CONF_DIR/log4j.properties file and it will be automatically uploaded along with the other configurations. Note that if multiple options are specified, the other two options have higher priority than this one.
Note that for the first option, both executors and the application master will share the same log4j configuration, which may cause issues when they run on the same node (e.g. trying to write to the same log file).
If you need a reference to the proper location to put log files in YARN so that YARN can properly display and aggregate them, use spark.yarn.app.container.log.dir in your log4j.properties. For example, log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log. For streaming applications, configuring RollingFileAppender and setting the file location to YARN’s log directory will avoid disk overflow caused by large log files, and the logs can be accessed using YARN’s log utility.
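A hedged sketch of such a log4j.properties (log4j 1.x syntax; the rolling sizes are placeholder values) that writes inside YARN’s container log directory:
log4j.rootLogger=INFO, file_appender
log4j.appender.file_appender=org.apache.log4j.RollingFileAppender
log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log
log4j.appender.file_appender.MaxFileSize=50MB
log4j.appender.file_appender.MaxBackupIndex=5
log4j.appender.file_appender.layout=org.apache.log4j.PatternLayout
log4j.appender.file_appender.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n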
To use a custom metrics.properties for the application master and executors, update the $SPARK_CONF_DIR/metrics.properties file. It will automatically be uploaded with other configurations, so you don’t need to specify it manually with --files.
6/ Important notes
Whether core requests are honored in scheduling decisions depends on which scheduler is in use and how it is configured.
In cluster mode, the local directories used by the Spark executors and the Spark driver will be the local directories configured for YARN (Hadoop YARN config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored. In client mode, the Spark executors will use the local directories configured for YARN while the Spark driver will use those defined in spark.local.dir. This is because the Spark driver does not run on the YARN cluster in client mode, only the Spark executors do.
The --files and --archives options support specifying file names with the # symbol, similar to Hadoop. For example you can specify --files localtest.txt#appSees.txt; this will upload the file you have locally named localtest.txt into HDFS, but it will be linked to by the name appSees.txt, and your application should use the name appSees.txt to reference it when running on YARN.
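A minimal sketch of the application side, assuming the submission used --files localtest.txt#appSees.txt: inside the YARN containers the file is localized into the working directory under its alias, so it can be opened by that name. The reading code below is an illustrative assumption, not from the original text.
// Open the localized file by its alias from the container's working directory.
val firstLine = scala.io.Source.fromFile("appSees.txt").getLines().next()
println(firstLine)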
The --jars option allows the SparkContext.addJar function to work if you are using it with local files and running in cluster mode. It does not need to be used if you are using it with HDFS, HTTP, HTTPS, or FTP files.