  • spark-job submission principle and resource configuration

    Spark terminology
    ---------------
        1.RDD
            Resilient Distributed Dataset, a lightweight collection of data.
            Internally it carries five properties:
            a.a list of partitions
            b.a compute function
            c.a list of dependencies on parent RDDs
            d.a partitioner (for key-value RDDs)
            e.a list of preferred locations
            
            Ways to create an RDD (a short sketch follows below):
            a.textFile
            b.makeRDD/parallelize()
            c.RDD transformations
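            [example] A minimal sketch of the three creation paths, assuming a spark-shell
            session where sc is already defined and the HDFS file used later in these notes exists:

                val rdd1 = sc.textFile("hdfs://mycluster/user/centos/1.txt", 3)   // a. load an external file (3 min splits)
                val rdd2 = sc.parallelize(1 to 10, 2)                             // b. makeRDD/parallelize a local collection
                val rdd3 = rdd2.map(_ * 2)                                        // c. a transformation produces a new RDD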
    
        2.Stage
            A division of the RDD chain, cut at shuffle operations.
            ShuffleMapStage
            ResultStage
    
            Each Stage is associated with an RDD; the ResultStage is associated with the final RDD of the job.
            Stages are created by walking the shuffle dependencies: a ShuffleDependency holds a reference to its
            parent RDD, so everything before it becomes a ShuffleMapStage, and the RDD that the ShuffleMapStage
            is associated with is the last RDD of that stage.
            Summary:
            the RDD associated with every Stage is the last RDD of that stage (see the sketch below).
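            [example] A sketch of how a word-count chain splits into stages (spark-shell assumed):
            reduceByKey introduces a ShuffleDependency, so the RDD produced by map is the last RDD
            of the ShuffleMapStage, and the RDD produced by reduceByKey belongs to the ResultStage.

                val words  = sc.textFile("hdfs://mycluster/user/centos/1.txt").flatMap(_.split(" "))
                val pairs  = words.map(w => (w, 1))      // last RDD of the ShuffleMapStage
                val counts = pairs.reduceByKey(_ + _)    // shuffle boundary: the ResultStage starts here
                counts.collect()                         // the action submits both stages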
    
    
        3.Dependency
            Dependency:
            the mapping between each partition of a child RDD and the set of partitions of its parent RDD.
            Dependencies are created when the RDD itself is created (a sketch follows after the list).
    
            NarrowDependency
                OneToOneDependency
                RangeDependency
                PruneDependency
    
            ShuffleDependency
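            [example] Inspecting the dependency types from spark-shell (sketch; the printed class
            names are abbreviated here):

                val base    = sc.parallelize(1 to 8, 4)
                val mapped  = base.map(x => (x % 2, x))
                println(mapped.dependencies)     // List(org.apache.spark.OneToOneDependency@...)  -> narrow
                val reduced = mapped.reduceByKey(_ + _)
                println(reduced.dependencies)    // List(org.apache.spark.ShuffleDependency@...)   -> wide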
                
        4.Partition
            The list of partitions inside an RDD; each partition corresponds to one slice of the data.
            textFile(path, n) sets the minimum number of splits (sketch below);
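            [example] The second argument of textFile is a minimum split count; each resulting
            partition is one slice of the data (spark-shell assumed):

                val rdd = sc.textFile("hdfs://mycluster/user/centos/1.txt", 3)
                println(rdd.getNumPartitions)    // usually 3 for a small file (it is a minimum, not an exact count)
                println(rdd.partitions.length)   // the same information via the partition list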
            
        5.Task
            Task, the concrete unit of execution.
            Each partition corresponds to one task.
            A task carries its partition plus a broadcast variable (the serialized RDD and its dependency).
            ShuffleMapTask
            ResultTask
    
    
    
    Local modes of the master
    -----------------
        local                           //run in-process with 1 worker thread
        local[4]                        //run in-process with 4 worker threads
        local[*]                        //one worker thread per available core
        local[4,5]                      //4 worker threads, up to 5 task failures tolerated
        local-cluster[1,2,3]            //pseudo-cluster: 1 worker, 2 cores per worker, 3 MB memory per worker
        spark://...                     //connect to a standalone master (usage sketch below)
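        [example] The master URL is simply passed to SparkConf; any of the forms above can be swapped in:

            import org.apache.spark.{SparkConf, SparkContext}

            val conf = new SparkConf().setAppName("wcApp").setMaster("local[4]")  // or local[*], spark://s101:7077, ...
            val sc   = new SparkContext(conf)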
    
    Standalone job submission flow
    ---------------------
        First a SparkContext is created; on the client it builds the three scheduling components in turn
        (DAGScheduler + TaskScheduler + SchedulerBackend). Starting the task scheduler starts the backend
        scheduler, which creates an AppClient object. Once started, the AppClient creates a ClientEndpoint,
        and that endpoint sends a "RegisterApplication" message to the master. The master registers the
        application, replies to the ClientEndpoint with a RegisteredApplication message, and then begins
        scheduling resources: it sends LaunchDriver and LaunchExecutor messages to the workers, which start
        the driver and the executors (in cluster deploy mode the driver runs in its own process launched on
        one of the workers).


        When an RDD action is executed, the backend scheduler sends a message to the DriverEndpoint, and the
        driver endpoint sends LaunchTask messages to the executors; the executor backend on each worker node
        receives the command and runs the task through its Executor.
    
    
        ClientEndpoint                DriverEndpoint
    ---------------------------------------------
        SubmitDriverResponse          StatusUpdate
        KillDriverResponse            ReviveOffers
        RegisteredApplication         KillTask
                                      RegisterExecutor
                                      StopDriver
                                      StopExecutors
                                      RemoveExecutor
                                      RetrieveSparkAppConfig
    
    Spark job deploy modes
    --------------------
        [test code]
        import java.lang.management.ManagementFactory
        import java.net.InetAddress
    
        import org.apache.spark.rdd.RDD
        import org.apache.spark.{SparkConf, SparkContext}
    
    
        /**
          * Created by Administrator on 2018/5/8.
          */
        object WCAppScala {
    
            def sendInfo(msg: String) = {
                //get the local ip address
                val ip = InetAddress.getLocalHost.getHostAddress

                //get the pid of the current JVM
                val rr = ManagementFactory.getRuntimeMXBean()
                val pid = rr.getName().split("@")(0)

                //current thread name
                val tname = Thread.currentThread().getName

                //object id
                val oid = this.toString

                //send one tab-separated line of diagnostics to the collector on s101:8888
                val sock = new java.net.Socket("s101", 8888)
                val out = sock.getOutputStream
                val m = ip + "\t:" + pid + "\t:" + tname + "\t:" + oid + "\t:" + msg + "\n"
                out.write(m.getBytes)
                out.flush()
                out.close()
            }
    
            def main(args: Array[String]): Unit = {
                //1.create the spark configuration object
                val conf = new SparkConf()
                conf.setAppName("wcApp")
                conf.setMaster("spark://s101:7077")

                sendInfo("before new sc! ")
                //2.create the spark context object
                val sc = new SparkContext(conf)
                sendInfo("after new sc! ")

                //3.load the file
                val rdd1 = sc.textFile("hdfs://mycluster/user/centos/1.txt", 3)
                sendInfo("load file! ")

                //4.flatten each line into words
                val rdd2 = rdd1.flatMap(line => {
                    sendInfo("flatMap : " + line)
                    line.split(" ")
                })

                //5.pair each word with 1
                val rdd3 = rdd2.map(w => {
                    sendInfo("map : " + w)
                    (w, 1)
                })

                //6.reduce by key
                val rdd4 = rdd3.reduceByKey((a, b) => {
                    sendInfo("reduceByKey() : " + a + "/" + b)
                    a + b
                })

                //7.collect the results to the driver and print them
                val arr = rdd4.collect()
                arr.foreach(println)
            }
        }
    
    
        1.client
            The driver runs on the client host. This is the default deploy mode.
            spark-submit --class WCAppScala --master spark://s101:7077 --deploy-mode client myspark.jar
    
        2.cluster
            The driver runs on one of the workers.
            Upload the jar to hdfs first so every worker can reach it:
            hdfs dfs -put myspark.jar .

            spark-submit --class WCAppScala --master spark://s101:7077 --deploy-mode cluster hdfs://mycluster/user/centos/myspark.jar
    
    
    
    Spark resource management
    --------------------
        1.Processes involved in Spark
            Master              //spark daemon
            Worker              //spark daemon
                                //core + memory refer to the resources the worker may hand out
            Driver              //driver process for one application
            ExecutorBackend     //CoarseGrainedExecutorBackend, runs the executor for one application on a worker
        
        2.Configuring the resources Spark may control
            [spark/conf/spark-env.sh]  (an example sketch follows the comment block below)
            # Options read when launching programs locally with
            # ./bin/run-example or ./bin/spark-submit
            # - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
            # - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
            # - SPARK_PUBLIC_DNS, to set the public dns name of the driver program
            # - SPARK_CLASSPATH, default classpath entries to append
    
            # Options read by executors and drivers running inside the cluster
            # - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
            # - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
            # - SPARK_CLASSPATH, default classpath entries to append
            # - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
            # - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos
    
            # Options read in YARN mode
            # - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
            # - SPARK_EXECUTOR_INSTANCES, Number of executors to start (Default: 2)
            # - SPARK_EXECUTOR_CORES, Number of cores for the executors (Default: 1).
            # - SPARK_EXECUTOR_MEMORY, Memory per Executor (e.g. 1000M, 2G) (Default: 1G)
            # - SPARK_DRIVER_MEMORY, Memory for Driver (e.g. 1000M, 2G) (Default: 1G)
    
            ###################################################################
            ############## Standalone-mode daemon configuration ###############
            ###################################################################
    
            # IP address or hostname the spark master binds to
            # - SPARK_MASTER_HOST, to bind the master to a different IP address or hostname
    
            #master rpc port, default 7077
            # - SPARK_MASTER_PORT 
            
            #master web UI port, default 8080
            #SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master
            # - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. "-Dx=y")
    
            #number of cores the worker may hand out, default: all available cores
            # - SPARK_WORKER_CORES, to set the number of cores to use on this machine
    
            #memory the worker may hand out to executors, default: total memory minus 1 GB
            # - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)
            # - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT, to use non-default ports for the worker
    
            #number of worker processes per node, default 1
            # - SPARK_WORKER_INSTANCES, to set the number of worker processes per node
    
            #
            # - SPARK_WORKER_DIR, to set the working directory of worker processes
            # - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
    
            #memory allocated to the master, worker and history server daemons themselves, default 1g (max heap size)
            # - SPARK_DAEMON_MEMORY, to allocate to the master, worker and history server themselves (default: 1g).
            # - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
            # - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y")
            # - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
            # - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers
    
            ###################################################################
            ############## Generic options for all daemons ####################
            ###################################################################
            # Generic options for the daemons used in the standalone deploy mode
            # - SPARK_CONF_DIR      Alternate conf dir. (Default: ${SPARK_HOME}/conf)
            # - SPARK_LOG_DIR       Where log files are stored.  (Default: ${SPARK_HOME}/logs)
            # - SPARK_PID_DIR       Where the pid file is stored. (Default: /tmp)
            # - SPARK_IDENT_STRING  A string representing this instance of spark. (Default: $USER)
            # - SPARK_NICENESS      The scheduling priority for daemons. (Default: 0)
            # - SPARK_NO_DAEMONIZE  Run the proposed command in the foreground. It will not output a PID file.
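
            [example] A minimal spark-env.sh sketch for the standalone daemons; the values below are
            illustrative assumptions, not settings taken from these notes:

            export SPARK_MASTER_HOST=s101          # bind the master to this host
            export SPARK_WORKER_CORES=4            # cores each worker may hand out to executors
            export SPARK_WORKER_MEMORY=4g          # memory each worker may hand out to executors
            export SPARK_WORKER_INSTANCES=1        # worker processes per node (the default)
            export SPARK_DAEMON_MEMORY=1g          # heap for the master/worker daemons themselves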
    
        3.Per-job resource settings supplied at submit time.
            spark-submit
            
            //driver memory, default 1g
            --driver-memory MEM
    
            //memory per executor, default 1g
            --executor-memory MEM
    
    
            //standalone + cluster deploy mode only: number of cores used by the driver
            --driver-cores NUM
    
            //standalone | mesos: total number of cores the job may use across all executors
            --total-executor-cores NUM
            
            //standalone | yarn: number of cores per executor for the job
            --executor-cores NUM
    
            //YARN only
            --driver-cores NUM                //number of cores for the driver
            --num-executors NUM               //number of executors to launch
    
            //the memory options behave the same across cluster managers
    
            Core configuration (a combined submit example follows the table)
    
            Cluster manager    Deploy mode    Parameter
            --------------------------------------------------
            standalone         cluster        --driver-cores NUM              //driver cores
            standalone         any            --total-executor-cores NUM      //total executor cores for the job
            standalone         any            --executor-cores NUM            //cores per executor
            --------------------------------------------------
            yarn               cluster        --driver-cores NUM              //driver cores
            yarn               any            --executor-cores NUM            //cores per executor
            yarn               any            --num-executors NUM             //number of executors to launch
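
            [example] Putting the flags together: a submit of the WCAppScala job to the standalone cluster
            with explicit resources (jar name taken from the upload step above; the numbers are illustrative):

                spark-submit --class WCAppScala \
                             --master spark://s101:7077 \
                             --deploy-mode client \
                             --driver-memory 1g \
                             --executor-memory 1g \
                             --total-executor-cores 4 \
                             --executor-cores 2 \
                             myspark.jar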