Spark The Definitive Guide(Spark权威指南) 中文版。本书详细介绍了Spark2.x版本的各个模块,目前市面上最好的Spark2.x学习书籍!!!
扫码关注公众号:登峰大数据,阅读中文Spark权威指南(完整版),系统学习Spark大数据框架!
本书的这一部分将深入探讨Spark的结构化api。结构化api是一种处理各种数据的工具,从非结构化的日志文件到半结构化的CSV文件和高度结构化的Parquet文件。这些api引用了分布式收集api的三种核心类型:
-
Datasets
-
DataFrames
-
SQL tables 和 views
虽然它们是本书的独立部分,但大多数结构化api都适用于批处理和流计算。这意味着当您使用结构化api时,从批处理迁移到流(反之亦然)的过程应该很简单,几乎不需要付出任何努力。我们将在第五部分中详细介绍流计算。
结构化api是您用来编写大多数数据流的基本抽象。到目前为止在这本书中,我们采取了概述的方法,说明了Spark框架提供了哪些功能。这部分提供了更深入的探索。在本章中,我们将介绍你应该理解的基本概念:
-
类型化和非类型化的api(及其差异);
-
核心术语是什么;
-
Spark如何实际使用结构化API数据流并在集群上执行它。
然后,我们将提供更具体的基于任务的信息,用于处理特定类型的数据或数据源。
注意
在继续之前,让我们回顾一下在第一部分中介绍的基本概念和定义。Spark是一个分布式编程模型,用户可以在其中指定transformation。多个transformation构建一个有向无环图(DAG)。一个action开始执行DAG的过程,作为一个单一的job作业,将它分解成多个stages阶段和task任务,以便在整个集群中执行。我们使用transformation和actioin操作的逻辑结构是DataFrames和Dataset。要创建一个新的DataFrame或Dataset,您需要调用一个转换。要开始计算或转换为本地语言类型,您需要调用一个action操作。
4.1. DataFrame和Dataset
第一部分讨论DataFrames。Spark有两个结构化集合的概念: DataFrames 和 Datasets。稍后我们将讨论(细微的)差别,但是让我们先定义它们都代表什么。
DataFrames和Dataset是(分布式的)类似于表的集合,具有定义好的行和列。每个列必须具有与所有其他列相同的行数(尽管您可以使用null来指定值的缺失),并且每个列都有类型信息,这些信息必须与集合中的每一行一致。对Spark来说,DataFrame和Dataset代表着不可变的,延迟计算的计划,这些计划指定应用于驻留在某个位置的数据以生成一些输出的操作。当我们在DataFrame上执行action操作时,我们指示Spark执行实际的transformation操作,并返回结果。DataFrame和Dataset表示如何操作行和列来计算用户期望结果的计划。
注意
表和视图基本上与DataFrames相同。我们只是针对它们执行SQL,而不是DataFrame代码。我们在第10章中讨论了所有这些问题,重点是Spark SQL。
为了给这些定义添加一些更具体的特性,我们需要讨论schema,定义分布式集合中存储的数据类型的方式。
4.2. Schemas
schemas定义了DataFrame的列名和类型。您可以手动定义schemas模式或从数据源读取schemas模式(通常称为读模式)。Schemas包含列类型,用于声明什么位置存储了什么类型的数据。
4.3. Spark结构化API中的类型概述
Spark实际上是它自己的编程语言。在内部,Spark使用一种名为Catalyst的引擎,在计划和处理计算任务的过程中,维护自己的类型信息。在这样做的过程中,这打开了各种各样的执行优化,从而产生了显著的差异。Spark类型直接映射到Spark维护的不同语言api,在Scala、Java、Python、SQL和R中都存在一个查找表。即使我们从Python或R中使用Spark的结构化api,我们的大多数操作都将严格地使用Spark类型,而不是Python类型。例如,以下代码不会在Scala或Python中执行加法;它实际上只是在Spark中执行加法:
// in Scala
val df = spark.range(500).toDF("number")
df.select(df.col("number") + 10)
# in Python
df = spark.range(500).toDF("number")
df.select(df["number"] + 10)
这个加法操作之所以发生,是因为Spark会将用输入语言编写的表达式转换为Spark的内部Catalyst表示相同类型的信息。然后它会对转换后的内部表示进行操作。
我们马上就会讲到为什么会出现这种情况,但在之前,我们需要讨论下Datasets。
4.3.1. 对比DataFrames与Datasets
本质上,在结构化的api中,还有两个api,即“untyped”DataFrames和“typed”Datasets。说DataFrames是untyped的,这是不准确的;它们有类型,但是Spark完全维护它们,并且只检查这些类型是否与运行时模式中指定的类型一致。然而,另一方面,Datasets检查类型是否符合编译时的规范。Dataset只适用于Java虚拟机(JVM)的语言(Scala和Java),并且我们指定带有case类或Java bean的类型。
在大多数情况下,您可能会使用DataFrames。对于Spark(在Scala中),DataFrames只是Row类型的Datasets。“Row”类型是Spark对其优化的数据格式的内部表示。这种格式可以进行高效的计算。因为不是使用JVM类型(这可能会有GC垃圾收集和对象实例化成本),Spark操作自己的内部数据格式操作,不会产生这些成本。对于Spark(在Python或R中),没有Dataset这样的东西: 所有的东西都是DataFrame,因此我们总是能够使用优化的数据格式。
注意
内部的catalyst格式在众多的Spark演示中都有相关的讲解。鉴于这本书是为更广大的读者准备的,我们将不去深究底层实现。如果你好奇的话,有一些很精彩的演讲可以参考,由Databricks公司员工Josh Rosen 和 Herman van Hovell提供,都是关于他们在Spark的catalyst引擎开发中的工作内容。(https://youtu.be/5ajs8EIPWGI)
理解DataFrames、Spark类型和schema需要一些时间来消化。您需要知道的是,当您使用DataFrames时,您正在利用Spark优化的内部格式。这种格式对Spark的所有语言api都使用相同的效率增益。如果您需要严格的编译时检查,请阅读第11章以了解更多信息。
让我们转到一些更友好和更容易理解的概念: columns 和 rows。
4.3.2. Columns
columns表示一个简单的类型,如integer或string,复杂类型,如array或map,或null。Spark将为您跟踪所有这些类型的信息,并提供多种方式对columns进行转换。在第5章中广泛讨论了columns,但是在大多数情况下,您可以将Spark Column类型看作是表中的列。
4.3.3. Rows
一行(Row)只是表示数据的一条记录。DataFrame中的每条数据记录必须是Row类型。我们可以从SQL、弹性分布式数据集(RDDs)、数据源或手动创建这些Rows。在这里,我们用一个数值范围来创建一个:
// in Scala
spark.range(2).toDF().collect()
# in Python
spark.range(2).collect()
这两个结果都是Row对象数组。
4.3.4. Spark Types
我们之前提到过Spark有大量的内部类型表示。我们在接下来的几页中包含了一个方便的参考表,这样您就可以很容易地引用在您的特定语言中,与Spark类型相匹配的类型。
在列出参考表之前,让我们讨论一下如何实例化或声明一个列是某种类型的。
使用相应的Scala类型来声明,使用以下代码:
import org.apache.spark.sql.types._
val b = ByteType
使用相应的Java类型,您应该使用以下包中的工厂方法:
import org.apache.spark.sql.types.DataTypes;
ByteType x = DataTypes.ByteType;
Python类型有时有特定的需求,您可以看到表4-1中列出的。对应于Scala和Java,您可以在表4-2和表4-3中看到它们。要使用正确的Python类型,请使用以下内容:
from pyspark.sql.types import *
b = ByteType()
下表为每一种Spark的语言绑定提供了详细的类型信息。
值得注意的是,随着Spark SQL的持续增长,类型可能会随着时间的推移而改变,因此您可能想要参考Spark的文档以备将来更新。当然,所有这些类型都很好,但是您几乎从不使用纯静态的DataFrames。你会一直操作和改变DataFrame。因此,我们要向您介绍结构化api中的执行过程。
4.4. 结构化API执行过程概述
本节将演示代码是如何跨集群执行的。这将帮助您了解(和调试)编写代码和集群上执行代码的过程,因此,让我们执行一个结构化的API查询,了解从用户代码到执行代码的转换过程。以下是这些步骤的概述:
-
写DataFrame /Dataset/ SQL代码。
-
如果是有效的代码,即代码没有编译错误,Spark将其转换为一个逻辑计划。
-
Spark将此逻辑计划转换为物理计划,同时进行代码优化。
-
Spark然后在集群上执行这个物理计划(基于RDD操作)。
要执行代码,我们必须编写代码。然后,将代码提交给Spark集群运行。然后,该代码通过Catalyst优化器,它决定如何执行代码,并给出一个计划,最后,代码运行,结果返回给用户。
4.4.1. 逻辑计划(Logical Plan)
执行的第一个阶段是将用户代码转换成一个逻辑计划。图4-2说明了这个过程。
这个逻辑计划只代表一组抽象的转换,不涉及Executors执行器或Driver驱动程序,它纯粹是将用户的表达式转换成最优的版本。它通过将用户代码转换成unresolved logical plan.(未解决的逻辑计划)来实现这一点。这个计划是unresolved(没有解决),因为尽管您的代码可能是有效的,但是它引用的表或列可能存在,也可能不存在。Spark使用catalog (所有表和DataFrame信息的存储库)来resolve(解析)analyzer(分析器)中的列和表。如果catalog中不存在所需的表或列名,analyzer(分析器)可能会拒绝unresolved logical plan.(未解决的逻辑计划)。如果analyzer可以resolve这个unresolved logical plan.(未解决的逻辑计划),解析的结果会传给Catalyst 优化器,此优化器是一组规则的集合,用于优化逻辑计划,通过谓词下推、投影等方式进行优化。可以扩展Catalyst,使其包含特定于领域的优化的规则。
4.4.2. 物理计划Physical Plan
在成功地创建了一个优化的逻辑计划之后,Spark就开始了物理计划过程。通常称为Spark计划的物理计划指定了逻辑计划如何通过生成不同的物理执行策略,并通过成本模型来比较它们,从而选择一个最优的物理计划在集群上面执行的。如图4-3所示。成本比较的一个例子是:通过查看给定表的物理属性(表的大小或其分区有多大)来选择如何执行给定的连接。
物理计划的结果是一系列的RDDs和转换。这个结果说明了为什么有时将Spark成为编译器——它接受DataFrames、dataset和SQL的查询,并将它们编译成RDD转换。
4.4.3. 执行
在选择一个物理计划时,Spark运行所有的RDDs代码,即Spark的底层编程接口(我们将在第三部分中介绍)。Spark在运行时执行进一步的优化,生成本地Java字节码,可以在执行过程中移除整个任务或阶段。最后将结果返回给用户。
4.5. 结束语
在本章中,我们讨论了Spark结构化api,以及Spark如何将代码转换为在集群上实际执行的内容。在接下来的章节中,我们讨论了核心概念以及如何使用结构化api的关键功能。