zoukankan      html  css  js  c++  java
  • spark

        /*
         * spark算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作。
         * spark算子的作用:
         * 1.输入:在spark程序运行中,数据从外部数据空间(如分布式存储:textFile读取HDFS等,parallelize方法输入scala集合或数据)输入spark,
         *        数据进入spark运行时数据空间,转化为spark中的数据块,通过blockmanager进行管理
         * 2.运行:在spark数据输入形成RDD后便可以通过变换算子,如fliter等,对数据进行操作并将RDD转化为新的RDD,通过action算子,触发spark提交作业。
         *        如果数据需要复用,可以通过cache算子,将数据缓存到内存
         * 3.输出:程序运行结束数据会输出spark运行时控件,存储到分布式存储中(如savaAsTextFile输出到HDFS),
         *        或scala数据或集合中(collect输出到scala集合,count返回scala int类型)
         *        
         * spark算子大致上可以分三大类算子:
         * 1.value数据类型的transformation算子,这种变换不触发提交作业,针对处理的数据项是value型的数据
         *     map(func)  将原来RDD的每个数据项,使用map中用户自定义的函数func进行映射,转变为一个新的元素,并返回一个新的RDD。
         *     flatMap(func)  类似于map,但是输入数据项可以被映射到0个或多个输出数据集合中,所以函数func的返回值是一个数据项集合而不是一个单一的数据项。
         *     mapPartitions(func)  类似于map,但是该操作是在每个分区上分别执行,所以当操作一个类型为T的RDD时func的格式必须是Iterator<T> => Iterator<U>。即mapPartitions需要获取到每个分区的迭代器,在函数中通过这个分区的迭代器对整个分区的元素进行操作。
         *     union(otherDataset)  返回原数据集和参数指定的数据集合并后的数据集。使用union函数时需要保证两个RDD元素的数据类型相同,返回的RDD数据类型和被合并的RDD元素数据类型相同。该操作不进行去重操作,返回的结果会保存所有元素。如果想去重,可以使用distinct()。
         *     cartesian(otherDataset)  对类型为T和U的两个数据集进行操作,返回包含两个数据集所有元素对的(T,U)格式的数据集。即对两个RDD内的所有元素进行笛卡尔积操作。
         *     groupBy : 根据自定义的东东进行分组。groupBy是基本RDD就有的操作。
         *     groupByKey([numTasks])   操作(K,V)格式的数据集,返回 (K, Iterable)格式的数据集。
                       注意,如果分组是为了按key进行聚合操作(例如,计算sum、average),此时使用reduceByKey或aggregateByKey计算效率会更高。
                       注意,默认情况下,并行情况取决于父RDD的分区数,但可以通过参数numTasks来设置任务数。
         *     filter(func)  使用函数func对原RDD中数据项进行过滤,将符合func中条件的数据项组成新的RDD返回。
         *     distinct([numTasks]))  将RDD中的元素进行去重操作。
         *     subtract  返回一个由只存在于第一个RDD中而不存在于第二个RDD中的所有元素组成的RDD。它也需要shuffle
         *     sample(withReplacement, fraction, seed)   对数据采样。用户可以设定是否有放回(withReplacement)、采样的百分比(fraction)、随机种子(seed)。
         *     takeSample(withReplacement,num, [seed])  对一个数据集随机抽样,返回一个包含num个随机抽样元素的数组,参数withReplacement指定是否有放回抽样,参数seed指定生成随机数的种子。
         *     cache, persist  持久化
         * 2.key-value数据类型的transformation算子,这种变换不触发提交作业,针对处理的数据项是key-value型的数据
         *     mapValues  对各个键值对的值进行映射。该操作会保留RDD的分区信息。
         *     combineByKey 返回与输入数据的类型不同的返回值
         *     reduceByKey  与reduce相当类似,它们都接收一个函数,并使用该函数对值进行合并。不同的是,reduceByKey是transformation操作,reduceByKey只是对键相同的值进行规约,并最终形成RDD[(K, V)],而不像reduce那样返回单独一个“值”。
         *     partitionBy 对RDD进行分区,可以减少大量的shuffle.
         *     cogroup  可以对多个RDD进行连接、分组、甚至求键的交集。其他的连接操作都是基于cogroup实现的。
         *     join   对数据进行内连接,也即当两个键值对RDD中都存在对应键时才输出。当一个输入对应的某个键有多个值时,生成的键值对RDD会包含来自两个输入RDD的每一组相对应的记录,也即笛卡尔积。
         *     leftOutJoin  即左外连接,源RDD的每一个键都有对应的记录,第二个RDD的值可能缺失,因此用Option表示。
         *     rightOutJoin  即右外连接,与左外连接相反。
         *     fullOuterJoin  即全外连接,它是是左右外连接的并集。
         * 3.action算子,这类算子会触发sparkcontext提交作业
         *     foreach  对每个元素进行操作,并不会返回结果。
         *     foreachPartition : 基于分区的foreach操作,操作分区元素的迭代器,并不会返回结果。
         *     saveAsTextFile 用于将RDD写入文本文件。spark会将传入该函数的路径参数作为目录对待,默认情况下会在对应目录输出多个文件,这取决于并行度。
         *     saveAsObjectFile saveAsObjectFile用于将RDD中的元素序列化成对象,存储到文件中。
         *     collect  收集RDD的元素到driver节点,如果数据有序,那么collect得到的数据也会是有序的。大数据量最好不要使用RDD的collect,因为它会在本机上生成一个新的Array,以存储来自各个节点的所有数据,此时更好的办法是将数据存储在HDFS等分布式持久化层上。
         *     collectAsMap   将结果以Map的形式返回,以便查询。
         *     reduceByKeyLocally 该函数将RDD[K,V]中每个K对应的V值根据映射函数来运算,运算结果映射到一个Map[K,V]中,而不是RDD[K,V]。
         *     lookup   返回给定键对应的所有值。
         *     count  返回RDD元素个数。
         *     top   如果为元素定义了顺序,就可以使用top返回前几个元素。
         *     reduce   对RDD中所有元素进行规约,最终得到一个规约结果。reduce接收的规约函数要求其返回值类型与RDD中元素类型相同。
         *     fold  与reduce类似,不同的是,它接受一个“初始值”来作为每个分区第一次调用时的结果。fold同样要求规约函数返回值类型与RDD元素类型相同。
         *     aggregate 与reduce和fold类似,但它把我们从返回值类型必须与所操作的RDD元素类型相同的限制中解放出来。
         *     
         */
        
        /*
         * spark运行架构
         * 术语:
         *    application:spark application的概念和hadoop mapreduce中的类似,指的是用户编写的spark应用程序,
         *                 包含了一个driver功能的代码和分布在集群中多个节点上运行的executor代码。
         *    driver:spark中的driver即运行上述application的main()函数并且创建sparkcontext,其中创建sparkcontext的目的是为了准备spark应用程序的运行环境。
         *            在spark中由sparkcontext负责和clustermanager通信,进行资源的申请,任务的分配和监控等;当executor部分运行完毕后,driver负责将sparkcontext关闭。
         *            通常用sparkcontext代表driver。
         *    executor:application运行在worker节点上的一个进程,该进程负责运行task,并且负责将数据存在内存或者磁盘上,每个application都有各自独立的一批executor。
         *             在spark on yarn模式下,其进程名称为CoarseGrainedExecutorBackend,类似于hadoop mapreduce中的YarnChild。
         *             一个CoarseGrainedExecutorBackend进程有且仅有一个executor对象,它负责将Task包装成taskRunner,并从线程池中抽取一个空闲的线程运行Task。
         *             每个CoarseGrainedExecutorBackend能并行运行Task的数量就取决于分配给它的CPU的个数了
         *    Cluster Manager:指的是在集群上获取资源的外部服务,目前有:
         *                    Standalone:spark原生的资源管理,由Master负责资源的分配
         *                    Hadoop Yarn:由Yarn中的ResourceManager负责资源分分配
         *    Worker:集群中任何可以运行application代码的节点,类似于yarn中NodeManager节点。
         *           在Standalone模式中指的就是通过Slave文件配置的Worker节点,在spark on yarn模式中指的就是NodeManager节点
         *   作业(Job):包含多个Task组成的并行计算,往往由spark action催生,一个Job包含多个RDD及作用于相应RDD上的各种Operation。
         *   阶段(Stage):每个Job会被拆分很多组Task,每组任务被称为Stage,也可以成TaskSet,一个作业分多个阶段。
         *   任务(Task):被送到某个executor上的工作任务。
         */
        
        
        /*
         * spark运行基本流程
         * 1.构建Spark Application的运行环境(启动SparkContext),SparkContext向资源管理器(可以是Standalone, Mesos或YARN)注册并申请运行Executor资源。
         * 2.资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行情况将随着心跳发送到资源管理器上。
         * 3.SparkContext构建成DAG图,将DAG图分解成Stage,并把TaskSet发送给TaskScheduler。
         *   Executor向SparkContext申请Task,Task Schedule将Task发放到Executor运行同时SparkContext将应用程序代码发放给Executor。
         * 4.Task在Executor上运行,运行完毕释放所有资源。
         * 
         * spark注重建立良好的生态系统,它不仅支持多种外部文件存储系统,提供了多种多样的集群运行模式。
         * 部署在单台机器上时,既可以用本地(local)模式运行,也可以使用伪分布式模式来运行;
         * 当以分布式集群部署的时候,可以根据自己集群的实际情况选择Standalone模式(Spark自带的模式), YARN-Client模式或者YARN-Cluster模式。
         * Spark的各种运行模式虽然在启动方式,运行位置,调度策略上各有不同,但它们的目的基本都是一致的,就是在合适的位置安全可靠的根据用户的配置和Job的需要运行和管理Task。
         * Standalone模式:是Spark实现的资源调度框架,其主要的节点有Client节点,Master节点和Worker节点。
         *               其中Driver既可以运行的Master节点上的,也可以运行在本地Client端。当用spark-shell交互式工具提交Spark的Job时,Driver在Master节点上运行;
         *               当使用spark—submit工具提交Job或者在Eclipse,IDEA等开发平台上使用"new SparkConf.setManager("spark://master:7077")"方式运行Spark任务时,Driver是运行在本地Client端上的。
         *               运行过程如下:
         *                  1.SparkContext连接Master,向Master注册并申请资源(CPU Core和Memory);
         *                  2.Master根据SparkContext的资源申请要求和Worker心跳周期内报告的信息决定在哪个Worker上分配资源,然后在该Worker上提取资源,然后启动StandaloneExecutorBackend;
         *                  3.StandaloneExecutorBackend向SparkContext注册;
         *                  4.SparkContext将APPlication代码发送给StandaloneExecutorBackend;
         *                    并且SparkContext解析Application代码,构建DAG图,并提交给DAG Schedule分解成Stage(当碰到Action操作时,就会催生Job,每个Job中含有一个或多个Stage, Stage一般在获取外部数据和shuffle之前产生),
         *                    然后以Stage(或者称为TaskSet)提交给Task Scheduler,Task Schedule负责将Task分配到相应的Worker,最后提交给StandaloneExecutorBackend执行;
         *                  5.StandaloneExecutorBackend会建立Executor线程池,开始执行Task,并向SparkContext报告,知道Task完成;
         *                  6.所有Task完成后,SparkContext向Master注销,释放资源;
         * Spark on YARN运行过程:YARN是一种统一资源管理机制,在其上面可以运行多套计算框架。目前的大数据技术世界,大多数公司除了使用Spark来进行数据计算,由于历史原因火车单方面业务的性能考虑二使用着其他的计算框架,
         *                     比如MapReduce,Storm等计算框架。Spark基于此种情况开发了Spark on YARN的运行模式,由于借助了YARN良好的弹性资源管理机制,不仅部署Application更加方便,
         *                     而且用户在YARN集群中运行的服务和Application的资源可完全隔离,更具实践应用价值的是YARN可以通过队列的方式,管理同时运行在集群中的多个服务。
         *                     Spark on YARN模式根据Driver在集群中的位置分为两种模式:一种是YARN-Client模式,另一种是YARN-Cluster(或称为YARN-Standalone模式)。
         *                     YARN框架流程:任何框架与YARN的结合,都必须遵循YARN的开发模式。在分析Spark on YARN的实现细节之前,有必要先分析一下YARN框架的一些基本原理。
         *                     YARN框架的基本运行流程图为:ResourceManager负责将集群的资源分配给各个应用使用,而资源分配和调度的基本单位是Container,其中封装了机器资源,
         *                                            如内存、CPU、磁盘和网络等,每个任务会被分配一个Container,该任务只能在该Container中执行,并使用该Container封装的资源。
         *                                            NodeManager是一个个的计算节点,主要负责启动Application所需的Container,监控资源(内存、CPU、磁盘和网络等)的使用情况并将之汇报给ResourceManager。
         *                                            ResourceManager与NodeManager共同组成整个数据计算框架,ApplicationMaster与具体的Application相关,主要负责同ResourceManager协商以获取合适的Container,
         *                                            并跟踪这些Container的状态和监控其进度。
         *                     Yarn-Client:Yarn-Client模式中,Driver在客户端本地运行,这种模式可以使得Spark Application和客户端进行交互,因为Driver在客户端,所以可以通过webUI访问Driver的状态,       
         *                                  默认是http://hadoop1:4040访问,而YARN通过http://hadoop1:8088访问。
         *                                  Yarn-Client工作流:
         *                                       1.Spark Yarn Client向YARN的ResourceManager申请启动Application Master;
         *                                         同时在SparkContent初始化中将创建DAGScheduler和TASKScheduler等,由于我们选择的是Yarn-Client模式,程序会选择YarnClientClusterScheduler和YarnClientSchedulerBackend;
         *                                       2.ResourceManager收到请求后,在集群上选择一个NodeManager,为该应用程序分配第一个Container,要求她在这个Container中启动应用程序的ApplicationMaster,
         *                                         与YARN-Cluster区别的是在该ApplicationMaster不运行SparkContext,只与SparkContext进行联系进行资源的分配;
         *                                       3.Client中SparkContext初始化完毕后,与Application建立通讯,向ResourceManager注册,根据任务信息向ResourceManager申请资源(Container);
         *                                       4.一旦ApplicationMaster申请到资源(也就是Container)后,便于对应的NodeManager通讯,要求它在获得的Container中启动CoarseGrainedExecutorBackend,
         *                                         CoarseGrainedExecutorBackend启动后会向Client中的SparkContext注册并申请Task;
         *                                       5.Client中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向Driver汇报运行的状态和进度,
         *                                         以让Client随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
         *                                       6.应用程序运行完成后,Client的SparkContext向ResourceManager申请注册并关闭自己;
         *                     Yarn-Cluster:在Yarn-Cluster模式中,当用户向Yarn中提交一个应用程序后,
         *                                   Yarn将分两个阶段运行该应用程序:
         *                                      1.把Spark的Driver作为一个ApplicationMaster在YARN集群中先启动
         *                                      2.由ApplicationMaster创建应用程序,然后为它向ResourceManager申请资源,并启动Executor来运行Task,同时监控它的整个运行过程,直到运行完成
         *                                   Yarn-Cluster工作流程:
         *                                      1.Spark Yarn Client向YARN中提交应用程序,包括ApplicationMaster程序,启动ApplicationMaster的命令,需要在Executor中运行的程序等;
         *                                      2.ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动ApplicationMaster,
         *                                        其中ApplicationMaster进行SparkContext等的初始化;
         *                                      3.ApplicationMaster向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态,
         *                                        然后它将采用轮询的方式通过RPC协议为各个任务申请资源,并监控它们的运行状态知道运行结束;
         *                                      4.一旦ApplicationMaster申请到资源(也就是Container)后,便于对应的NodeManager通信,要求它在获得的Container中启动CoarseGrainedExecutorBackend,
         *                                        CoarseGrainedExecutorBackend启动后会向ApplicationMaster中的SparkContext注册并申请Task。
         *                                        这一点和Standalone模式一样,只不过SparkContext在Spark Application中初始化时,使用CoarseGrainedExecutorBackend配合YarnClusterScheduler进行任务的调度,
         *                                        其中YarnClusterScheduler只是对TaskSchedulerImpl的一个简单包装,增加了对Executor的等待逻辑等;
         *                                      5.ApplicationMaster中对SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向ApplicationMaster汇报运行的状态和进度,
         *                                        以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
         *                                      6.应用程序运行完成后,ApplicationMaster向ResourceManager申请注册并关闭自己;
         *                     Yarn-Client和Yarn-Cluster的区别:理解Yarn-Client和Yarn-Cluster深层次的区别之前先清楚一个概念:ApplicationMaster。
         *                                                    在Yarn中,每个Application实例都有一个ApplicationMaster进程,它是Application启动的第一个容器。
         *                                                    它负责和ResourceManager打交道并请求资源,获取资源之后告诉NodeManager为其启动Container。
         *                                                    从深层次的含义讲Yarn-Cluster和Yarn-Client模式的区别其实就是ApplicationMaster进程的区别。
         *                                                    Yarn-Cluster模式下,Driver运行在ApplicationMaster中,负责向Yarn申请资源,并监督作业的运行状况。
         *                                                                      当用户提交作业之后,就可以关掉Client,作业会继续在Yarn上运行,因而Yarn-Cluster模式下不适合运行交互类型的作业。
         *                                                    Yarn-Client模式下,ApplicationMaster仅仅向Yarn请求Executor,Client会和请求的Container通信来调度他们的工作,也就是说Client不能离开。
         *                     
         */
        
  • 相关阅读:
    Leetcode Binary Tree Level Order Traversal
    Leetcode Symmetric Tree
    Leetcode Same Tree
    Leetcode Unique Paths
    Leetcode Populating Next Right Pointers in Each Node
    Leetcode Maximum Depth of Binary Tree
    Leetcode Minimum Path Sum
    Leetcode Merge Two Sorted Lists
    Leetcode Climbing Stairs
    Leetcode Triangle
  • 原文地址:https://www.cnblogs.com/sunyaxue/p/6365897.html
Copyright © 2011-2022 走看看