zoukankan      html  css  js  c++  java
  • (一)Spark

     

    Spark基础入门

    1、spark的核心概念

    2、spark的四大特性

    3、spark的整体架构

    4、spark的集群安装部署

    5、spark的集群的启动和停止

    6、spark的集群web管理界面

    7、spark-shell

    Driver

      它会执行客户端写好的main方法,它会构建一个名叫SparkContext对象

      该对象是所有spark程序的执行入口

    Application

      就是一个spark的应用程序,它是包含了客户端的代码和任务运行的资源信息

    ClusterManager

      它是给程序提供计算资源的外部服务

      standAlone

      yarn

      mesos

    Master

      它是整个spark集群的老大,,负责任务资源的分配

    Worker

      它是整个spark集群的小弟,负责任务计算的节点

    Executor

      它是一个进程,它会在worker节点启动该进程(计算资源)

    Task

      spark任务是以task线程的方式运行在worker节点对应的executor进程中

     

    ============================================================

    1、RDD弹性分布式数据集的概念

    2、RDD弹性分布式数据集的五大属性

    3、RDD弹性分布式数据集的算子操作分类

    4、RDD弹性分布式数据集的算子操作练习

    1. RDD是什么

    RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象

    它代表一个不可变、可分区、里面的元素可以并行计算的集合。

    Dataset:就是一个集合、存储很多数据

    Distributed:它内部的元素进行了分布式存储,方便与后期进行分布式计算

    Resilient:表示弹性,rdd的数据是可以保存在内存或者磁盘中

    2. RDD的五大属性

    1)a list of partitions

      一系列的分区

      这里表示一个rdd有很多分区,每一个分区内部是包含了该rdd的部分数据,

      spark中任务是以task线程的方式运行,一个分区就对应一个task线程。

    2) a function for computing each split

      一系列的计算函数作用在分区上

    3)a list of dependencies on other rdds

      一系列的依赖关系

    4)optionally,a partitioner for key-value rdds

      (分区函数,eg:hash-partitioned)

      拿到该元素的key hashcode %总的分区数 =分区号

      还有一个rangePartitioner

      1-100  1号分区

      101-200    2号分区

      201-300 3号分区

    5)optionally,a list of preferred locations 

      最优的数据位置

    RDD的创建

    1、 parallelize

      makeRDD

    2、加载外部的数据源

    3、从已经存在的rdd进行转换生成一个新的rdd

    RDD的算子分类

    1、transformation (转换)

    根据已经存在的rdd转换成一个新的rdd,它是延迟加载,它不会立即执行

    2、action(动作)

    将rdd的计算的结果数据返回给Driver端,或者是保存结果数据到外部存储介质中

    transformation算子

    1、map(func)

    2、filter(func)

    3、flatMap(func)

    4、mapPartitions(func)

    5、mapPartitionsWithIndex(func)以分区为单位

    6、union(otherDataset)合并

    7、intersection(otherDataset)求交集

    8、distinct([numTasks])去重

    9、groupByKey([numTasks])

    10、reduceByKey(func,[numTasks])

    11、soryByKey([ascending],[numTasks])

    12、soryBy(func,[ascending],[numTasks])

    13、join(otherDataset,[numTasks])

    14、cogroup(otherDataset,[numTasks])

    15、coalesce(numPartitions)减少RDD的分区数到指定值,合并/减少分区,默认不是Shuff

    16、repartition(numPartitions)重新给RDD分区,有shuffle

    17、repartitionAndSortWithinPartitions(partitioner)

    action算子

    1、reduce(func)

    2、collect()以数组的形式返回数据集的所有元素

    3、count()返回RDD的元素的个数

    4、first()返回RDD的第一个元素

    5、take(n)返回一个由数据集的前n个元素组成的数组

    6、takeOrdered(n,[ordering])返回自然顺序或自定义顺序的前n个元素

    7、saveAsTextFile(path)

    8、saveAsSequenceFile(path)

    9、saveAsObjectFile(path)

    10、countByKey()针对(k,v)类型的RDD,返回一个(k,int)的map,表示每一个key

      对应的元素个数

    11、foreach(func)在数据集的每一个元素上,运行函数func

    12、foreachPartition(func)在数据集的每一个分区上,运行函数func

    RDD算子演示

    spark-2 

    存储到mysql(连接问题)

    存储到hbase(批处理问题)

    ===============================================================

    1、RDD弹性分布式数据集的依赖关系

    2、RDD弹性分布式数据集的lineage血统机制

    3、RDD弹性分布式数据集的缓存机制

    4、spark任务的DAG有向无环图的构建

    5、spark任务如何划分stage

    6、spark任务的提交和调度流程

    1、RDD的依赖关系

    Narrow Dependencies

    Wide Dependencies

    窄依赖:narrow dependency

      窄依赖指的是每一个父RDD的partition最多被子RDD的一个partition使用

      独生子女

      map、flatmap、filter、union

    宽依赖:wide dependency

      宽依赖指的是多个子RDD的partition会依赖同一个父RDD的partition

      超生

      reduceByKey、sortByKey、groupBy、groupByKey、join等

    2 lineage(血统)

    RDD只支持粗粒度转换

      即只记录单个块上执行的单个操作

    将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区

    RDD的Lineage会记录RDD的元数据信息和转换行为,lineage保存了RDD的依赖关系,

    当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区

    3 RDD的缓存机制

    3.1 什么是rdd的缓存

    可以把一个rdd的数据缓存起来,后续有其他的job需要用到该rdd的结果数据,

    可以直接从缓存中获取得到,避免了重复计算,缓存是加快后续对该数据的访问操作。

    3.2 如何对rdd设置缓存

    RDD通过persist方法或cache方法可以将前面的计算结果缓存

    但是并不是这两个方法被调用时立刻缓存,而是触发后面的action时,

    该RDD将会被缓存在计算节点的内存中,并供后面重用。

    通过查看源码发现cache最终也是调用而来persist方法,

    默认的存储级别都是仅在内存存储一份

    spark的存储级别还有好多种,存储级别在object StorageLevel中定义的

    3.3 cache和persist区别

      对rdd设置缓存成可以调用rdd的2个方法:一个是cache,一个是persist

    调用上面2个方法都可以对rdd的数据设置缓存,但不是立即就触发缓存执行,后面需要有

    action,才会触发缓存的执行。

    cache方法和persist方法区别:

      cache:默认是把数据缓存在内存中,其本质就是调用persist方法;

      persist:可以把数据缓存在内存或者是磁盘,有丰富的缓存级别,这些缓存级别

      都被定义在StorageLevel这个object中

    3.4 什么时候设置缓存

    1、某个rdd的数据后期被使用了多次

    3.5 清除缓存数据 

    1、自动清除

      一个application应用程序结束之后,对应的缓存数据也就自动清除

    2、手动清除

      调用rdd的unpersist方法

    checkpoint(检查点)

    它是提供了一种相对而言更加可靠的数据持久化方式。它是把数据保存在分布式文件系统,

    比如HDFS上,这里就是利用了HDFS高可用性,高容错性(多副本)来最大程度保证数据的安全性。

    如何设置checkpoint

    在hdfs上设置一个checkpoint目录

    sc.setCheckpointDir("hdfs://node1:9000/checkpoint")

    对需要做checkpoint操作的rdd调用checkpoint方法

    val rdd1=sc.textFile()

    rdd1.checkpoint

    最后需要一个action操作去触发任务的运行

    DAG划分stage

    1 stage是什么

    一个Job会被拆分为多组Task,每组任务都被称为一个stage

    stage表示不同的调度阶段,一个spark job会对应产生很多个stage

    stage类型一共有2种

      ShuffleMapStage

        最后一个shuffle之前的所有变换叫ShuffleMapStage

        它对应的task是shuffleMapTask

      ResultStage

      最后一个shuffle之后的操作叫ResultStage,它是最后一个Stage

      它对应的task是ResultTask

    2 为什么要划分stage

    根据RDD之间依赖关系的不同将DAG划分成不同的stage(调度阶段

    对于窄依赖,partition的转换处理在一个stage中完成计算

    对于窄依赖,由于有shuffle的存在,只能在parent RDD处理完成之后,才能开始接下来的计算

    由于划分完stage之后,在同一个stage中只有窄依赖,没有宽依赖,可以实现流水线计算

    stage中的每一个分区对应一个task,在同一个stage中就有很多可以并行运行的task。

    3 如何划分stage

    划分stage的依据是宽依赖

    1)首先根据rdd的算子操作顺序生成DAG有向无环图,接下来从最后一个rdd往前推,创建一个

    新的stage,把该rdd加入到该stage中,它是最后一个stage

    2)在往前推的过程中运行遇到了窄依赖就把该rdd加入到本stage中,如果遇到了宽依赖,就从

    宽依赖切开,那么最后一个stage也就结束了

    3)重新创建一个新的stage,按照第二个步骤继续往前推,一直到最开始的rdd,就结束了

    4 stage与stage之间的关系

      划分完stage之后,每一个stage中有很多可以并行运行的task,后期把每一个stage中的task

    封装在一个taskSet集合中,最后把一个一个的taskSet集合提交到worker节点上的executor进程中运行

    rdd与rdd之间存在依赖关系,stage与stage之前也存在依赖关系,前面stage中的task先运行,

    运行完成了再运行后面stage中的task,也就是说后面stage中的task输入数据是前面stage中task的

    输出结果数据

    Spark的任务调度

    1)Driver端运行客户端的main方法,构建SparkContext对象,在SparkContext对象内部依次

    构建DAGScheduler和TaskScheduler

    2)按照rdd的一系列操作顺序,来生成DAG有向无环图

    3)DAGScheduler拿到DAG有向无环图之后,按照宽依赖进行stage的划分,每一个stage内部

    有很多可以并行运行的task,最后封装在一个一个的taskSet集合中,然后把taskSet发送给TaskScheduler

    4)TaskScheduler得到taskSet集合之后,依次遍历取出每一个task提交到worker节点上的executor进程中运行

    5)所有task运行完成,整个任务也就结束了

    8 spark的运行架构

    1)Driver端向资源管理器master发送注册和申请计算资源的请求

    2)master通知对应的worker节点启动executor进程(计算资源)

    3)executor进程向Driver端注册并申请task请求

    4)Driver端运行客户端的main方法,构建SparkContext对象,在SparkContext对象内部依次构建

    DAGScheduler和TaskScheduler

    5)按照客户端代码和rdd的一系列操作顺序,生成DAG有向无环图

    6)DAGScheduler拿到DAG有向无环图之后胡,按照宽依赖进行stage的划分,每一个stage内部

    有很多可以并行运行的task,最后封装在一个一个的taskSet集合中,然后把taskSet发送给TaskScheduler

    7)TaskScheduler得到taskSet集合之后,依次遍历取出每一个task提交到worker节点上的

    executor进程中运行

    8)所有task运行完成,Driver端向Master发送注销请求,Master通知worker关闭executor

    进程,worker上的计算资源得到释放,最后整个任务也就结束了。

     ***********************you can fuck off the World*****************************************************************************************

  • 相关阅读:
    第一周JAVA基本概念
    Visual Studio 2008 附加进程调试
    .NET错误提示之:ConnectionString尚未初始化
    .NET知识点总结
    JS脚本的基础应用
    错误提示之:无法分析从服务器收到的消息。
    客户端和服务端之间的通信(TCP)
    错误提示之:sqlDateTime 溢出。必须介于 1/1/1753 12:00:00 AM 和 12/31/9999 11:59:59 PM 之间
    错误提示之:DataTable已属于另一个DataSet。
    错误提示之:INSERT 语句与 COLUMN FOREIGN KEY 约束 'FK_CCRM_Service_CCRM_Userlist' 冲突。
  • 原文地址:https://www.cnblogs.com/hanchaoyue/p/13352563.html
Copyright © 2011-2022 走看看