zoukankan      html  css  js  c++  java
  • Spark笔记02

    • 介绍了Spark中最重要的概念RDD
      • 介绍了RDD的基本操作(Transformation & Action)
      • 介绍了RDD的血缘关系(Lineage)
      • 介绍了RDD的依赖类型(Narrow & Shuffle)
      • 介绍了RDD的阶段(Stage)
      • 介绍了RDD的缓存(Cache)
    • 实战:Loading Data
    • 实战:Hello World with Scala

    Spark Core - RDD

    Resilient Distributed Dataset 弹性分布式数据集

    ...collection of elements partitioned across the nodes of ther cluster that can be operated on in parallel...

    RDD是一种有容错机制的特殊数据集合,可以分布在集群的结点上,以函数式操作集合的方式进行各种并行操作。它有以下特性:

    • 只读:不能修改,只能通过转换操作生成新的 RDD。
    • 分布式:可以分布在多台机器上进行并行处理。
    • 弹性:计算过程中内存不够时它会和磁盘进行数据交换。
    • 基于内存:可以全部或部分缓存在内存中,在多次计算间重用。

    理解:RDD是Spark中非常重要的一个抽象概念。我们可以这样理解:一个List,里面有许多Int类型数据,比如1000万条,现在要对其求平均值。于是我们把这个List分割成1000份,分别计算求和,然后再累加起来求平均。这个List就是RDD。
    运用于之后的计算。

    通过使用RDD,用户不必担心底层数据的分布式特性,只需要将具体的应用逻辑表达为一系列转换处理,就可以实现管道化,从而避免了中间结果的存储,大大降低了数据复制、磁盘 I/O 和数据序列化的开销。

    RDD - 基本操作

    其实,RDD的操作和Java Stream API很像,后者也是类似地用pipeline进行map-reduce操作。如下:

    someList.stream()
    .filter(...)      -- map
    .collect(...);    -- reduce
    

    相应地,RDD也有类似的操作,称为Transformation和Action。

    Transformation

    • map
    • filter
    • ...

    转化操作,就是从一个RDD产生一个新的RDD,但是这里不计算。

    Action

    • collect
    • count
    • reduce
    • ...

    行动操作,就是进行实际的计算。

    RDD - 血缘关系

    RDD的最重要的特性之一就是血缘关系(Lineage),它描述了一个RDD是如何从父RDD计算得来的。如果某个RDD丢失了,则可以根据血缘关系,从父RDD计算得来。

    把这种关系画出来,就是DAG(direct acyclic graph),有向无环图。

    spark_dag

    RDD - 依赖类型

    • 窄依赖 Narrow:父RDD的每个分区都只被子RDD的一个分区所使用
    • 宽依赖 Shuffle:父RDD的每个分区都被多个子RDD的分区所依赖。
    spark_dependency

    当处理数据时,

    • 窄依赖 支持在同一个集群Executor上,以pipeline管道形式顺序执行多条命令,例如在执行了map后,紧接着执行filter
    • 宽依赖 必须等RDD的parent partition数据全部ready之后才能开始计算

    当需要失败恢复时

    • 窄依赖 更为高效,它只需要根据父RDD分区重新计算丢失的分区即可
    • 宽依赖 开销较大,需要重新计算父RDD的所有分区

    从操作上来分

    • map, filter, union等操作是窄依赖
    • groupByKey, reduceByKey等操作是宽依赖

    RDD - 阶段

    对于宽依赖,spark的设计是让parent RDD将结果写在本地,完全写完之后,通知后面的RDD。后面的RDD则首先去读之前的本地数据作为input,然后进行运算。

    分为两个阶段(stage)去做:

    • 第一个阶段(stage)需要把结果shuffle到本地,例如reduceByKey,首先要聚合某个key的所有记录,才能进行下一步的reduce计算,这个汇聚的过程就是shuffle
    • 第二个阶段(stage)则读入数据进行处理。

    同一个stage里面的task是可以并发执行的,下一个stage要等前一个stage ready

    RDD - 缓存

    Spark RDD是惰性求值的,而有时候希望能多次使用同一个RDD。如果简单地对RDD调用行动操作,Spark每次都会重算RDD及它的依赖,这样就会带来太大的消耗。为了避免多次计算同一个RDD,可以让Spark对数据进行持久化。
    Spark可以使用persist和cache方法将任意RDD缓存到内存、磁盘文件系统中。

    Exercise - Loading Data

    有三种常见操作可以创建RDD,列举如下:

    1.使用集合

    这里主要使用了parallelizemakeRDD

    scala> val arr = Array(1,2,3)
    
    scala> val rdd = sc.parallelize(arr)
    scala> val rdd = sc.makeRDD(arr)
    
    scala> rdd.collect
    
    res0: Array[Int] = Array(1, 2, 3)
    

    另外,parallelize还支持并行操作,通过传入参数,将dataset切分成n个partition,比如,sc.parallelize(data, 10))

    2.读取外部文件系统

    这里主要使用了textFile

    scala> val rdd2 = sc.textFile("hdfs://hadoop01:9000/dummy.txt")
    
    scala> val rdd2 = sc.textFile("file:////root/dummy.txt")
    scala> rdd2.collect
    res2: Array[String] = Array(a, b, c)
    

    3.从父RDD转换成新的子RDD

    这里主要使用了transformation类的方法/算子,如map, fliter等。

    scala> rdd.collect
    res3: Array[Int] = Array(1, 2, 3)
    
    scala> val rdd = sc.parallelize(arr)
    scala> val rdd2 = rdd.map(_*100)
    scala> rdd2.collect
    res4: Array[Int] = Array(100, 200, 300)
    

    注:

    • 调用transformation类的算子,都会生成一个新的RDD。RDD中的数据类型,由传入给算子的函数的返回值类型决定。
    • 调用action类的算子,不会生成新的RDD。

    Exercise - Spark with Scala

    之前我们已经试过了在spark-shell里面使用命令行执行一些操作,下面试一下用project的形式运行。

    主要需要准备两个文件,一是app文件。

    import org.apache.spark.sql.SparkSession
    
    object SimpleApp {
      def main(args: Array[String]) {
        val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
        val spark = SparkSession.builder.appName("Simple Application").config("spark.master", "local").getOrCreate()
        val logData = spark.read.textFile(logFile).cache()
        val numAs = logData.filter(line => line.contains("a")).count()
        val numBs = logData.filter(line => line.contains("b")).count()
        println(s"Lines with a: $numAs, Lines with b: $numBs")
        spark.stop()
      }
    }
    

    二是build文件。这里用了sbt

    name := "Simple Project"
    version := "1.0"
    scalaVersion := "2.12.10"
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"
    

    当然也可以用maven,常用的dependency如下:

    groupId = org.apache.spark
    artifactId = spark-core_2.11
    version = 2.1.1
    
    groupId = org.apache.hadoop
    artifactId = hadoop-client
    version = <your-hdfs-version>
    

    然后,就可以package + run。

    # Package a jar containing your application
    $ sbt package
    ...
    [info] Packaging {..}/{..}/target/scala-2.12/simple-project_2.12-1.0.jar
    
    # Use spark-submit to run your application
    $ YOUR_SPARK_HOME/bin/spark-submit 
      --class "SimpleApp" 
      --master local[4] 
      target/scala-2.12/simple-project_2.12-1.0.jar
    ...
    Lines with a: 46, Lines with b: 23
    

    Reference

  • 相关阅读:
    ios UIImageView
    ios UILable
    [leetCode]116. 填充每个节点的下一个右侧节点指针
    [leetCode]1002. 查找常用字符
    [leetCode]199. 二叉树的右视图
    [leetCode]784. 字母大小写全排列
    [leetCode]1297. 子串的最大出现次数
    [leetCode]1239. 串联字符串的最大长度
    1095. 山脉数组中查找目标值
    [leetCode]1235. 规划兼职工作
  • 原文地址:https://www.cnblogs.com/maxstack/p/13373975.html
Copyright © 2011-2022 走看看