  • Reading the Spark Documentation, Part 2: Programming Guides

    Quick Start: https://spark.apache.org/docs/latest/quick-start.html

    Before Spark 2.0, Spark's main programming interface was the RDD (Resilient Distributed Dataset). Since 2.0, RDDs have been superseded by Datasets. A Dataset is much like an RDD, but with more optimizations. RDDs are still supported, but switching to Datasets is strongly recommended for better performance.
     

    1. The simplest interactive analysis in the Spark shell

    scala> val textFile = spark.read.textFile("README.md")   // build a Dataset from the text file
    textFile: org.apache.spark.sql.Dataset[String] = [value: string]
    
    scala> textFile.count()  // a simple computation on the Dataset
    res0: Long = 104 
    
    scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))  // derive a new Dataset from an existing one
    linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
    // Roughly equivalent to this pseudocode:
    //   result = new Dataset()
    //   for line in textFile:
    //       if line.contains("Spark"):
    //           result.append(line)
    //   linesWithSpark = result
    
    scala> linesWithSpark.count()
    res2: Long = 19
    
    // multiple operations can be chained together
    scala> textFile.filter(line => line.contains("Spark")).count()
    res3: Long = 19

    Further Dataset analysis:

    scala> textFile.map(line => line.split(" ").size).reduce((a,b) => if (a > b) a else b)
    res12: Int = 16
    // map and reduce here are just two ordinary operators; don't be constrained by the MapReduce
    // mindset of one map paired with one reduce, map first and reduce second
    // the map operator computes fun(X) for each element X of the Dataset and returns all f(X) as a new Dataset
    // the reduce operator folds the Dataset's elements pairwise with fun(X, Y) = Z until a single value remains
    
    // a library can also be imported for the computation
    scala> import java.lang.Math
    import java.lang.Math
    
    scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
    res14: Int = 16
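    
    // As a local, non-Spark sketch of the same two operators on an ordinary Scala collection
    // (made-up data), to show their shapes in isolation: map computes f(X) for every element,
    // and reduce folds all elements pairwise into a single value.
    val sizes = Seq("a b c", "d e", "f g h i").map(line => line.split(" ").size)  // Seq(3, 2, 4)
    val maxWords = sizes.reduce((a, b) => Math.max(a, b))                         // 4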
    
    // other operators are available as well
    scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
    
    // flatMap, like map, computes fun(X) = Y for every element X of the Dataset; the difference is that map collects
    //     res.append(Y), e.g. [[Y11, Y12], [Y21, Y22]], keeping the results separated per element,
    // while flatMap collects
    //     res += Y, e.g. [Y11, Y12, Y21, Y22], concatenating all per-element results together
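    
    // A local sketch (plain Scala collections, made-up data) contrasting the two:
    val lines = Seq("Spark is fast", "Spark is easy")
    lines.map(line => line.split(" ").toSeq)   // Seq(Seq("Spark", "is", "fast"), Seq("Spark", "is", "easy"))
    lines.flatMap(line => line.split(" "))     // Seq("Spark", "is", "fast", "Spark", "is", "easy")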
    
    // the groupByKey operator computes f(X) for each element X of the Dataset, groups the elements by f(X) as the key,
    // and returns a KeyValueGroupedDataset, shaped roughly like (key: k; values: X1, X2, ...);
    // note that a KeyValueGroupedDataset is not a Dataset, and the value list is not an array
    // note: textFile and textFile.flatMap here are Datasets, not RDDs, so groupByKey() can take a function;
    // reading the file with sc.textFile() instead yields an RDD, whose groupByKey() takes no function
    // (see the RDD sketch after the collect() example below)
    
    // identity is just the function x => x, i.e. a function that returns its argument unchanged
    
    // KeyValueGroupedDataset's count() returns (key, number of values) pairs; the result is a Dataset
    
    scala> wordCounts.collect()
    res37: Array[(String, Long)] = Array((online,1), (graphs,1), ...
    // collect: fetch all data of the RDD/Dataset, distributed across the cluster, back to the driver
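
    For comparison with the note above about sc.textFile(): a word count on an RDD (a sketch; sc is the SparkContext the shell provides) would use map plus reduceByKey instead of groupByKey(identity).count():

    val rddCounts = sc.textFile("README.md")
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))           // pair each word with a count of 1
      .reduceByKey((a, b) => a + b)     // the key is the tuple's first element; no key function is passed
    rddCounts.collect()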

    Caching data:

    scala> linesWithSpark.cache()  // in-memory cache: keep the data cached in distributed memory
    res38: linesWithSpark.type = [value: string]
    
    scala> linesWithSpark.count()
    res41: Long = 19
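
    When the cached data is no longer needed, Dataset also offers unpersist() to release it (a brief sketch):

    linesWithSpark.unpersist()  // drop this Dataset's cached blocks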

    2. The simplest self-contained Spark application (submitted with spark-submit)

    sbt (the Scala Build Tool) must be installed first; it is Scala's build tool, similar to Maven for Java.
    brew install sbt
     
    1) Write SimpleApp.scala
    import org.apache.spark.sql.SparkSession
    
    object SimpleApp {
        def main(args: Array[String]) {
            val logFile = "/Users/dxm/work-space/spark-2.4.5-bin-hadoop2.7/README.md"
            val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
            val logData = spark.read.textFile(logFile).cache()
            val numAs = logData.filter(line => line.contains("a")).count()  // number of lines containing the letter "a"
            val numBs = logData.filter(line => line.contains("b")).count()  // number of lines containing the letter "b"
            println(s"Lines with a: $numAs, Lines with b: $numBs")
            spark.stop()
        }
    }
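
    For a quick local run without a standalone cluster, the builder can also set the master directly (a sketch; when submitting with spark-submit as in step 4 below, omit this and pass --master on the command line instead):

    val spark = SparkSession.builder
      .appName("Simple Application")
      .master("local[*]")  // run locally using all available cores; for local testing only
      .getOrCreate()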

    2) Write the sbt build file, build.sbt

    name := "Simple Application"
    
    version := "1.0"
    
    scalaVersion := "2.12.10"
    
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"

    Dependency coordinates such as "org.apache.spark" %% "spark-sql" % "2.4.5" can be looked up online, e.g. https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10/1.0.0
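
    Note that %% appends the Scala binary version from scalaVersion to the artifact name, so with scalaVersion := "2.12.10" the two lines below resolve to the same artifact (a sketch; only one of them belongs in build.sbt):

    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"
    libraryDependencies += "org.apache.spark" % "spark-sql_2.12" % "2.4.5"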

    3) Package with sbt
    The directory layout must be as follows; simply putting SimpleApp.scala and build.sbt together in one flat directory will not build correctly:
    $ find .
    .
    ./build.sbt
    ./src
    ./src/main
    ./src/main/scala
    ./src/main/scala/SimpleApp.scala

    sbt's required directory layout is described in the official documentation: https://www.scala-sbt.org/1.x/docs/Directories.html

    src/
      main/
        resources/
           <files to include in main jar here>
        scala/
           <main Scala sources>
        scala-2.12/
           <main Scala 2.12 specific sources>
        java/
           <main Java sources>
      test/
        resources/
           <files to include in test jar here>
        scala/
           <test Scala sources>
        scala-2.12/
           <test Scala 2.12 specific sources>
        java/
           <test Java sources>

    Package with sbt:

    # package the project
    $ sbt package
    ...
    [success] Total time: 97 s (01:37), completed 2020-6-10 10:28:24
    # the jar is produced at target/scala-2.12/simple-application_2.12-1.0.jar

    4) Submit and run the Spark job

    $ bin/spark-submit --class "SimpleApp" --master spark://xxx:7077 ../scala-tests/SimpleApp/target/scala-2.12/simple-application_2.12-1.0.jar
    # Error: Caused by: java.lang.ClassNotFoundException: scala.runtime.LambdaDeserialize
    # See: https://stackoverflow.com/questions/47172122/classnotfoundexception-scala-runtime-lambdadeserialize-when-spark-submit
    # This is caused by a mismatch between the Spark build's Scala version and the Scala version used to compile the jar

    Check which Scala version Spark uses:

    $ bin/spark-shell --master spark://xxx:7077
    
    scala> util.Properties.versionString
    res0: String = version 2.11.12
    Update build.sbt accordingly:
    scalaVersion := "2.11.12"
    The download page also confirms that the prebuilt Spark 2.4.5 uses Scala 2.11.
     
    Run sbt package again; the jar is now produced at target/scala-2.11/simple-application_2.11-1.0.jar.
    Submit again with spark-submit, and it succeeds:
    $ bin/spark-submit --class "SimpleApp" --master spark://xxx:7077 ../scala-tests/SimpleApp/target/scala-2.11/simple-application_2.11-1.0.jar 
    Lines with a: 61, Lines with b: 30