zoukankan      html  css  js  c++  java
  • spark Standalone

    Spark Standalone

            Spark Standalone模式中,资源调度是Spark框架自己实现的,其节点类型分为Master节点和Worker节点,其中Driver运行在Master节点中,并且有常驻内存的Master进程守护,Worker节点上常驻Worker守护进程,负责与Master通信,通过ExecutorRunner来控制运行在当前节点上的CoarseGrainedExecutorBackend。每个Worker 上存在一个或者多个 CoarseGrainedExecutorBackend进程。每个进程包含一个Executor对象,该对象持有一个线程池,每个线程可以执行一个 Task。

            SparkStandalone是Master-Slaves架构的集群模式,和大部分的Master-Slaves结构集群类似,都存在着Master单点故障的问题。目前Spark提供了两种方式来解决此问题,一种是基于文件系统的故障恢复模式,这种方法适合当Master进程挂掉之后,直接重启即可;另一种方式是基于Zookeeper的HA方式,类似于HDFS的NameNode的HA方案,ActiveMaster挂掉之后,Standby Master会立即切换过去继续对外提供服务,同时这种基于Zookeeper的HA方案也被很多分布式框架采用,例如另外一种流式计算框架Storm以及HBase等。

    集群启动脚本

    在spark目录下执行,这些脚本只能在集群的master上执行


    sbin/start-master.sh  创建一个master实例

    sbin/start-slaves.sh   创建一个slave实例(conf/slaves中指定的每台机器上)

    sbin/start-all.sh      同时创建一个master和多个slave实例

    sbin/stop-all.sh      停止master(由bin/start-master.sh创建)

    sbin/stop-slaves.sh   停止所有slave实例(位于conf/slaves上的机器)

    sbin/stop-all.sh      同时停止master和slaves

    Application的集群挂载

            配置好集群后可以再集群上挂载一个应用(application),只需要把master的URL传递给SparkContext或SparkConf接口的构造方法即可。

    例如:

    val conf = newSparkConf.setMaster(“spark://master:7077”)

    val sc = newSparkContext(conf)

    如果要把Spark-shell中运行集群上只需要输入:

    Spark-shell--master spark://master:7077

    当然Spark-shell中还有很多参数,例如--core <numCores>来控制spark-shell在集群上所使用的CPU核数。

    启动编译的程序(Application)

            脚本spark-submit提供了standalone集群提交编译的spark-application的最直接的方式。对于standalone集群,spark目前仅支持客户端作业进程内的驱动程序(client deploy mode)。

            如果spark-application通过脚本spark-submit启动,那么application包就会自动分配到所有的worker节点上,如果application还同时依赖其他包文件,就应该用—jars 的标志来指定它们,不同包文件之间用逗号分隔(例如--jars jar1,jar2 ),更多的spark-submit参数,参考以前写过的 spark任务提交。

    资源调度

            Standalone的集群模式目前只支持简单的FIFO作业调度方式。但是,你可以通过控制Spark作业能够获得的最大资源数来实现大量的并发作业。默认的情况下,一个作业会占用集群中的全部资源,但是这在只有一个作业进程的情况下才有意义。你可以在SparkConf中设置spark.cores.max来覆盖系统设置的CPU可用核数的属性,例如:将该作业的核数设为10

            val conf = newSparkConf().set(“spark.cores.max”,”10”)

            另外,可以在集群的master进程中配置脚本文件spark.deploy.defaultCores来改变作业运行所需要最大内核数的默认值属性,但这需要将下面的配置选项添加到conf/spark-env.sh中:

            exportSPARK_MASTER_OPTS=”-Dspark.deploy.defaultCores=<value>”

            这在共享集群中非常有用,因为用户不可能单独配置自己作业的最大可用核数属性。

    监控和日志

            Spark通过Web UI来监控集群,master和每个worker都有自己的webUI用于显示集群和作业的统计资料。默认情形下,用户可以通过访问master的主端口master:8080进入到master的Web UI,端口值可以通过配置文件或命令行选项来改变。

            此外,每个作业输出的详细日志可以写入到它的工作目录中(默认为每个Worker节点上的SPARK_HOME/work)。在工作目录中,每个作业对应两个文件stdout和stderr,这两个文件包含了该作业所有控制台输出的信息。

            注:还会有master发送到该worker上的程序jar包。

    高可用性(HA)

            通常情况下,单机模式的集群调度具有一定的容错能力(目前Spark通过移除失败作业到其他的worker上来实现自身的抗故障特性)。然后,用master做调度决策会产生单点故障的问题,如果master崩溃,就无法创建新的作业了。Spark提出了两种解决方案。

    1. ZooKeeper的StandbyMasters

    1.1 描述

            将Standalone集群连接到同一个Zookeeper实例并启动多个Master,利用ZooKeeper提供的选举和状态保存功能,选举一个Master,其他的Master处于Standby状态。如果当前的Master发生故障,另一个Master同样通过选举产生,并恢复到发生故障的Master的状态,恢复调度。整个过程可能需要1-2分钟。该延时只会影响新的Application的调度,对于故障切换过程中正在运行的Application没有任何影响。

    1.2 配置

    在spark-env里对SPARK_DAEMON_JAVA_OPTS设置以启用该恢复模式:

    系统属性

    描述

    spark.deploy.recoveryMode

    设置ZOOKEEPER启用standby Master恢复模式(缺省值:NONE

    spark.deploy.zookeeper.url

    Zooker集群的URL (e.g., 192.168.1.100:2181,192.168.1.101:2181).

    spark.deploy.zookeeper.dir

    设置Zookeeper集群中用于存储恢复状态的文件目录 (缺省值: /spark).


    1.3 补充

            完成ZooKeeper集群的设置后,启用其高可用性是非常简单的。首先启动这个ZooKeeper集群,然后在不同节点上启动多个Masters,注意这些节点需要具有相同的ZooKeeper配置(ZooKeeper URL 和目录),而且Masters 可以随时被添加和移除。

            当调用新的Applications或添加新的Workers时,它们需要知道当前Master的IP地址,这种HA方案处理这种情况很简单,只需要使SparkContext接口指向一个Master列表就可以了,如spark://host1:port1,host2:port2。SparkContext就会尝试着依次对Master列表进行注册并轮询,如果host1出现故障,配置信息对于寻找host2仍然会有效。

            注册接入Master和正常运行没有什么重要不同,当集群启动时, application或worker需要能够找到并注册接入到当前的lead Master, 一旦注册成功,它们就会被并入系统(例如储存在ZooKeeper中)。 发生故障切换时,新被选中的Master就会联系所有先前已经注册成功的applications和Workers,并告知它们领导关系的变化。因此从某种意义上讲,这些applications和workers并不需要事先知道新Master的存在性。

            正是由于这一属性,新的Masters可以在任何时候被创建。唯一需要注意的是,在这个新的Masters成为leader的情形下,新的applications和workers 能够找到并注册接入到这个Master。 一旦注册成功,这个application或worker就会被启用。

    2. 基于文件系统的单点恢复

    2.1 描述

    尽管Zookeeper是用于生产模式高可用性的最佳途径,但如果你仅仅想故障后重启你的Master,文件系统(FILESYSTEM)模式就可以做到。Spark提供了目录来保存Application和worker的注册状态信息,一但Master发生故障,就可以通过重新启动Master进程的方式来恢复Application和worker的旧有状态。

    2.2 配置

     在spark-env里对SPARK_DAEMON_JAVA_OPTS设置:

    系统属性

    含义

    spark.deploy.recoveryMode

    设置文件系统启用单点恢复模式(缺省值:NONE

    spark.deploy.recoveryDirectory

    Spark-Master可进入的文件目录,用于存储作业的恢复状态


    集群环境变量配置

    1. 环境变量

    配置在conf/spark-env.sh(通过conf/spark-env.sh.template模板创建的)中,然后拷贝到所有的worker节点上生效:

    环境变量

    含义

    SPARK_MASTER_IP

    Master节点的IP

    SPARK_MASTER_PORT

    Master的端口号 (缺省值: 7077).

    SPARK_MASTER_WEBUI_PORT

    master web UI端口号 (缺省值: 8080).

    SPARK_MASTER_OPTS

    设置只适用于Master的配置属性,形式为”-Dx=y”,配置选项参见下面列表

    SPARK_LOCAL_DIRS

    设置Spark缓存空间目录,包括存储在磁盘上shufflemap阶段的输出文件和RDD,该文件夹应该设置在你系统中一个高速的本地磁盘上(提高运行效率),也可以是用逗号分割的位于不同磁盘的多个目录

    SPARK_WORKER_CORES

    设置Worker节点Application可用的CPU总核数(缺省值:所有)

    SPARK_WORKER_MEMORY

    设置Worker节点Application可用的内存总量,eg.1000M,2G

    (缺省值:内存总量-1G) 

    注:在单个Application的内存配置使用spark.executor.memory属性,

    SparkConfSparkContext中设置

    SPARK_WORKER_PORT

    Worker节点端口号 (缺省值: 随机).

    SPARK_WORKER_WEBUI_PORT

    Worker节点 web UI端口号 (缺省值: 8081).

    SPARK_WORKER_INSTANCES

    每个Worker节点启动的worker进程数量(缺省值:1)。

    如果使worker的实例数大于1。要使用SPARK_WORKER_CORES

    来限制每个worker进程所允许使用的内核数,

    否则每个worker会尝试使用所有的处理器内核。

    SPARK_WORKER_DIR

    Worker节点的工作路径,包括日志和暂存空间等。

    (缺省值:Woker节点上的SPARK_HOME/work

    SPARK_WORKER_OPTS

    设置只适用于Woker节点的配置属性,形式为”-Dx=y”,配置选项参见下面列表

    SPARK_DAEMON_MEMORY

    Spark masterworker守护进程分配内存 (缺省值: 512m).

    SPARK_DAEMON_JAVA_OPTS

    配置Master节点和Worker节点都使用的属性(缺省值: 无)

    SPARK_PUBLIC_DNS

    设置Spark masterworkers的公共DNS名称 (缺省值: ).



    2. Master参数

    SPARK_MASTER_OPTS支持的参数如下:

    属性名称

    缺省值

    含义

    spark.deploy.retainedApplications

    200

    显示已完成的Application的最大数目,当超过这个数目的时候,将从UI中删除之前的Application以维持这个数目。

    spark.deploy.retainedDrivers

    200

    显示已完成的driver的最大数目,当超过这个数目的时候,将从UI中删除之前的Application以维持这个数目。

    spark.deploy.spreadOut

    true

    Standalone集群管理器是否自由选择节点(true)还是固定到尽可能少的节点(false),前者会有更好的数据本地性,后者对于密集型工作负载更有效

    spark.deploy.defaultCores

    (infinite)

    Standalone集群模式中,在没有设置spark.cores.max的情况下,系统默认分给applicationCPU核数。该情况下Application获得全部CPU核数,在共享集群上(例如mapReduce),应该手动设置spark.cores.max的值,以防止默认获得整个集群的资源。

    spark.worker.timeout

    60

    设置时限(秒),master超过这个时间收不到worker的心跳,既认定worker已经丢失(故障)


    3. Worker的参数

    SPARK_MASTER_OPTS支持的参数如下:

    属性名称

    缺省值

    含义

    spark.worker.cleanup.enabled

    false

    是否定期清理Worker节点的应用程序工作目录,清理时不管Application是否运行,

    该参数只对Standalone生效

    spark.worker.cleanup.interval

    1800 (30 minutes)

    清理Worker节点本地过期的应用程序工作目录的时间间隔

    spark.worker.cleanup.appDataTtl

    7 * 24 * 3600 (7 days)

    Worker保留应用程序工作目录的有效时间。该时间由磁盘空间,应用程序日志,应用程序的jar包以及应用程序的提交频率来设定,默认7天。

    4. 一些配置的样例

    在conf/spark-env.sh中配置:

    //指定Standalone集群IP

    export SPARK_MASTER_IP=master  

    //设定每个Worker节点可用内存

    export SPARK_WORKER_MEMORY=1g

    //设置Application能使用的CPU核数为4,默认为使用所有集群中的COU核数。

    export SPARK_JAVA_OPTS=”-Dspark.cores.max=4” 

    //指定Master的HA,依赖于zookeeper集群,url参数和dir参数在上方的HA有说明。

    export SPARK_DAEMON_JAVA_OPTS=”-Dspark.deploy.recoveryMode=ZOOKEEPER

    -Dspark.deploy.zookeeper.url=host1:port,host2:port

    -Dspark.deploy.zookeeper.dir=/spark”

    注:在设置Worker进程的CPU个数和内存大小,要注意机器的实际硬件条件,如果配置的超过当前Worker节点的硬件条件,Worker进程会启动失败。

  • 相关阅读:
    hdu 5696 区间的价值 单调栈+rmq
    bzoj 3039: 玉蟾宫 单调栈或者悬线法求最大子矩阵和
    bzoj 2435: [Noi2011]道路修建 dfs
    Codeforces gym 100971 D. Laying Cables 单调栈
    codeforces GYM 100971F 公式题或者三分
    UVA 10539
    BZOJ 1079: [SCOI2008]着色方案 DP
    UVA 11426
    UVA 11728
    UVA 10090
  • 原文地址:https://www.cnblogs.com/zhangyunlin/p/6168202.html
Copyright © 2011-2022 走看看