zoukankan      html  css  js  c++  java
  • Spark相关总结(1)

    1.大数据处理框架

    1.1 四层结构

    大数据处理框架一般可以大致分为四层结构:

    • 用户层
    • 分布式数据并行处理层
    • 资源管理与任务调度层
    • 物理执行层

    下面分别就四层进行详细解释。

    1.1.1 用户层

    主要包括:

    • 输入数据:考虑数据如何高效读取(减少磁盘I/O)、批式和流式不同的读取方式等
    • 用户代码
    • 配置参数:分为资源相关的配置参数和数据相关的配置参数

    1.1.2 分布式数据并行处理层

    将用户提交的任务转换成较小的计算任务,然后通过调度层实现并行执行。

    Spark中应用转换过程包含两个阶段:

    • 逻辑处理流程:构建DAG,其中的节点是RDD
    • 执行阶段和执行任务划分:对逻辑处理流程进行划分,生成物理执行计划,包含多个stage(根据RDD之间的依赖关系划分),每个stage包含多个task(一般与分区数一致)

    1.1.3 资源管理和任务调度层

    大数据框架一般都是Master-Slave结构,主节点负责接收用户的应用提交,处理请求,管理整个应用的生命周期。从节点主要负责任务的执行,即具体的数据处理,同时执行过程中向主节点汇报任务的执行状态。

    典型的任务调度器包括FIFO(包括应用调度器和任务调度器,分别决定多个应用和多个任务的执行顺序)调取器和Fair调度器。

    1.1.4 物理执行层

    负责启动task,执行每个task的数据处理步骤。在MR中,每个task对应一个进程,而在Spark中,每个task对应JVM中的一个线程,task共享JVM内存空间。

    2.Spark基本架构

    2.1 安装部署

    Spark的部署方式主要有以下几种:

    • Standalone
    • Mesos
    • YARN
    • Kubernetes

    2.2 系统架构

    • Master节点:常驻Master进程,负责管理应用和任务,收集任务运行信息,监控Worker节点存活状态等
    • Worker节点:常驻Worker进程,负责执行任务,与Master节点通信,每个Worker进程上存在一个或多个ExecutorRunner对象,每个ExecutorRunner对象管理一个Executor,Executor持有一个线程池,每个线程执行一个task
    • Appcation:一个可运行的Spark程序
    • Driver:运行Spark应用中main()函数并且创建SparkContext的进程独立于Master进程
    • Executor:Spark计算资源的一个单位,是一个JVM进程,Spark以Executor为单位占用集群资源,然后将具体的计算任务分配给Executor执行(Worker进程只负责启停和观察Executor的执行情况)
    • task:Spark的计算任务,Driver运行Spark应用的main()函数时,将应用拆分为多个计算任务,分配给多个Executor执行。task是Spark中最小的计算单位,以线程方式运行在Executor进程中,Executor的内存空间由多个task共享

    3.Spark逻辑处理流程

    Spark在运行应用程序前,首先需要将应用程序转化为逻辑处理流程(Logical Plan),主要包含四部分:

    1. 数据源:包括本地文件系统和分布式文件系统,还可以是内存数据结构、网络流等
    2. 数据模型:Spark将输入/输出、中间数据抽象为统一的数据模型RDD,其与普通的数据结构主要区别有两点,一是RDD只是一个逻辑概念,在内存中并不会为其分配存储空间,除非该RDD需要被缓存,二是RDD可以包含多个数据分区,不同数据分区可以由不同的task在不同的节点进行处理
    3. 数据操作:Spark中将数据操作主要分为transformation()和action()操作,action操作触发Spark提交job真正执行任务
    4. 计算结果处理:分为两种方式,一种是直接将计算结果存放到分布式文件系统中,这种方式一般不需要在Driver端进行集中计算,另一种方式是需要在Driver端进行集中计算

    3.1 Spark逻辑处理流程生成

    Spark需要有一套通用的方法来将应用程序转化为确定性的逻辑处理流程,需要考虑三个问题。

    3.1.1 如何产生RDD以及产生什么样的RDD

    需要多种类型的RDD来表示不同数据类型、不同计算逻辑以及不同数据以来的RDD,Spark实际产生的RDD类型和个数与transformation()的计算逻辑有关

    3.1.2 如何建立RDD之间的数据依赖关系

    包含两方面:RDD之间的依赖关系和RDD自身分区之间的依赖关系

    主要包括三个问题:

    1. RDD之间的依赖关系如何建立:对于一元操作,子RDD只依赖父RDD,二元操作,子RDD同时依赖多个父RDD,二元以上的类比
    2. 新生成的RDD应该包含多少分区:分区数由用户和父RDD共同决定,用户可以指定分区个数,如果不指定,取父RDD的分区个数最大值
    3. 分区之间的依赖关系:分为窄依赖和宽依赖

    窄依赖

    • 一对一:map()、filter()
    • 范围依赖:union()
    • 多对一:join()、cogroup()
    • 多对多:cartesian()

    宽依赖

    表示新生成的子RDD中的分区依赖父RDD中的每个分区的一部分,和窄依赖的区别在于:子RDD的各个分区是否完全依赖父RDD的一个或多个分区,如果父RDD中的一个分区中的数据全部流入子RDD的一个或者多个分区,则是窄依赖,如果一部分流入RDD的一个分区,一部分流入另一个分区,则是宽依赖

    3.1.3 如何计算RDD中的数据

    确定数据依赖关系之后,使用transformation(func)处理每个分区的输入数据,将生成的数据再推送到子RDD中对应的分区即可

    3.2 常用的transformation算子

    • map:对每条记录进行处理,输出一条新的record
    • mapValues:对k,v类型的record,对其value进行处理,输出新的record
    • filter:对record进行过滤,保留判断结果为true的record
    • filterByRange:保留lower和upper之间的record(判断key)
    • flatMap:对每个元素进行操作,得到新元素,然后将所有元素组合
    • flatMapValues:只针对value进行操作
    • sample:对数据进行抽样
    • mapPartitions:对每个分区进行操作,输出一组新的数据(该操作可以用来实现数据库操作)
    • mapPartitionsWithIndex:分区中的数据带有索引,表示record属于哪个分区
    • partitionBy:使用新的分区器进行重新分区
    • groupByKey:将<k,v>结构按照key聚合在一起,形成<k,CompactBuffer(v)>结构(会进行shuffle操作
    • reduceByKey:与groupByKey类似,包括两步聚合,首先对RDD中每个分区的数据进行一个本地化的combine,然后执行reduce操作,不形成新的RDD,然后生成新的ShuffledRDD,将RDD1中不同分区且具有相同key的数据聚合在一起,再进行一次reduce操作,该算子只能对record一个一个连续处理,中间计算结构必须和value是同一类型,效率相比groupByKey更高
    • aggregateByKey:由于reduceByKey算子灵活性较低,所以需要定义一个通用的聚合算子
    • combineByKey:该算子的createCombiner是一个初始化函数,相比aggregateByKey包含的zeroValue是一个值,功能更加强大
    • foldByKey:简化的aggregateByKey,seqOp和combineOp共用一个func,功能介于aggregateByKey和reduceByKey之间
    • coGroup:将多个RDD中具有相同key的value聚合在一起
    • join:将两个RDD中的数据关联在一起
    • cartesian:计算两个RDD的笛卡尔积
    • sortByKey:对RDD<k,v>中record按照key进行排序(如何对value进行排序?二次排序或者先使用groupByKey将数据聚合,再使用mapValues对value进行排序)
    • coalesce:改变RDD的分区数,可以选择true或false决定是否进行shuffle
    • repartition:相当于参数为true的coalesce
    • repartitionAndSortWithinPartitions:类似于repartition,可以使用各种分区器,对RDD2中的每个分区按照key进行排序,比repartiiton+sortByKey效率高
    • intersection:求两个RDD的交集
    • distinct:去重
    • union:合并两个RDD中的元素
    • zip:将两个RDD中的元素一一对应连接成<k,v>,要求两个RDD分区数相同,每个分区包含元素个数相等
    • zipPartitions:将两个RDD中的分区按照一一对应连接,要求分区个数相同,但不要求每个分区包含的元素个数相同
    • zipWithIndex:对RDD中的数据进行编号,从0开始按序递增
    • zipWithUniqueId:对RDD中的数据进行编号,round-robin方式
    • subtractByKey:计算key在RDD1中但是不在RDD2中的record
    • subtract:计算在RDD1中但是不在RDD2中的record,适用面更广,可以针对非<k,v>类型的RDD
    • sortBy:基于func计算结果对RDD中的record进行排序
    • glom:将RDD中每个分区的record合并到一个list中

    3.3 常用的action算子

    • count:计算RDD中的record个数
    • countByKey:计算RDD中每个key出现的次数,RDD需要是<k,v>类型,返回Map
    • countByValue:计算RDD中每个record出现的次数,返回Map
    • collect:将RDD中的record收集到Driver端
    • collectAsMap:将RDD中的<k,v>record收集到Driver端,得到<k,v>Map
    • foreach:将RDD中的每个record按照func进行处理
    • foreachPartition:将RDD中的每个分区中的数据按照func进行处理
    • fold:语义与foldByKey一样,区别是foldByKey生成一个新的RDD,而fold直接计算出结果
    • reduce:语义与reduceByKey一样,区别同上
    • aggregate:语义与aggregateByKey一样,区别同上,需要这几个操作的原因是,我们需要全局聚合,而前面的操作只能对每个分区,以及跨分区且具有相同key的record进行聚合,不能对所有record进行全局聚合,存在的问题是Driver单点merge,存在效率和空间限制问题,优化为treeAggregate和treeReduce
    • treeAggregate:使用树形聚合方法,减轻Driver端聚合压力,采用flodByKey来实现非根节点的聚合,使用fold来实现根节点的聚合
    • treeReduce:类似
    • reduceByKeyLocality:和reduceByKey不同在于首先在本地进行局部reduce,然后将数据汇总到Driver端进行全局Reduce,返回结果保存在HashMap中而不是RDD中
    • take:取RDD中前n个record
    • first:取RDD中第一个record
    • takeOrdered:取RDD中最小的n个record
    • top:取RDD中最大的n个record
    • max:计算RDD中record的最大值
    • min:计算RDD中record的最小值
    • isEmpty:判断RDD是否为空
    • lookup:找出RDD中包含特定key的value,组成list
    • saveAsTextFile:将RDD保存为文本文件
    • saveAsObjectFile:将RDD保存为序列化对象形式的文件
    • saveAsSequenceFile:将RDD保存为SequenceFile形式的文件
    • saveAsHadoopFile:将RDD保存为HDFS文件系统支持的文件

    4.Spark物理执行计划

    4.1 执行步骤

    1. 根据action操作将应用划分为job
    2. 根据每个job中的ShuffleDependency依赖关系,将job划分为stage
    3. 在每个stage中根据最后生成的RDD的分区个数生成多个task

    5.总结及相关问题

    这篇文档主要总结了大数据处理框架的通用结构,Spark的基本架构以及Spark的逻辑处理流程和物理执行流程,后面将对具体细节进行总结。

    问题1:Spark为什么以线程方式运行task而不是进程方式?

    用进程运行任务的好处是task之间相互独立,每个task独享资源,不会相互干扰,但是坏处是task之间不方便共享数据,会造成重复加载等问题,同时进程的启动和停止需要做很多工作,会降低执行效率。Spark采用线程为最小执行单位,好处是数据可以共享,并且提高执行效率,但是缺点是线程间会有资源竞争。

    问题2:Spark中为什么要拆分为执行阶段?

    主要有三个好处:

    1. 将job拆分为stage,使得stage中生成的task不会太大也不会太小,并且是同构的,便于并行执行
    2. 可以将多个操作放在一个task里面执行,进行串行、流水式处理,提高数据处理效率
    3. stage可以方便容忍错误,如果一个stage失效,重新运行该stage即可,不需要运行整个job

    问题3:对数据依赖进行分类有什么好处?

    1. 明确RDD分区之间的数据依赖关系
    2. 有利于生成物理执行计划
    3. 有利于代码实现
  • 相关阅读:
    在不打开excel的情况下用python执行excel
    Python中xlrd、xlwt、win32com模块对xls文件的读写操作
    [已解决]报错:have mixed types. Specify dtype option on import or set low_memory=False
    [已解决]报错:xlrd.compdoc.CompDocError: Workbook: size exceeds expected 17920 bytes; corrupt?
    使用Pandas读取大型Excel文件
    [转]jmeter实战
    webService(SOAP)性能测试脚本
    jmeter正则表达式提取器--关联
    Data Set Config配置元件
    压力测试涉及到的参数
  • 原文地址:https://www.cnblogs.com/jordan95225/p/15150508.html
Copyright © 2011-2022 走看看