zoukankan      html  css  js  c++  java
  • Spark(十二) -- Spark On Yarn & Spark as a Service & Spark On Tachyon

    Spark On Yarn:

    从0.6.0版本其,就可以在在Yarn上运行Spark
    通过Yarn进行统一的资源管理和调度
    进而可以实现不止Spark,多种处理框架并存工作的场景

    部署Spark On Yarn的方式其实和Standalone是差不多的,区别就是需要在spark-env.sh中添加一些yarn的环境配置,在提交作业的时候会根据这些配置加载yarn的信息,然后将作业提交到yarn上进行管理

    首先请确保已经部署了Yarn,相关操作请参考:

    hadoop2.2.0集群安装和配置

    部署完成之后可以通过
    yarn-master:8088
    查看yarn的web管理界面
    yarn-master为配置的yarn主机名或ip地址

    Spark的一些配置如下:
    修改spark-env.sh文件
    必须添加的是
    HADOOP_CONF_DIR 或者 YARN_CONF_DIR指向hadoop的conf配置文件目录

    其余的和Spark Standalone部署是一样的,具体请参考:

    Spark(一)– Standalone HA的部署

    另外,可以通过
    SPARK_YARN_USER_ENV
    来配置要传给Spark进程的环境变量,如JAVA_HOME等

    通过export SPARK_JAR=hdfs://some/path
    来将jar文件放在全局可读的HDFS上,缓存在各个节点中,这样一来,运行应用时就无需每次都分发jar文件到各个节点上

    两种方式作业提交方式:
    1.yarn-cluster
    在spark目录下执行:

    ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster lib/spark-examples*.jar 10

    来运行SparkPi这个example

    2.yarn-client

    和之前的方式一模一样,只是将yarn-cluster换成yarn-client,如下:

    ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client lib/spark-examples*.jar 10

    两种方式的区别:
    client方式下,Spark的Driver会在客户端进程中,Application Master仅仅是向Yarn申请资源,同时会在客户端(终端)上打印出具体的执行log

    cluster方式下,Driver会在Application Master进程中运行,受到Yarn的管理。客户端在应用初始化之后就可以脱离,这时候在客户端不能收到执行的log信息,但是可以通过Yarn的WebUI来查看作业的运行情况

    Spark On Yarn作业的提交方式和Standalone相比仅仅是将–master这个参数由具体的spark主节点,换成了yarn-cluster/client

    Spark as a Service:

    将部署好的Spark集群作为一种服务通过REST接口向外提供
    这就很像云计算模型

    我们将Spark集群部署好,将适用于各种场景作业的jar包分配上去,而外面的人通过REST接口来调用我们提供的各种服务,这就是Spark as a Service

    其中典型的实现是JobServer

    JobServer其实就是一套软件,将其下载下来之后部署在Spark集群上

    它会想外界提供REST接口,Spark上的各个资源都可以通过一个唯一的URL来访问

    构架图如下:

    这里写图片描述

    特性
    “Spark as a Service”: 简单的面向job和context管理的REST接口
    通过长期运行的job context支持亚秒级低延时作业(job)
    可以通过结束context来停止运行的作业(job)
    分割jar上传步骤以提高job的启动
    异步和同步的job API,其中同步API对低延时作业非常有效
    支持Standalone Spark和Mesos
    Job和jar信息通过一个可插拔的DAO接口来持久化
    命名RDD以缓存,并可以通过该名称获取RDD。这样可以提高作业间RDD的共享和重用

    部署JobServer需要sbt

    JobServer下载地址

    装好sbt之后,将JobServer解压,进入其根目录
    敲sbt
    进入sbt命令之后(第一次启动要下载很多jar包,可能会因为网络的问题卡很久。。)
    执行

    re-start --- -Xmx4g

    此时会下载spark-core,jetty和liftweb等相关模块
    完成之后可以通过访问http://localhost:8090 可以看到Web UI

    这里写图片描述

    相关的API如下:

    curl --data-binary @job-server-tests/target/job-
    server-tests-0.3.1.jar localhost:8090/jars/test
    //运行指定的jar包
    
    curl localhost:8090/jars/
    //查看提交的jar
    
    curl -d "input.string = hello job server" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample'
    //提交的appName为test,class为spark.jobserver.WordCountExample
    
    curl localhost:8090/jobs/34ce0666-0148-46f7-8bcf-a7a19b5608b2
    curl localhost:8090/jobs/34ce0666-0148-46f7-8bcf-a7a19b5608b2/config
    //通过job-id查看结果和配置信息
    
    curl -d "input.string = hello job server" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample&sync=true'
    //sync=true会直接将执行接口返回,如果没有设置,那么将会分配一个jobId,等作业完成后可以通过jobId在查看信息
    
    
    curl -d "" ‘localhost:8090/contexts/test-context?
    num-cpu-cores=4&mem-per-node=512m'
    //启动一个context
    
    curl localhost:8090/contexts
    //查询所有的context
    
    curl -d "input.string = a b c a b see" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample&context=test-context&sync=true'
    //在某个指定的context上执行作业

    配置文件:

    vim spark-jobserver/config/local.conf.template
    master = "local[4]"//将这里改为集群的地址
    
    jobdao = spark.jobserver.io.JobFileDAO
        filedao {
          rootdir = /tmp/spark-job-server/filedao/data
        }
    //数据对象的存储方法和存储路径
    
    context-settings {
        num-cpu-cores = 2  
        memory-per-node = 512m
        }
    //context的默认设置,如果在REST接口中显示的指明了context的配置,那么这里将会被覆盖
    
    POST /contexts/my-new-context?num-cpu-cores=10
    //在REST中设置一个新的context,以参数的形式放在url中

    JobServer部署:

    复制config/local.sh.template到env.sh ,并且设置相关参数如:指定安装路径,Spark Home, Spark Conf等。

    DEPLOY_HOSTS="spark1
                  spark2
                  spark3"
    APP_USER=spark
    APP_GROUP=spark
    INSTALL_DIR=/home/spark/jobserver
    LOG_DIR=/home/spark/jobserver/log
    PIDFILE=spark-jobserver.pid
    SPARK_HOME=/home/spark/spark
    SPARK_CONF_HOME=/home/spark/spark/conf

    修改project/Dependencies.scala。重新指定spark版本为当前的版本

    lazy val sparkDeps = Seq(
    “org.apache.spark” %% “spark-core” % “1.3.1” ……

    运⾏行bin/server_deploy.sh env(或者直接将env.sh的绝对路径写进server_deploy.sh这样就不用再传参数了)
    打好包后与相关配置⼀一起放到指定服务器的指定目录

    启动:

    需要把config下的local.conf复制到INSTALL_DIR下面,改名为local.conf,并修改其中的master以及两个路径。

    jar-store-rootdir = /var/lib/spark/jars
    rootdir = /var/lib/spark/filedao

    进⼊入服务器指定指定目录,运⾏行server_start.sh

    如果启动有问题可以试试把cfg.sh 拷贝到 spark-job-server目录下 改名为 settings.sh

    创建JobServer工程:

    在idea中新建SBT工程
    在.sbt文件中添加以下内容

    name := "job server demo"
    version := "1.0"
    scalaVersion := "2.10.4"
    resolvers += "Ooyala Bintray" at "http://dl.bintray.com/ooyala/maven"
    libraryDependencies += "ooyala.cnd" % "job-server" % "0.3.1" % "provided"
    libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.0.0"

    继承SparkJob,重写validate与runJob
    validate就是一个执行一系列验证的方法,执行的时候先看一下validate的验证对不对
    runJob执行作业的逻辑

    import com.typesafe.config.{Config, ConfigFactory}
    import org.apache.spark._
    import org.apache.spark.SparkContext._
    import scala.util.Try
    import spark.jobserver.SparkJob
    import spark.jobserver.SparkJobValidation
    import spark.jobserver.SparkJobValid
    import spark.jobserver.SparkJobInvalid
    
    object WordCount extends SparkJob{
    def main(args: Array[String]) {
        val sc = new SparkContext("local[4]", "WordCountExample")
        val config = ConfigFactory.parseString("")
        val results = runJob(sc, config)
        println("Result is " + results)
      }
    
      override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
        Try(config.getString("input.string"))
          .map(x => SparkJobValid)
          .getOrElse(SparkJobInvalid("No input.string config param"))
      }
    
      override def runJob(sc: SparkContext, config: Config): Any = {
        val dd = sc.parallelize(config.getString("input.string").split(" ").toSeq)
        val rsList = dd.map((_,1)).reduceByKey(_+_).map(x => (x._2,x._1)).sortByKey(false).collect
        rsList(0)._2
      }
    }

    生成jar包并提交

    curl --data-binary @/root/install-pkg/job-server-demo_2.10-1.0.jar localhost:8090/jars/example
    

    测试

    curl -i -d "input.string=a a a b b c" 'localhost:8090/jobs?appName=example&classPath=com.persia.spark.WordCount'
    
    HTTP/1.1 202 Accepted
    Server: spray-can/1.2.0
    Date: Sat, 12 Jul 2014 09:22:26 GMT
    Content-Type: application/json; charset=UTF-8
    Content-Length: 150
    
    {
      "status": "STARTED",
      "result": {
        "jobId": "b5b2e80f-1992-471f-8a5d-44c08c3a9731",
        "context": "6bd9aa29-com.persia.spark.WordCount"
      }
    }

    使用命名RDD:

    object MyNamedRDD extends SparkJob with NamedRDDSuport
    //继承SparkJob并混入NamedRDDSuport特质之后写自己的NamedRDD
    
    this.namedRDDs.update("myrdd",myrdd)
    //以键值对的形式将自定义的命名RDD缓存起来
    
    val myrdd = this.namedRDDs.get[(String,String)]("myrdd").get
    //将缓存的RDD拿出来
    
    //命名RDD可以用于有点类似于Session的作用

    Spark On Tachyon:

    什么是Tachyon?

    来看看传统的Spark不同job之间,不同的框架是如何共享数据的

    这里写图片描述

    这里写图片描述

    通过不断的读取HDFS来实现数据的共享,HDFS是什么?是一种分布式的文件系统啊,说到底就是硬盘。那么问题就很明显了,频繁的磁盘IO操作,还有cache丢失,内存使用等问题

    解决方案是什么?

    就是Tachyon,一种分布式的内存⽂文件系统,注意内存两个字,不同任务(框架)享受可靠快速的数据共享

    于是BDAS变成了下面这种构架:

    这里写图片描述

    之前的问题解决方案:

    这里写图片描述

    Tachyon部署:

    在命令行中下载Tachyon

    wget https://github.com/amplab/tachyon/releases/download/v0.6.4/tachyon-0.6.4-bin.tar.gz
    
    tar xvfz tachyon-0.6.4-bin.tar.gz
    
    cd tachyon-0.6.4

    国内的网络访问不了的时候可以在这里下载:

    Tachyon下载地址

    1.Local模式:

    cp conf/tachyon-env.sh.template conf/tachyon-env.sh
    
    ./bin/tachyon format
    
    ./bin/tachyon-start.sh local

    可以通过 http://localhost:19999 WebUI查看

    测试

    ./bin/tachyon runTest Basic CACHE_THROUGH
    
    ./bin/tachyon runTests
    
    ./bin/tachyon-stop.sh

    2.Cluster模式:

    在配置文件目录下修改slaves
    加入各个节点的主机名

    tachyon-env.sh修改如下配置:

    export TACHYON_MASTER_ADDRESS=spark1
    export TACHYON_WORKER_MEMORY_SIZE=4GB
    export TACHYON_UNDERFS_HDFS_IMPL=org.apache.hadoop.hdfs.DistributedFileSystem
    export TACHYON_UNDERFS_ADDRESS=hdfs://spark1:9000

    ./bin/tachyon format
    
    ./bin/tachyon-start.sh

    可以通过 http://tachyon.master.spark1:19999 WebUI查看

    测试

    ./bin/tachyon runTests

    3.基于zookeeper的Master HA

    确保在tachyon-env.sh中设置过
    export TACHYON_UNDERFS_ADDRESS=hdfs://hostname:port

    在TACHYON_JAVA_OPTS中加⼊
    -Dtachyon.master.journal.folder=hdfs://hostname:port/
    tachyon/journal/
    -Dtachyon.usezookeeper=true
    -Dtachyon.zookeeper.address=zkserver1:2181,zkserver2:2181,zkserver3:2181

    4.Spark On Tachyon:

    在Spark的conf目录下新建core-site.xml,并加入以下内容(zk模式下,如果不是zk模式,将name替换为fs.tachyon.impl即可)

    <configuration>
    <property>
    <name>fs.tachyon-ft.impl</name>
    <value>tachyon.hadoop.TFS</value>
    </property>
    </configuration>

    如果运⾏行的是低于Spark1.0.版本,在spark.env.sh中加⼊入:
    export SPARK_CLASSPATH=/pathToTachyon/client/target/tachyon-client-0.5.0-jar-with-dependencies.jar:$SPARK_CLASSPATH

    在zk模式下,还需要在spark-env.sh中加入
    以下内容:

    export SPARK_JAVA_OPTS="
    -Dtachyon.usezookeeper=true
    -Dtachyon.zookeeper.address=zkserver1:2181,zkserver2:2181,zkserver3:2181
    $SPARK_JAVA_OPTS
    "

    要在spark程序中使用 Tachyon需指定:
    1、spark.tachyonStore.url
    2、spark.tachyonStore.baseDir

    如果不想每次手动输入以上配置时,可以在spark的conf目录下编辑spark-defaults.conf文件
    将上面两个配置加入去即可

    spark.master                     spark://spark1:7077
    spark.eventLog.enabled           true
    spark.eventLog.dir               hdfs://ns1/spark_event_log
    spark.tachyonStore.url           tachyon://spark1:19998
    spark.tachyonStore.baseDir       /data/tachyon_tmp

    spark程序存储数据要指定OffHeap方式,说明数据不让spark自己管理了,而是让Tachyon接手

  • 相关阅读:
    B. Shift and Push
    Codeforces Round #392 (Div. 2)
    D. Make a Permutation!
    C. Bus
    B. Polycarp and Letters
    A. Fair Game
    python-随机数的产生random模块
    python的时间处理-time模块
    python-迭代器与生成器
    python-装饰器
  • 原文地址:https://www.cnblogs.com/jchubby/p/5449389.html
Copyright © 2011-2022 走看看