zoukankan      html  css  js  c++  java
  • Spark 分布式计算原理

    Spark 分布式计算原理

    Spark Shuffle

    1)在数据之间重新分配数据
    2)(将父RDD重新定义进入子RDD)每一个分区里面的数据要重新进入新的分区
    3)每一个shuffle阶段尽量保存在内存里面,如果保存不下到磁盘
    4)在每个shuffle阶段不会改变分区的数量
    

    RDD的依赖关系-1(lineage)

    1) 宽依赖:一个夫RDD的分区被子RDD的多个分区使用
    发生宽依赖一定shuffle()
    (相当于超生)
    2) 窄依赖:一个父RDD的分区被子RDD的一个分区使用
    

    RDD的依赖关系-2(lineage)?? 宽依赖对比窄依赖

    DAG工作原理

    • 根据RDD之间的依赖关系,形成一个DAG(有向无环)
    1)从后往前,遇到宽依赖切割为新的Stage
    2)每个Stage由分区一组并行的Task组成
    每个Task共享归类内存,堆外内存Task数据在进行交换
    提前聚合,避免shuffle,将数据先进行去重
    

    RDD持久化-1

    cache:

    • 间数据写入缓存
    • cache()不能再有其他的算子
    val rdd=sc.makeRDD(1 to 10)
    val rdd2=rdd.map(x=>{println(x);x}
    rdd2.cache
    rdd2.collect
    

    RDD共享变量-1

    • 广播变量(要定义的是Array)
    val rdd=sc.makeRDD(1 to 10)
    val j=sc.broadcast(Array(0))
    rdd.map(x=>{j.value(0)=j.value(0)+1;println(j.value(0));x}).collect
    

    RDD共享变量-2

    • 累加器:只允许added操作,常用于实现计数
    val accum = sc.accumulator(0,"My Accumulator")
    sc.parallelize(Array(1,2,3,4)).foreach(x=>accum+=x)
    accum.value
    

    RDD分区设计

    • 分区大小限制为2GB
    分区太小
    1)分区承担的责任越大,内存压力越大
    分区过多
    1)shuffle开销越大
    2)创建任务开销越大
    

    装载CSV数据源

    方法一:使用SparkContext
    System.setProperty("hadoop.home.dir", "D:\hadoop-2.7.3")//防止hadoop报错
      val conf=new SparkConf()
        .setMaster("local[2]")
        .setAppName("hello")
        val sc=SparkContext.getOrCreate(conf)
    val lines = sc.textFile("file:///d:/users.csv")
    val fields = lines.mapPartitionsWithIndex((idx, iter) => if (idx == 0) iter.drop(1) else iter).map(l => l.split(",")).foreach(x=>println(x.toList))
    ————————————————————————————————————————————————————————————————————————————————————————————————
    方法二:使用SparkSession
    val df = spark.read.format("csv").option("header", "true").load("file:///home/kgc/data/users.csv")
    

    装载JSON数据源

    方法一:使用SparkContext
    val lines = sc.textFile("file:///home/kgc/data/users.json")
    //scala内置的JSON库
    import scala.util.parsing.json.JSON
    val result=lines.map(l=>JSON.parseFull(l))
    ————————————————————————————————————————————————————————————————————————————————————————————————
    还有一种使用SparkSession方法:API
    var spark=SparkSession.builder().master("local[2]")
          .appName("hello").getOrCreate();
        val rdd=spark.read.json("file:///d:/date.json")
        print(rdd)
    ————————————————————————————————————————————————————————————————————————————————————————————————
    方法二:使用SparkSession
    val df = spark.read.format("json").option("header", "true").load("file:///home/kgc/data/users.json")
    

    RDD数据倾斜*

    • 数据分配的不均匀
    • 通常发生在groupBy,join等之后
    1)在执行shuffle操作的时候,是按照key,来进行values的输出、拉取和聚合2)同一个key的values,一定是分配到一个reduce task进行处理的
    3)如果是很多相同的key对应的values被分配到了一个task上面去执行,而另外的task,可能只分配了一些
    4)这样就会出现数据倾斜问题
    

    解决方法:

    方案一:聚合源数据
    通过一些聚合的操作,比如grouByKey、reduceByKey就是拿到每个key对应的value,对每个key对应的values执行一定的计算
    方案二:过滤导致倾斜的key
    在sql中用where条件,过滤某几个会导致数据倾斜的key
    

    ——————————————————————————————————————————————————

    为什么要划分Stage

    spark划分stage思路:
    1)从后往前推,一个job拆分为多组task,每组的任务被称为一个stage
    2)stage里面的Task的数量对应一个partition,而stage又分为两类,
    一类是shuffleMapTask,一类是resltTask,DAG的最后一个阶段为每个partition生成一个resultask,
    其余阶段都会生成ShuffleMapTask,他将自己的计算结果通过shuffle传到下一个stage中。

    ——————————————————————————————————————————————————

  • 相关阅读:
    京东书4
    哈工大信息检索研究室 语言技术平台相关技术简介
    VIM使用小技巧重新载入文件
    在亚马逊网站上查看此物品
    automake autoconf m4 suite for autotools download
    XZ压缩最新压缩率之王
    在亚马逊网站上查看此物品
    京东书3
    欢迎访问 Babel 汉英平行语料库
    autotools 使用实例ckelselChinaUnix博客
  • 原文地址:https://www.cnblogs.com/tudousiya/p/11285866.html
Copyright © 2011-2022 走看看