1.Spark的集群模式
1.1 集群中的组件
1.1.1 driver
一个Spark应用本身在集群中是作为一个独立进程运行的.它在main程序中通过SparkContext来进行协调.这个独立进程就是driver端
1.1.2 executor
为了运行在集群上.driver端的SparkContext可以连接至几种集群调度器(Cluster Manager)(StandAlone,Mesos,YARN等).
这些集群调度器会分配资源.一旦分配成功,Spark就会获取集群中的用于执行的资源(Executor 进程).这些Executor进程可以运行计算和存储数据.
接下来,SparkContext会将应用程序代码(jar,或者Python文件)发送给Executor用以执行
1.2 一些Spark架构上的重要点
i).每个应用获取到的executor进程,会一直持续到整个应用的生命周期结束.并且在executor的内部,会以多线程的形式运行多个任务(Task)
优势是每个应用之间相互隔离,在每个应用的driver调度它自己的Task,而不同应用的task运行在不同的executor(也代表不同的JVM)上
劣势是除非将数据写在一个第三方中,否则不同的Spark应用是不能实现数据共享的
ii).Spark应用本身不知道其集群调度器是什么只要Spark能拿到executor并最终建立连接.它不关心这个executor是Mesos或者YARN调度的
iii).因为driver的中心地位,在Spark应用的整个生命周期中.driver必须保持对所有的executor的网络寻址.以监听和接收来自executor的连接请求
iv).因为driver负责调度集群上的任务(Task),更好的方式应该是在相同的局域网中靠近 worker 的节点上运行。
如果您不喜欢发送请求到远程的集群,倒不如打开一个 RPC 至 driver 并让它就近提交操作而不是从很远的 worker 节点上运行一个 driver
1.3 Spark集群的术语
Application | 用户构建在 Spark 上的程序。由集群上的一个 driver 程序和多个 executor 组成 |
Application jar | 一个包含用户 Spark 应用的 Jar.(用户的 Jar 应该没有包括 Hadoop 或者 Spark 库,因为它们将在运行时添加) |
Driver program | 该进程运行应用的 main() 方法并且创建了 SparkContext |
Cluster manager | 一个外部的用于获取集群上资源的服务。(例如,Standlone Manager,Mesos,YARN) |
Deploy mode | 根据 driver 程序运行的地方区别.Cluster模式:driver运行与集群之内. Client模式:driver由提交者在集群之外自行创建 |
Worker node | 任何在集群中可以运行应用代码的节点 |
Executor | 一个为了在 worker 节点上的应用而启动的进程,它运行 task 并且将数据保持在内存中或者硬盘存储。每个应用有它自己的 Executor |
Task | 任务.一个将要被发送到 Executor 中的工作单元. |
Job | 作业.一个由多个任务组成的并行计算,并且能从 Spark action 中获取响应(例如 save , collect ); 比如action,对用户来说可见的基本都是Job |
Stage | 每个 Job 被拆分成更小的被称作 stage(阶段) 的 task(任务) 组,stage 彼此之间是相互依赖的(与 MapReduce 中的 map 和 reduce stage 相似) |
2.Spark的集群调度类型
Spark目前支持以下四种集群调度类型:
Standalone
Apache Mesos
Hadoop YARN
Kubernetes
3.应用程序的提交
使用 spark-submit 脚本可以提交应用至任何类型的集群
4.监控
4.1 Web UI 监控
每个SparkContext都会启动一个Web UI,默认端口为4040,显示有关应用程序的有用信息。这包括:
i).调度器阶段和任务的列表
ii).RDD 大小和内存使用的概要信息
iii).环境信息
iv).正在运行的执行器的信息
可以通过在Web浏览器中打开 http://<driver-node>:4040
来访问此界面.(如果多个SparkContexts在同一主机上运行,则它们将绑定到连续的端口从4040(4041,4042等)开始)
注意:
默认情况下 Web UI 中信息仅适用于运行中的应用程序
要在事后还能通过Web UI查看,请在应用程序启动之前,将spark.eventLog.enabled
设置为true(详见后).
4.2 事件日志的持久化
使用file-system提供的程序类(spark.history.provider)可以对执行日志进行持久化.
当进行持久化时,基本日志记录目录必须在spark.history.fs.logDirectory
配置选项中提供,并且应包含每个代表应用程序的事件日志的子目录
Spark任务本身必须配置启用记录事件,并将其记录到相同共享的可写目录下.
例如,如果服务器配置了日志目录hdfs://namenode/shared/spark-logs,
那么客户端选项将是 spark.eventLog.enabled=true
spark.eventLog.dir=hdfs://namenode/shared/spark-logs
4.3 history server
如果在一个应用结束之后还需要查看应用的信息.可以通过Spark的历史服务器构建应用程序的UI, 只要应用程序的事件日志存在,就可以继续查询
./sbin/start-history-server.sh
默认情况,会在
http://<server-url>:18080 创建一个WebUI
,显示未完成,完成以及其它尝试的任务信息
4.4 history server 的一些相关配置
4.4.1 环境相关
SPARK_DAEMON_MEMORY | history server 内存分配(默认值:1g) |
SPARK_DAEMON_JAVA_OPTS | history server JVM选项(默认值:无) |
SPARK_PUBLIC_DNS | history server 公共地址。如果没有设置,应用程序历史记录的链接可能会使用服务器的内部地址,导致链接断开(默认值:无) |
SPARK_HISTORY_OPTS | spark.history.* history server 配置选项(默认值:无) |
4.4.2 Spark配置
spark.history.provider | org.apache.spark.deploy.history.FsHistoryProvider | 执行应用程序历史后端的类的名称。 目前只有一个实现,由Spark提供,它查找存储在文件系统中的应用程序日志 |
spark.history.fs.logDirectory | file:/tmp/spark-events | 要加载的应用程序事件日志的目录.可以是本地文件系统或任何Hadoop支持的分布式文件系统 |
spark.history.fs.update.interval | 10s | 在日志目录中检查新的或更新的日志间隔区间. |
spark.history.retainedApplications | 50 | 在缓存中保留UI数据的应用程序数量.如果超出此上限,则最早的应用程序将从缓存中删除(删除后从磁盘读取) |
spark.history.ui.maxApplications | Int.MaxValue | 在历史记录摘要页面上显示的应用程序数量 |
spark.history.ui.port | 18080 | history server 的Web界面绑定的端口 |
spark.history.kerberos.enabled | false | 表明 history server 是否应该使用kerberos进行登录.如果 history server 正在访问安全的Hadoop集群上的HDFS文件 |
spark.history.kerberos.principal | (none) | 如果使用使用kerberos进行登录,history server 的Kerberos主要名称 |
spark.history.kerberos.keytab | (none) | 如果使用使用kerberos进行登录,history server 的kerberos keytab文件的位置 |
spark.history.ui.acls.enable | false | 指定是否应检查acls授权查看应用程序的用户 |
spark.history.ui.admin.acls | empty | 通过逗号来分隔具有对history server中所有Spark应用程序的查看访问权限的用户/管理员列表 |
spark.history.ui.admin.acls.groups | empty | 通过逗号来分隔具有对history server中所有Spark应用程序的查看访问权限的组的列表 |
spark.history.fs.cleaner.enabled | false | 指定 History Server是否应该定期从存储中清除事件日志 |
spark.history.fs.cleaner.interval | 1d | job history清洁程序检查要删除的文件的时间间隔 |
spark.history.fs.cleaner.maxAge | 7d | history保留时间,超出这个时间江北清洁程序删除 |
spark.history.fs.numReplayThreads | 25% of available cores | history server 用于处理事件日志的线程数 |
注意:
i).history server 显示完成的和未完成的Spark作业。 如果应用程序在失败后进行多次尝试,将显示失败的尝试,以及任何持续未完成的尝试或最终成功的尝试
ii).未完成的程序只会间歇性地更新。 更新的时间间隔由更改文件的检查间隔 (spark.history.fs.update.interval
) 定义。 在较大的集群上,更新间隔可能设置为较大的值。
查看正在运行的应用程序的方式实际上是查看自己的Web UI
iii).没有注册完成就退出的应用程序将被列出为未完成的,即使它们不再运行。如果应用程序崩溃,可能会发生这种情况
iv).一个用于表示完成Spark作业的一种方法是明确地停止Spark Context (sc.stop()
),或者在Python中使用 with SparkContext() as sc
4.5 REST API
Spark也提供Rest方式.这为开发人员提供了一种简单的方法来为Spark创建新的可视化和监控工具
此处略,详见官网
4.6 Metrics 与 Ganglia
略,详见官网