zoukankan      html  css  js  c++  java
  • 【Spark】Spark Quick Start(快速入门翻译)

     本文主要是翻译Spark官网Quick Start。只能保证大概意思,尽量保证细节。英文水平有限,如果有错误的地方请指正,轻喷

    目录导航在右上角,感谢两个大佬(孤傲苍狼  JavaScript自动生成博文目录导航 和 juejiang 为博客园添加目录的配置总结)提供的帮助。这篇文章还有个问题 scala/python/java 使用 Spark 的介绍不能像官网那样可以通过点击导航来显示不同的内容,很影响阅读。我在想办法改进

    Quick Start

    这个指南提供了使用Spark的快速介绍。我们会首先介绍Spark 交互式编程(使用Python或者Scala)的 API, 然后展示如何用Java、Scala 和 Python来编写应用程序。

    为了使用这个指南,您需要先从 Spark 网页 下载打包发布的Spark安装包。由于我们将不会(在指南中)使用HDFS, 您可以下载任意版本的Hadoop安装包。

    需要注意的是,Spark2.0 之前, Spark的主要编程接口是弹性分布式数据集(Resilient Distributed Dataset (RDD))。Spark2.0 之后, RDD 被 Dataset 取代,Dataset 和 RDD 一样是强类型,但是在底层进行了更多的优化。Spark2.0 之后仍然支持 RDD 接口,并且您可以从RDD编程指南中 获取更详细的参考。当然,我们强烈建议您选择使用Dataset, 因为它的性能比RDD更好。 查看 SQL编程指南 以得到更多关于Dataset的信息。

    使用 Spark Shell 交互式编程

    基本操作

    Spark Shell 提供了一个简单的方式去学习 API,同时也提供了一个强大的交互式数据分析工具。它可以基于 Scala(一种在java 虚拟机上运行并因此可以很好地使用已有的java库的编程语言)或 Python 使用。在 Spark 目录下运行以下内容来开始(Sprk Shell):

      Scala 版

    ./bin/pyspark

    Python 版

    ./bin/pyspark

    如果你当前环境使用pip下载了 PySpark,可以使用如下下方式调用

    pyspark

    Spark 主要的抽象是一个被叫做 Dataset 的分布式集合。 Dataset 可以通过 Hadoop InputFormat(比如HDFS文件)或者 转换其他 Dataset 中创建。让我们通过 Spark 源目录下的 README 文件内容创建一个新的 Dataset:

    Scala 版

    scala> val textFile = spark.read.textFile("README.md")
    textFile: org.apache.spark.sql.Dataset[String] = [value: string]

    Python 版

    >>> textFile = spark.read.text("README.md")

    你可以直接从Dataset中, 通过调用一些操作或者转化Dataset以获得一个新的Dataset来获取它的值。请阅读 API 文档(Scala / Python)  以获取更多细节

    Scala 版

    scala> textFile.count() // 该Dataset中的成员数量
    res0: Long = 126 //  由于README.md 会随着时间的推移不断改变,所以结果可能会有所不同, 其他输出也有类似情况
    
    scala> textFile.first() // 该Dataset的第一个成员
    res1: String = # Apache Spark

    Python 版

    >>> textFile.count()  # 该DataFrame中的行数
    126
    
    >>> textFile.first()  # 该DataFrame的第一行
    Row(value=u'# Apache Spark')

    现在让我们使用该Dataset来转换成一个新的Dataset。 我们调用 filter 来返回一个新的Dataset, 其中包含这个文件内容的子集。

    Scala 版

    scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
    linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

    Python 版

    >>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))

    我们可以将数据集转换和数据集操作串接在一起

    Scala 版

    scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
    res3: Long = 15

    Python 版

    >>> textFile.filter(textFile.value.contains("Spark")).count()  # How many lines contain "Spark"?
    15

    更多关于Dataset的操作

    Dataset操作和转换可以用来做更复杂的计算。假设我们想要找到单词数量最多的那行:

    Scala 版

    scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
    res4: Long = 15

    这首先将文件中的一行映射成一个整数值,并创建一个新的Dataset。调用该 Dataset 的 reduce 方法以找到最大的单词计数。map 和 reduce 的参数是 Scala 的函数字面量(闭包),并且可以使用任何语言的特性或者 Scala/Java 库。 比如, 我们可以很荣誉地调用任何地方声明地函数(方法)。我们将使用 Math.max() 方法以使这段代码易于理解:

    scala> import java.lang.Math
    import java.lang.Math
    
    scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
    res5: Int = 15

    MapReduce是一种常见的数据流格式, 这是由Hadoop推广的。Spark 可以很容易地实现MapReduce流:

    scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
    wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

    这里,我们调用 flatMap 来将一个行级(以文本中的一行为一个成员(Item))的 Dataset 转换成一个 单词 级 的Dataset,然后串接调用 groupByKey 和 count 方法 来计算文件中的每个单词的数量作为(String, Long)数据对形式 的Dateset。 为了在我们的shell中统计出单词的数量, 我们可以调用 collect 方法:

    scala> wordCounts.collect()
    res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

    Python 版

    >>> from pyspark.sql.functions import *
    >>> textFile.select(size(split(textFile.value, "s+")).name("numWords")).agg(max(col("numWords"))).collect()
    [Row(max(numWords)=15)]

    这首先将文件中的一行映射成一个整数值 并取一个为 “numWords” 的别名,同时创建一个新的DataFrame。调用该 Dataset 的 agg 方法以找到最大的单词计数。select 和 agg 的参数都是 Colum,我们可以使用 df.colName 方法来从一个DataFrame中获得一个 colum。我们同样可以导入 pyspark.sql.functions, 它提供了很多简易的方法从一个已有的 Colum 构建一个新的 Colum。

    MapReduce是一种常见的数据流格式, 这是由Hadoop推广的。Spark 可以很容易地实现MapReduce流:

    >>> wordCounts = textFile.select(explode(split(textFile.value, "s+")).alias("word")).groupBy("word").count()

    这里,我们在 select 方法中使用了 explode 方法来将一个行级(以文本中的一行为一个成员(Item))的 Dataset 转换成一个 单词 级 的Dataset。然后串接调用 groupByKey 和 count 方法 来计算文件中的每个单词的数量作为一个拥有两个Colum:“word” 和 “count” 的DataFrame。 为了在我们的shell中统计出单词的数量, 我们可以调用 collect 方法:

    >>> wordCounts.collect()
    [Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]

     

    缓存(Caching)

    Spark同样支持将数据集加入到一个集群中的内存缓存中。当数据被重复访问时,这是非常有用的。比如查询一个小的热点数据集 或者 运行像PageRank 这样的迭代算法。让我们标记我们的 linesWithSpark  作为缓存数据 来作为一个例子:

    Scala 版

    scala> linesWithSpark.cache()
    res7: linesWithSpark.type = [value: string]
    
    scala> linesWithSpark.count()
    res8: Long = 15
    
    scala> linesWithSpark.count()
    res9: Long = 15

    Python 版

    >>> linesWithSpark.cache()
    
    >>> linesWithSpark.count()
    15
    
    >>> linesWithSpark.count()
    15

    使用Spark来探索和缓存一个100行的文本文件看起来很蠢。有趣的是,这些方法同样可以作用在非常大的数据集中,哪怕它们被分布在数十个或上百个节点中。正如 RDD编程指南 中描述的那样, 您可以通过连接 bin/spark-shell 到一个集群中来进行以上交互式操作。

    独立的应用程序

    假设我们希望使用 Spark API 编写一个独立的 应用程序。  我们将分别使用Scala(带sbt),Java(带Maven) 和 Python(pip) 编写一个简单的应用程序。

    Scala

    我们将在 Scala 中创建一个Spark 应用程序——非常简单。 实际上,它被命名为 SimleApp.scala

    /* SimpleApp.scala */
    import org.apache.spark.sql.SparkSession
    
    object SimpleApp {
      def main(args: Array[String]) {
        val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
        val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
        val logData = spark.read.textFile(logFile).cache()
        val numAs = logData.filter(line => line.contains("a")).count()
        val numBs = logData.filter(line => line.contains("b")).count()
        println(s"Lines with a: $numAs, Lines with b: $numBs")
        spark.stop()
      }
    }

    注意,这个应用程序需要定义一个 main() 方法 而不是 继承 scala.App. scala.App 的子类可能无法正常地工作。

    这个程序只是统计 Spark README 文件中包含 “a” 的行数和 包含"b" 的行数。 注意, 您需要使用 Spark 的安装位置 来代替 YOUR_SPARK_HOME。与之前Spark Shell中的例子不同的是,Spark Shell 初始化它自己的SparkSession, 而我们初始化一个SparkSeesion作为程序的一部分。

    我们调用 SparkSession.builder 来构造一个 【SparkSession】,然后设置应用的名字, 最后调用 getOrCreate 方法获取一个 【SparkSession】实例。

    我们的应用程序取决于Spark API, 所以我们同样需要一个 sbt 配置文件, build.sbt, 这表示 Spark 是一个依赖组件。

    name := "Simple Project"
    
    version := "1.0"
    
    scalaVersion := "2.11.8"
    
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.1"
    

    为了使 sbt 能够正常工作, 我们需要根据经典的目录结构布局 SimpleApp.scala 和 build.sbt。一旦完成这些,我们就可以创建一个包含这个应用程序源代码的JAR包, 然后使用 spark-submit 脚本运行我们的程序。

    # Your directory layout should look like this
    $ find .
    .
    ./build.sbt
    ./src
    ./src/main
    ./src/main/scala
    ./src/main/scala/SimpleApp.scala
    
    # Package a jar containing your application
    $ sbt package
    ...
    [info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar
    
    # Use spark-submit to run your application
    $ YOUR_SPARK_HOME/bin/spark-submit 
      --class "SimpleApp" 
      --master local[4] 
      target/scala-2.11/simple-project_2.11-1.0.jar
    ...
    Lines with a: 46, Lines with b: 23

    Java


    这个例子将会使用 Maven 编译一个JAR 应用程序,但是很多类似的构建系统都可以完成这些工作。

    我们将创建一个简单的Spark应用程序, SimpleApp.java

    /* SimpleApp.java */
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.Dataset;
    
    public class SimpleApp {
      public static void main(String[] args) {
        String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
        SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
        Dataset<String> logData = spark.read().textFile(logFile).cache();
    
        long numAs = logData.filter(s -> s.contains("a")).count();
        long numBs = logData.filter(s -> s.contains("b")).count();
    
        System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
    
        spark.stop();
      }
    }

    这个程序只是统计 Spark README 文件中包含 “a” 的行数和 包含"b" 的行数。 注意, 您需要使用 Spark 的安装位置 来代替 YOUR_SPARK_HOME。与之前Spark Shell中的例子不同的是,Spark Shell 初始化它自己的SparkSession, 而我们初始化一个SparkSeesion作为程序的一部分。

    为了构建这个程序, 我们同样要编写一个 Maven pom.xml 文件,这个文件将 Spark 列为一个依赖组件。请注意,Spark 构件 被标记为Scala版本

    <project>
      <groupId>edu.berkeley</groupId>
      <artifactId>simple-project</artifactId>
      <modelVersion>4.0.0</modelVersion>
      <name>Simple Project</name>
      <packaging>jar</packaging>
      <version>1.0</version>
      <dependencies>
        <dependency> <!-- Spark dependency -->
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>2.3.1</version>
        </dependency>
      </dependencies>
    </project>

    我们根据规范的Maven目录结构列出这些文件

    $ find .
    ./pom.xml
    ./src
    ./src/main
    ./src/main/java
    ./src/main/java/SimpleApp.java

    现在,我们可以使用 Maven 打包这个应用程序并且 通过 ./bin/spark-submit. 执行

    # Package a JAR containing your application
    $ mvn package
    ...
    [INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
    
    # Use spark-submit to run your application
    $ YOUR_SPARK_HOME/bin/spark-submit 
      --class "SimpleApp" 
      --master local[4] 
      target/simple-project-1.0.jar
    ...
    Lines with a: 46, Lines with b: 23

    Python

    这里我们将展示如何使用Python API(PySpark)来编写一个应用程序

    如果你正构建一个打包的 PySpark应用程序或库,你可以将它添加到你的 setup.py 文件中, 如下:

    install_requires=[
            'pyspark=={site.SPARK_VERSION}'
        ]

    作为示例,我们将创建一个简单的 Spark 应用程序, SimpleApp.py:

    """SimpleApp.py"""
    from pyspark.sql import SparkSession
    
    logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
    spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
    logData = spark.read.text(logFile).cache()
    
    numAs = logData.filter(logData.value.contains('a')).count()
    numBs = logData.filter(logData.value.contains('b')).count()
    
    print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
    
    spark.stop()

    这个程序只是统计 Spark README 文件中包含 “a” 的行数和 包含"b" 的行数。 注意, 您需要使用 Spark 的安装位置 来代替 YOUR_SPARK_HOME。与之前Spark Shell中的例子不同的是,Spark Shell 初始化它自己的SparkSession, 而我们初始化一个SparkSeesion作为程序的一部分。和 Scala 和 Java 例子一样, 我们使用 SparkSession 来创建 Dataset 。 对于使用自定义类或者第三方库的应用程序, 我们同样可以通过它的 --py-- files 参数将代码和依赖打包成zip文件(使用 spark-submit --help 查看细节)的形式 添加到 spark-submit。 SimpleApp 足够简单, 所以我们不用指定任何代码依赖组件。

    我们使用 bin/spark-submit 脚本运行这个程序

    # Use spark-submit to run your application
    $ YOUR_SPARK_HOME/bin/spark-submit 
      --master local[4] 
      SimpleApp.py
    ...
    Lines with a: 46, Lines with b: 23

    如果您将PySpark通过 pip 安装到了您的环境中(eg. pip install pyspark),根据您的喜好,可以使用常规的Python解释器 或者 使用 spark-submit 来运行您的程序

    # Use the Python interpreter to run your application
    $ python SimpleApp.py
    ...
    Lines with a: 46, Lines with b: 23

    下一步

    祝贺您运行了您的第一个 Spark 应用程序

      关于API的深入概述,请从 RDD 编程指南SQL 编程指南 开始, 或者 查看编程指南菜单 以了解其他组件

      关于使用集群运行应用程序,请移步 部署概述

      最后, Spark 包含了几个简单的例子, 它们被保存在 example 目录下(Scala, Java, Python, R),你可以按照以下方式运行它们:

    # For Scala and Java, use run-example:
    ./bin/run-example SparkPi
    
    # For Python examples, use spark-submit directly:
    ./bin/spark-submit examples/src/main/python/pi.py
    
    # For R examples, use spark-submit directly:
    ./bin/spark-submit examples/src/main/r/dataframe.R
  • 相关阅读:
    log4net插入access自定义字段
    前端规范
    烤冷面项目进度文档
    响应式布局及bootstrap(实例)
    HTML嵌套规则
    前端规范2-CSS规范
    前端规范1-HTML规范
    入驻博客园
    .net中运用solr提升搜索效率(入门)
    .net 使用validator做数据校验
  • 原文地址:https://www.cnblogs.com/yeyeck/p/9652117.html
Copyright © 2011-2022 走看看