zoukankan      html  css  js  c++  java
  • Spark权威指南(中文版)----第4章 结构化API概述

    Spark The Definitive Guide(Spark权威指南) 中文版。本书详细介绍了Spark2.x版本的各个模块,目前市面上最好的Spark2.x学习书籍!!!

    扫码关注公众号:登峰大数据,阅读中文Spark权威指南(完整版),系统学习Spark大数据框架!

    图片

    本书的这一部分将深入探讨Spark的结构化api。结构化api是一种处理各种数据的工具,从非结构化的日志文件到半结构化的CSV文件和高度结构化的Parquet文件。这些api引用了分布式收集api的三种核心类型:

    • Datasets

    • DataFrames

    • SQL tables 和 views

    虽然它们是本书的独立部分,但大多数结构化api都适用于批处理和流计算。这意味着当您使用结构化api时,从批处理迁移到流(反之亦然)的过程应该很简单,几乎不需要付出任何努力。我们将在第五部分中详细介绍流计算。

    结构化api是您用来编写大多数数据流的基本抽象。到目前为止在这本书中,我们采取了概述的方法,说明了Spark框架提供了哪些功能。这部分提供了更深入的探索。在本章中,我们将介绍你应该理解的基本概念:

    • 类型化和非类型化的api(及其差异);

    • 核心术语是什么;

    • Spark如何实际使用结构化API数据流并在集群上执行它。

    然后,我们将提供更具体的基于任务的信息,用于处理特定类型的数据或数据源。

    注意

    在继续之前,让我们回顾一下在第一部分中介绍的基本概念和定义。Spark是一个分布式编程模型,用户可以在其中指定transformation。多个transformation构建一个有向无环图(DAG)。一个action开始执行DAG的过程,作为一个单一的job作业,将它分解成多个stages阶段和task任务,以便在整个集群中执行。我们使用transformation和actioin操作的逻辑结构是DataFrames和Dataset。要创建一个新的DataFrame或Dataset,您需要调用一个转换。要开始计算或转换为本地语言类型,您需要调用一个action操作。

    4.1.  DataFrame和Dataset

    第一部分讨论DataFrames。Spark有两个结构化集合的概念: DataFrames 和 Datasets。稍后我们将讨论(细微的)差别,但是让我们先定义它们都代表什么。

    DataFrames和Dataset是(分布式的)类似于表的集合,具有定义好的行和列。每个列必须具有与所有其他列相同的行数(尽管您可以使用null来指定值的缺失),并且每个列都有类型信息,这些信息必须与集合中的每一行一致。对Spark来说,DataFrame和Dataset代表着不可变的,延迟计算的计划,这些计划指定应用于驻留在某个位置的数据以生成一些输出的操作。当我们在DataFrame上执行action操作时,我们指示Spark执行实际的transformation操作,并返回结果。DataFrame和Dataset表示如何操作行和列来计算用户期望结果的计划。

    注意

    表和视图基本上与DataFrames相同。我们只是针对它们执行SQL,而不是DataFrame代码。我们在第10章中讨论了所有这些问题,重点是Spark SQL。

    为了给这些定义添加一些更具体的特性,我们需要讨论schema,定义分布式集合中存储的数据类型的方式。

    4.2.  Schemas

    schemas定义了DataFrame的列名和类型。您可以手动定义schemas模式或从数据源读取schemas模式(通常称为读模式)。Schemas包含列类型,用于声明什么位置存储了什么类型的数据。

    4.3.  Spark结构化API中的类型概述

    Spark实际上是它自己的编程语言。在内部,Spark使用一种名为Catalyst的引擎,在计划和处理计算任务的过程中,维护自己的类型信息。在这样做的过程中,这打开了各种各样的执行优化,从而产生了显著的差异。Spark类型直接映射到Spark维护的不同语言api,在Scala、Java、Python、SQL和R中都存在一个查找表。即使我们从Python或R中使用Spark的结构化api,我们的大多数操作都将严格地使用Spark类型,而不是Python类型。例如,以下代码不会在Scala或Python中执行加法;它实际上只是在Spark中执行加法:

    // in Scalaval df = spark.range(500).toDF("number")df.select(df.col("number") + 10)
    # in Pythondf = spark.range(500).toDF("number")df.select(df["number"] + 10)
    
    

    这个加法操作之所以发生,是因为Spark会将用输入语言编写的表达式转换为Spark的内部Catalyst表示相同类型的信息。然后它会对转换后的内部表示进行操作。

    我们马上就会讲到为什么会出现这种情况,但在之前,我们需要讨论下Datasets。

    4.3.1.   对比DataFrames与Datasets

    本质上,在结构化的api中,还有两个api,即“untyped”DataFrames和“typed”Datasets。说DataFrames是untyped的,这是不准确的;它们有类型,但是Spark完全维护它们,并且只检查这些类型是否与运行时模式中指定的类型一致。然而,另一方面,Datasets检查类型是否符合编译时的规范。Dataset只适用于Java虚拟机(JVM)的语言(Scala和Java),并且我们指定带有case类或Java bean的类型。

    在大多数情况下,您可能会使用DataFrames。对于Spark(在Scala中),DataFrames只是Row类型的Datasets。“Row”类型是Spark对其优化的数据格式的内部表示。这种格式可以进行高效的计算。因为不是使用JVM类型(这可能会有GC垃圾收集和对象实例化成本),Spark操作自己的内部数据格式操作,不会产生这些成本。对于Spark(在Python或R中),没有Dataset这样的东西: 所有的东西都是DataFrame,因此我们总是能够使用优化的数据格式。

     注意

    内部的catalyst格式在众多的Spark演示中都有相关的讲解。鉴于这本书是为更广大的读者准备的,我们将不去深究底层实现。如果你好奇的话,有一些很精彩的演讲可以参考,由Databricks公司员工Josh Rosen 和 Herman van Hovell提供,都是关于他们在Spark的catalyst引擎开发中的工作内容。(https://youtu.be/5ajs8EIPWGI)

    理解DataFrames、Spark类型和schema需要一些时间来消化。您需要知道的是,当您使用DataFrames时,您正在利用Spark优化的内部格式。这种格式对Spark的所有语言api都使用相同的效率增益。如果您需要严格的编译时检查,请阅读第11章以了解更多信息。

    让我们转到一些更友好和更容易理解的概念: columns 和 rows。

    4.3.2.   Columns

    columns表示一个简单的类型,如integer或string,复杂类型,如array或map,或null。Spark将为您跟踪所有这些类型的信息,并提供多种方式对columns进行转换。在第5章中广泛讨论了columns,但是在大多数情况下,您可以将Spark Column类型看作是表中的列。

    4.3.3.   Rows

    一行(Row)只是表示数据的一条记录。DataFrame中的每条数据记录必须是Row类型。我们可以从SQL、弹性分布式数据集(RDDs)、数据源或手动创建这些Rows。在这里,我们用一个数值范围来创建一个:

    // in Scalaspark.range(2).toDF().collect()
    # in Pythonspark.range(2).collect()

    这两个结果都是Row对象数组。

    4.3.4.   Spark Types

    我们之前提到过Spark有大量的内部类型表示。我们在接下来的几页中包含了一个方便的参考表,这样您就可以很容易地引用在您的特定语言中,与Spark类型相匹配的类型。

    在列出参考表之前,让我们讨论一下如何实例化或声明一个列是某种类型的。

    使用相应的Scala类型来声明,使用以下代码:

    import org.apache.spark.sql.types._val b = ByteType

    使用相应的Java类型,您应该使用以下包中的工厂方法:

    import org.apache.spark.sql.types.DataTypes;ByteType x = DataTypes.ByteType;

    Python类型有时有特定的需求,您可以看到表4-1中列出的。对应于Scala和Java,您可以在表4-2和表4-3中看到它们。要使用正确的Python类型,请使用以下内容:

    from pyspark.sql.types import *b = ByteType()

    下表为每一种Spark的语言绑定提供了详细的类型信息。

    图片

    图片

    图片

    图片

    图片

    值得注意的是,随着Spark SQL的持续增长,类型可能会随着时间的推移而改变,因此您可能想要参考Spark的文档以备将来更新。当然,所有这些类型都很好,但是您几乎从不使用纯静态的DataFrames。你会一直操作和改变DataFrame。因此,我们要向您介绍结构化api中的执行过程。

    4.4.  结构化API执行过程概述

    本节将演示代码是如何跨集群执行的。这将帮助您了解(和调试)编写代码和集群上执行代码的过程,因此,让我们执行一个结构化的API查询,了解从用户代码到执行代码的转换过程。以下是这些步骤的概述:

    1. 写DataFrame /Dataset/ SQL代码。

    1. 如果是有效的代码,即代码没有编译错误,Spark将其转换为一个逻辑计划。

    1. Spark将此逻辑计划转换为物理计划,同时进行代码优化。

    1. Spark然后在集群上执行这个物理计划(基于RDD操作)。

    要执行代码,我们必须编写代码。然后,将代码提交给Spark集群运行。然后,该代码通过Catalyst优化器,它决定如何执行代码,并给出一个计划,最后,代码运行,结果返回给用户。

    图片

    4.4.1.   逻辑计划(Logical Plan)

    执行的第一个阶段是将用户代码转换成一个逻辑计划。图4-2说明了这个过程。

    图片

    这个逻辑计划只代表一组抽象的转换,不涉及Executors执行器或Driver驱动程序,它纯粹是将用户的表达式转换成最优的版本。它通过将用户代码转换成unresolved logical plan.(未解决的逻辑计划)来实现这一点。这个计划是unresolved(没有解决),因为尽管您的代码可能是有效的,但是它引用的表或列可能存在,也可能不存在。Spark使用catalog (所有表和DataFrame信息的存储库)来resolve(解析)analyzer(分析器中的列和表。如果catalog中不存在所需的表或列名,analyzer(分析器可能会拒绝unresolved logical plan.(未解决的逻辑计划)。如果analyzer可以resolve这个unresolved logical plan.(未解决的逻辑计划),解析的结果会传给Catalyst 优化器,此优化器是一组规则的集合,用于优化逻辑计划,通过谓词下推、投影等方式进行优化。可以扩展Catalyst,使其包含特定于领域的优化的规则。

    4.4.2.   物理计划Physical Plan

    在成功地创建了一个优化的逻辑计划之后,Spark就开始了物理计划过程。通常称为Spark计划的物理计划指定了逻辑计划如何通过生成不同的物理执行策略,并通过成本模型来比较它们,从而选择一个最优的物理计划在集群上面执行的。如图4-3所示。成本比较的一个例子是:通过查看给定表的物理属性(表的大小或其分区有多大)来选择如何执行给定的连接。

    图片

    物理计划的结果是一系列的RDDs和转换。这个结果说明了为什么有时将Spark成为编译器——它接受DataFrames、dataset和SQL的查询,并将它们编译成RDD转换。

    4.4.3.   执行

    在选择一个物理计划时,Spark运行所有的RDDs代码,即Spark的底层编程接口(我们将在第三部分中介绍)。Spark在运行时执行进一步的优化,生成本地Java字节码,可以在执行过程中移除整个任务或阶段。最后将结果返回给用户。

    4.5.  结束语

    在本章中,我们讨论了Spark结构化api,以及Spark如何将代码转换为在集群上实际执行的内容。在接下来的章节中,我们讨论了核心概念以及如何使用结构化api的关键功能。

  • 相关阅读:
    NYOJ 10 skiing DFS+DP
    51nod 1270 数组的最大代价
    HDU 4635 Strongly connected
    HDU 4612 Warm up
    POJ 3177 Redundant Paths
    HDU 1629 迷宫城堡
    uva 796
    uva 315
    POJ 3180 The Cow Prom
    POJ 1236 Network of Schools
  • 原文地址:https://www.cnblogs.com/lanblogs/p/15162212.html
Copyright © 2011-2022 走看看