zoukankan      html  css  js  c++  java
  • 【Spark】Spark SQL, DataFrames and Datasets Guide(翻译文,持续更新)

    本文主要是翻译Spark官网Spark SQL programming guide 。只能保证大概意思,尽量保证细节。英文水平有限,如果有错误的地方请指正,轻喷。目录导航在右上角

    Spark SQL、DataFrames 和 Datasets 指南

    概述

      Spark SQL 是一个结构化数据处理的 Spark 模块 。 与基础的 Spark RDD API 不同的是, Spark SQL 所提供的接口为 Spark 提供了 更多关于数据结构和正在执行的计算结构的信息。 Spark 在其内部利用这些额外的信息去做更多的优化。有几种用于和 Sparrk SQL交互的方法,包括 SQL 和 Dataset API。 当你计算一个结果, 会使用同一个执行引擎, 这独立于你所用来描述这个算法的API和语言。这种一致性意味着开发者可以轻易地在不同的 API 中来回切换, 因为它为表达给定的转换提供了最自然的方式。

      本页所有示例使用了 Spark 提供的样例数据并且可以在 spark-shell 、pyspark shell 或者 sparkR shell 中运行。

    SQL

      Spark SQL 的一个用处是执行 SQL 查询。 Spark SQL 同样可以用来从 现有的 HIVE 中读取数据。 更多有关配置这个特性的信息,请查阅 HIVE Tables 部分。当你使用其他语言执行SQL时,将会返回一个 Dataset 或者 DataFrame 作为结果。你同样可以使用命令行或者 JDBC/ODBC 与 SQL 接口进行交互。

    Dataset 和 Dataframe

      Dataset 是一种分布式数据集,是 Spark1.6 新增的接口。它提供了RDD(强类型,可以使用强大的 lambda 表达式)的优点,并受益于Spark SQL 的优化执行引擎。Dataset 可以通过 JVM 构建,然后使用转换方法(map, flatMap, filter等等)进行操作。 Dataset API 在 Java 和 Scala 中可用。 Python 并不支持Dataset API。但是由于Python的动态特性, Dataset API 的很多优势都是可用的(比如你可以自然地使用名称 row.columnName 来访问 row 的域 )。 R 语言的情况类似。

      DataFrame 是一种按列命名组织的 Dataset, 它在概念上等价于关系型数据库的一个表或者 R/Python 的一个数据帧, 但是它(DataFrame)的底层做了更多的优化。DataFrame 可以通过大量的数据源构建,例如:结构化的数据文件, HIVE 的表, 数据库,或现有的RDD。Java、Python、Scala、R语言都支持 DataFrame API。 在 Scala 和 Java, DataFrame 由Dataset的 rowS 表示。 在 Scala API 中,DataFrame 可以简单地认为是 Dataset[Row] 的别名。 然而,在 Java API 中, 用户需要使用 Dataset<Row> 来表示 DataFrame。

      在整个文档中, 我们通常把 Scala/Java Dataset 的 RowS 称为 DataFrames。

     准备开始

    起点: SparkSession

    Spark 所有功能的入口是 SparkSession 类。创建最基本的 SaprkSession, 只需要调用 SparkSession.builder():

    scala版

    import org.apache.spark.sql.SparkSession
    
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    
    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits. 

    在  Spark 仓库 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中可以找到完整的示例代码。

    java版

    import org.apache.spark.sql.SparkSession;
    
    SparkSession spark = SparkSession
      .builder()
      .appName("Java Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate();

    在  Spark 仓库 “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” 中可以找到完整的示例代码。

    Python版

    from pyspark.sql import SparkSession
    
    spark = SparkSession 
        .builder 
        .appName("Python Spark SQL basic example") 
        .config("spark.some.config.option", "some-value") 
        .getOrCreate()

    在  Spark 仓库 “examples/src/main/python/sql/basic.py” 中可以找到完整的示例代码。

    R语言

    sparkR.session(appName = "R Spark SQL basic example", sparkConfig = list(spark.some.config.option = "some-value"))

    在  Spark 仓库 “examples/src/main/r/RSparkSQLExample.R” 中可以找到完整的示例代码

    请注意,sparkR.session() 第一次被调用时,它会初始化一个全局的 SparkSession 单例对象,并且之后继续调用这个方法都将返回这个实例。 通过这种方式,用户只需要对 SparkSession 做一次初始化,然后 SparkR 的其他方法比如 read.df 将会隐式地访问这个全局地单例对象, 并且用户不需要传递 SparkSession 的实例。

    Spark2.0 的 SparkSession 提供了对 HIVE 特性的内置支持, 包括使用 HiveQL 编写查询语句的能力,访问 Hive UDFs 和 从 Hive Table 中读取数据的能力。为了使用这些特性,您需要安装一个 HIVE。

    创建 DataFrame

    有了SparkSession, 应用程序可以通过本地的 R data.frame、Hive Table、 或者 Spark 数据源 来创建DataFrame。

    作为示例,以下代码使用一个 JSON 文件的内容 创建一个 DataFrame

    Scala版

    val df = spark.read.json("examples/src/main/resources/people.json")
    
    // Displays the content of the DataFrame to stdout
    df.show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+
    

    在  Spark 仓库 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中可以找到完整的示例代码。

    Java版

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    
    Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
    
    // Displays the content of the DataFrame to stdout
    df.show();
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+

    在  Spark 仓库 “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” 中可以找到完整的示例代码。

    Python版

    # spark is an existing SparkSession
    df = spark.read.json("examples/src/main/resources/people.json")
    # Displays the content of the DataFrame to stdout
    df.show()
    # +----+-------+
    # | age|   name|
    # +----+-------+
    # |null|Michael|
    # |  30|   Andy|
    # |  19| Justin|
    # +----+-------+

    在  Spark 仓库 “examples/src/main/python/sql/basic.py” 中可以找到完整的示例代码。

    R语言

    df <- read.json("examples/src/main/resources/people.json")
    
    # Displays the content of the DataFrame
    head(df)
    ##   age    name
    ## 1  NA Michael
    ## 2  30    Andy
    ## 3  19  Justin
    
    # Another method to print the first few rows and optionally truncate the printing of long values
    showDF(df)
    ## +----+-------+
    ## | age|   name|
    ## +----+-------+
    ## |null|Michael|
    ## |  30|   Andy|
    ## |  19| Justin|
    ## +----+-------+

    在  Spark 仓库 “examples/src/main/r/RSparkSQLExample.R” 中可以找到完整的示例代码

    弱类型的 Dataset 操作(aka DataFrame 操作)

    DataFrame 为 Scala、Java、Python、R语言提供了一种特定的结构化数据操作。

    上面提到过,在 Spark2.0 中,DataFrame 对于 Scala 和 Java API 仅仅是 Dataset 的 RowS。这些操作也被称为 “弱类型转换”,这与 强类型的Scala/Java 中的 “强类型转换” 形成了鲜明的对比。

    这里我们囊括了使用 Datasets 做结构化数据处理的基本示例:

    Scala版

    // This import is needed to use the $-notation
    import spark.implicits._
    // Print the schema in a tree format
    df.printSchema()
    // root
    // |-- age: long (nullable = true)
    // |-- name: string (nullable = true)
    
    // Select only the "name" column
    df.select("name").show()
    // +-------+
    // |   name|
    // +-------+
    // |Michael|
    // |   Andy|
    // | Justin|
    // +-------+
    
    // Select everybody, but increment the age by 1
    df.select($"name", $"age" + 1).show()
    // +-------+---------+
    // |   name|(age + 1)|
    // +-------+---------+
    // |Michael|     null|
    // |   Andy|       31|
    // | Justin|       20|
    // +-------+---------+
    
    // Select people older than 21
    df.filter($"age" > 21).show()
    // +---+----+
    // |age|name|
    // +---+----+
    // | 30|Andy|
    // +---+----+
    
    // Count people by age
    df.groupBy("age").count().show()
    // +----+-----+
    // | age|count|
    // +----+-----+
    // |  19|    1|
    // |null|    1|
    // |  30|    1|
    // +----+-----+

    在  Spark 仓库 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中可以找到完整的示例代码。

    java版

    // col("...") is preferable to df.col("...")
    import static org.apache.spark.sql.functions.col;
    
    // Print the schema in a tree format
    df.printSchema();
    // root
    // |-- age: long (nullable = true)
    // |-- name: string (nullable = true)
    
    // Select only the "name" column
    df.select("name").show();
    // +-------+
    // |   name|
    // +-------+
    // |Michael|
    // |   Andy|
    // | Justin|
    // +-------+
    
    // Select everybody, but increment the age by 1
    df.select(col("name"), col("age").plus(1)).show();
    // +-------+---------+
    // |   name|(age + 1)|
    // +-------+---------+
    // |Michael|     null|
    // |   Andy|       31|
    // | Justin|       20|
    // +-------+---------+
    
    // Select people older than 21
    df.filter(col("age").gt(21)).show();
    // +---+----+
    // |age|name|
    // +---+----+
    // | 30|Andy|
    // +---+----+
    
    // Count people by age
    df.groupBy("age").count().show();
    // +----+-----+
    // | age|count|
    // +----+-----+
    // |  19|    1|
    // |null|    1|
    // |  30|    1|
    // +----+-----+

    在  Spark 仓库 “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” 中可以找到完整的示例代码。

    Python版

     对于Python来说,我们可以通过属性(df.age)或者通过索引(df['age']) 来访问 DataFrame 的列。 虽然前者用于交互式数据探索非常方便, 但使用者强烈建议使用后者,因为它具有前瞻性,并且不会因为 DataFrame 的列命和属性名重复产生冲突。

    # spark, df are from the previous example
    # Print the schema in a tree format
    df.printSchema()
    # root
    # |-- age: long (nullable = true)
    # |-- name: string (nullable = true)
    
    # Select only the "name" column
    df.select("name").show()
    # +-------+
    # |   name|
    # +-------+
    # |Michael|
    # |   Andy|
    # | Justin|
    # +-------+
    
    # Select everybody, but increment the age by 1
    df.select(df['name'], df['age'] + 1).show()
    # +-------+---------+
    # |   name|(age + 1)|
    # +-------+---------+
    # |Michael|     null|
    # |   Andy|       31|
    # | Justin|       20|
    # +-------+---------+
    
    # Select people older than 21
    df.filter(df['age'] > 21).show()
    # +---+----+
    # |age|name|
    # +---+----+
    # | 30|Andy|
    # +---+----+
    
    # Count people by age
    df.groupBy("age").count().show()
    # +----+-----+
    # | age|count|
    # +----+-----+
    # |  19|    1|
    # |null|    1|
    # |  30|    1|
    # +----+-----+ 

    在  Spark 仓库 “examples/src/main/python/sql/basic.py” 中可以找到完整的示例代码。

    R语言

    # Create the DataFrame
    df <- read.json("examples/src/main/resources/people.json")
    
    # Show the content of the DataFrame
    head(df)
    ##   age    name
    ## 1  NA Michael
    ## 2  30    Andy
    ## 3  19  Justin
    
    
    # Print the schema in a tree format
    printSchema(df)
    ## root
    ## |-- age: long (nullable = true)
    ## |-- name: string (nullable = true)
    
    # Select only the "name" column
    head(select(df, "name"))
    ##      name
    ## 1 Michael
    ## 2    Andy
    ## 3  Justin
    
    # Select everybody, but increment the age by 1
    head(select(df, df$name, df$age + 1))
    ##      name (age + 1.0)
    ## 1 Michael          NA
    ## 2    Andy          31
    ## 3  Justin          20
    
    # Select people older than 21
    head(where(df, df$age > 21))
    ##   age name
    ## 1  30 Andy
    
    # Count people by age
    head(count(groupBy(df, "age")))
    ##   age count
    ## 1  19     1
    ## 2  NA     1
    ## 3  30     1

    在  Spark 仓库 “examples/src/main/r/RSparkSQLExample.R” 中可以找到完整的示例代码

    关于 DataFrame 可执行的操作的完整列表,请移步 API Documentation

    除了简单的列引用和表示之外,DataFrame 同样有一个丰富的函数库,包括字符串操作、日期算法、常用数学操作 等等。 完整的列表可以在 DataFrame Function Reference.中找到。

    以编程方式执行 SQL 查询

    Scala版

    SparkSession 的 sql 功能使应用可以以编程的方式执行 SQL 查询并且返回一个 DataFrame

    // Register the DataFrame as a SQL temporary view
    df.createOrReplaceTempView("people")
    
    val sqlDF = spark.sql("SELECT * FROM people")
    sqlDF.show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+

    在 Spark 仓库 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中可以找到完整的示例代码。

    java版

    SparkSession 的 sql 功能使应用可以以编程的方式执行 SQL 查询并且返回一个 DataSet<Row>

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    
    // Register the DataFrame as a SQL temporary view
    df.createOrReplaceTempView("people");
    
    Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
    sqlDF.show();
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+

    在 Spark 仓库 “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” 中可以找到完整的示例代码。

    Python版

    SparkSession 的 sql 功能使应用可以以编程的方式执行 SQL 查询并且返回一个 DataFrame

    # Register the DataFrame as a SQL temporary view
    df.createOrReplaceTempView("people")
    
    sqlDF = spark.sql("SELECT * FROM people")
    sqlDF.show()
    # +----+-------+
    # | age|   name|
    # +----+-------+
    # |null|Michael|
    # |  30|   Andy|
    # |  19| Justin|
    # +----+-------+

    在 Spark 仓库 “examples/src/main/python/sql/basic.py” 中可以找到完整的示例代码。

    R语言

    SparkSession 的 sql 功能使应用可以以编程的方式执行 SQL 查询并且返回一个 SparkDataFrame

    df <- sql("SELECT * FROM table")

    在 Spark 仓库 “examples/src/main/r/RSparkSQLExample.R” 中可以找到完整的示例代码

    全局临时视图

    Spark SQL 临时视图的作用域是 session 并且如果创建它(临时视图)的 session 终止,视图就会消失。如果你想使一个临时视图在 Spark 应用终止之前可以在所有 session 中共享, 那么你可以创建一个全局临时视图。全局临时视图是和系统保存的数据库 global_temp 联系在一起的, 我们必须使用限定的名称来指代它,比如: SELECT * FROM global_temp.view1

    Scala版

    // Register the DataFrame as a global temporary view
    df.createGlobalTempView("people")
    
    // Global temporary view is tied to a system preserved database `global_temp`
    spark.sql("SELECT * FROM global_temp.people").show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+
    
    // Global temporary view is cross-session
    spark.newSession().sql("SELECT * FROM global_temp.people").show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+

    在 Spark 仓库 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中可以找到完整的示例代码。

    java版

    // Register the DataFrame as a global temporary view
    df.createGlobalTempView("people");
    
    // Global temporary view is tied to a system preserved database `global_temp`
    spark.sql("SELECT * FROM global_temp.people").show();
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+
    
    // Global temporary view is cross-session
    spark.newSession().sql("SELECT * FROM global_temp.people").show();
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+

    在 Spark 仓库 “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” 中可以找到完整的示例代码。

    Python版

    # Register the DataFrame as a global temporary view
    df.createGlobalTempView("people")
    
    # Global temporary view is tied to a system preserved database `global_temp`
    spark.sql("SELECT * FROM global_temp.people").show()
    # +----+-------+
    # | age|   name|
    # +----+-------+
    # |null|Michael|
    # |  30|   Andy|
    # |  19| Justin|
    # +----+-------+
    
    # Global temporary view is cross-session
    spark.newSession().sql("SELECT * FROM global_temp.people").show()
    # +----+-------+
    # | age|   name|
    # +----+-------+
    # |null|Michael|
    # |  30|   Andy|
    # |  19| Justin|
    # +----+-------+

    在 Spark 仓库 “examples/src/main/python/sql/basic.py” 中可以找到完整的示例代码。

    SQL

    CREATE GLOBAL TEMPORARY VIEW temp_view AS SELECT a + 1, b * 2 FROM tbl
    
    SELECT * FROM global_temp.temp_view

    创建 Dataset

    Dataset 与 RDD 很像, 当然,它们使用专门的编码器, 而不是 java 序列化或 Kryo, 来序列化一个对象以便在网络上进行处理或者传输。

    Scala版

    case class Person(name: String, age: Long)
    
    // Encoders are created for case classes
    val caseClassDS = Seq(Person("Andy", 32)).toDS()
    caseClassDS.show()
    // +----+---+
    // |name|age|
    // +----+---+
    // |Andy| 32|
    // +----+---+
    
    // Encoders for most common types are automatically provided by importing spark.implicits._
    val primitiveDS = Seq(1, 2, 3).toDS()
    primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
    
    // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
    val path = "examples/src/main/resources/people.json"
    val peopleDS = spark.read.json(path).as[Person]
    peopleDS.show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+

    在 Spark 仓库 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中可以找到完整的示例代码。

    java版

    import java.util.Arrays;
    import java.util.Collections;
    import java.io.Serializable;
    
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;
    
    public static class Person implements Serializable {
      private String name;
      private int age;
    
      public String getName() {
        return name;
      }
    
      public void setName(String name) {
        this.name = name;
      }
    
      public int getAge() {
        return age;
      }
    
      public void setAge(int age) {
        this.age = age;
      }
    }
    
    // Create an instance of a Bean class
    Person person = new Person();
    person.setName("Andy");
    person.setAge(32);
    
    // Encoders are created for Java beans
    Encoder<Person> personEncoder = Encoders.bean(Person.class);
    Dataset<Person> javaBeanDS = spark.createDataset(
      Collections.singletonList(person),
      personEncoder
    );
    javaBeanDS.show();
    // +---+----+
    // |age|name|
    // +---+----+
    // | 32|Andy|
    // +---+----+
    
    // Encoders for most common types are provided in class Encoders
    Encoder<Integer> integerEncoder = Encoders.INT();
    Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
    Dataset<Integer> transformedDS = primitiveDS.map(
        (MapFunction<Integer, Integer>) value -> value + 1,
        integerEncoder);
    transformedDS.collect(); // Returns [2, 3, 4]
    
    // DataFrames can be converted to a Dataset by providing a class. Mapping based on name
    String path = "examples/src/main/resources/people.json";
    Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
    peopleDS.show();
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+

    在 Spark 仓库 “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” 中可以找到完整的示例代码。

                                                                                                                        (2018-10-08)

  • 相关阅读:
    BZOJ4003:[JLOI2015]城池攻占——题解
    洛谷3388:【模板】割点(割顶)——题解
    洛谷3805:【模板】manacher算法——题解
    BZOJ3236:[AHOI2013]作业——题解
    SPOJ3267/DQUERY:D-query——题解
    自动化移动安全渗透测试框架:Mobile Security Framework
    爬取京东评论,且修改网址直接可复用哦(送代码)
    谈谈从事IT测试行业的我,对于买房买车有什么样的感受
    python调用adb命令进行手机操作
    python selenium 处理时间日期控件
  • 原文地址:https://www.cnblogs.com/yeyeck/p/9665090.html
Copyright © 2011-2022 走看看