zoukankan      html  css  js  c++  java
  • RDD、DataFrame、Dataset

    RDD是Spark建立之初的核心API。RDD是不可变分布式弹性数据集,在Spark集群中可跨节点分区,并提供分布式low-level API来操作RDD,包括transformation和action。

    RDD(Resilient Distributed Dataset)叫做弹性分布式数据集是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。

    使用RDD的一般场景:

    • 你需要使用low-level的transformation和action来控制你的数据集;
    • 你的数据集非结构化,比如:流媒体或者文本流;
    • 你想使用函数式编程来操作你的数据,而不是用特定领域语言(DSL)表达;
    • 你不在乎schema,比如,当通过名字或者列处理(或访问)数据属性不在意列式存储格式;
    • 你放弃使用DataFrame和Dataset来优化结构化和半结构化数据集。
    DataFrame与RDD相同之处,都是不可变分布式弹性数据集。不同之处在于,DataFrame的数据集都是按指定列存储,即结构化数据。类似于传统数据库中的表。DataFrame的设计是为了让大数据处理起来更容易。DataFrame允许开发者把结构化数据集导入DataFrame,并做了higher-level的抽象;DataFrame提供特定领域的语言(DSL)API来操作你的数据集。
    在Spark2.0中,DataFrame API将会和Dataset  API合并,统一数据处理API。

    什么时候使用DataFrame或者Dataset?

    • 你想使用丰富的语义,high-level抽象,和特定领域语言API,那你可以使用DataFrame或者Dataset;
    • 你处理的半结构化数据集需要high-level表达,filter,map,aggregation,average,sum,SQL查询,列式访问和使用lambda函数,那你可以使用DataFrame或者Dataset;
    • 你想利用编译时高度的type-safety,Catalyst优化和Tungsten的code生成,那你可以使用DataFrame或者Dataset;
    • 你想统一和简化API使用跨Spark的Library,那你可以使用DataFrame或者Dataset;
    • 如果你是一个R使用者,那你可以使用DataFrame或者Dataset;
    • 如果你是一个Python使用者,那你可以使用DataFrame或者Dataset。

    DataFrame是一种分布式数据集合,每一条数据都由几个命名字段组成。概念上来说,她和关系型数据库的表 或者 R和Python中的data frame等价,只不过在底层,DataFrame采用了更多优化。DataFrame可以从很多数据源(sources)加载数据并构造得到,如:结构化数据文件,Hive中的表,外部数据库,或者已有的RDD。 

    Spark SQL 是spark中用于处理结构化数据的模块。Spark SQL相对于RDD的API来说,提供更多结构化数据信息和计算方法。Spark SQL 提供更多额外的信息进行优化。可以通过SQL或DataSet API方式同Spark SQL进行交互。

    DataSet是分布式的数据集合。它集中了RDD的优点(强类型 和可以用强大lambda函数)以及Spark SQL优化的执行引擎。DataSet可以通过JVM的对象进行构建,可以用函数式的转换(map/flatmap/filter)进行多种操作.

    RDD、DataFrame、Dataset三者有许多共性,有各自适用的场景常常需要在三者之间转换

    Datasets and DataFrames A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have the support for the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar.

    A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row]. While, in Java API, users need to use Dataset<Row> to represent a DataFrame.

    Throughout this document, we will often refer to Scala/Java Datasets of Rows as DataFrames.

    dataset和DataFrames
    dataset是数据的分布式集合。Dataset是在Spark 1.6中添加的一个新接口,它提供了RDDs(强大的类型、使用强大的lambda函数的能力)和Spark SQL优化的执行引擎的优点。可以从JVM对象构造数据集,然后使用函数转换(map、flatMap、filter等)进行操作。Dataset API可以在Scala和Java中使用。Python不支持Dataset API。但是由于Python的动态性,Dataset API的许多好处已经存在(例如,您可以按row. columnname自然地访问行的字段)。R的情况类似。
    DataFrame是组织成命名列的数据集。它在概念上等同于关系数据库中的表或R/Python中的数据框架,但在底层有更丰富的优化。dataframes可以从大量的数据源中构建,比如:结构化数据文件、Hive中的表、外部数据库或现有的RDDs。DataFrame API在Scala、Java、Python和r中都可以使用,在Scala和Java中,dataframe由行数据集表示。在Scala API中,DataFrame只是数据集[Row]的类型别名。而在Java API中,用户需要使用Dataset来表示一个DataFrame。
    在本文档中,我们经常将Scala/Java数据集作为dataframes引用。

    DataSet是在RDD基础上进行优化过的分布式数据集,里面的数据是强类型的,没一个字段都有一个类型和名字

    通过dataset可以创建dataframe,在spark 2.0,dataframe其实是dataset里面装的row即dataset[row]

     

    使用dataframe或者sql处理数据,现将非结构化数据转为为结构化数据,然后注册视图,执行sql(transformation),最后触发Action提交任务

    --------------------

    scala> val lines=spsession.read.textFile("/tmp/person.txt").toDF()
    lines: org.apache.spark.sql.DataFrame = [value: string]

    scala> val personDS=lines.map(x=>{val arr= x.getAs[String]("value").split(",");(arr(0),arr(1),arr(2),arr(3))})
    personDS: org.apache.spark.sql.Dataset[(String, String, String, String)] = [_1: string, _2: string ... 2 more fields]

    scala> personDS.show
    +---+--------+---+----+
    | _1| _2| _3| _4|
    +---+--------+---+----+
    | 2|zhangsan| 50| 866|
    | 4| laoliu|522| 30|
    | 5|zhangsan| 20| 565|
    | 6| limi|522| 65|
    | 1| xiliu| 50|6998|
    | 7| llihmj| 23| 565|
    +---+--------+---+----+

  • 相关阅读:
    mysql存储过程
    Mysql中的触发器
    快速开始、环境搭建、修改包名、新建模块、正式部署
    windows下redis下载安装
    Windows10下mysql 8.0.19 安装配置方法图文教程
    IDEA中安装SVN
    常见页面报错
    Python AttributeError: 'Module' object has no attribute 'STARTF_USESHOWINDOW'
    如何编写一篇高质量的技术博文?学习本文的排名靠前大法
    Linux use apktool problem包体变大GLIBC2.14等问题
  • 原文地址:https://www.cnblogs.com/playforever/p/9203583.html
Copyright © 2011-2022 走看看