zoukankan      html  css  js  c++  java
  • Mastering-Spark-SQL学习笔记02 SparkSession

    SparkSession是在使用类型化数据集(或基于非类型化Row-基于DataFrame)数据抽象开发Spark SQL应用程序时创建的首批对象之一。

    在Spark 2.0中,SparkSession将SQLContext和HiveContext合并到一个对象中。

    使用SparkSession.builder方法来创建一个SparkSession实例,使用stop方法停止SparkSession实例。

    import org.apache.spark.sql.SparkSession
    val spark: SparkSession = SparkSession.builder
      .appName("My Spark Application") // optional and will be auto generated if not specified
      .master("local[*]")   // only for demo and testing purposes, use spark-submit instead
      .enableHiveSupport()  // self-explanatory, isn't it?
      .config("spark.sql.warehouse.dir", "target/spark-warehouse")
      .getOrCreate
    ...
    ...
    spark.stop // 停止当前的SparkSession的方法

    一个Spark SQL应用中可以有多个SparkSession。通常的用例是在每个SparkSession的catalog中保持关系实体在逻辑上独立。

    可以通过支持外部的Hive metastore来启用Hive支持(这在使用Apache Impala等其他大数据项目的项目中尤其有用)。

    implicitis对象

    implicits对象是一个具有Scala隐式方法(又名转换)的助手类,用于将Scala对象转换为Datasets、DataFrame和Columns对象。它还为Scala的“基本”类型定义了编码器(Encoder),例如Int、Double、String , 以及它们的Product和集合。 

    val spark = SparkSession.builder.getOrCreate()
    import spark.implicits._

    implicits对象支持从任何类型(存在于Encoder范围内)的RDD创建数据集,或case类或元组,和Seq。implicits对象还提供从Scala Symbol或 $ 到 Column 的转换。

    它也提供从RDD或Product类型(如case类或元组)的Seq到DataFrame的转换。它能直接把Int、Long、String的RDD转化为带有单个列名为"_1"的列的DataFrame。只允许对Int、Long、String等原始类型的RDD调用toDF方法

    根据给定的Encoder创建空Dataset—— emptyDataset算子

    emptyDataset 创建一个空 Dataset,类型为T。emptyDataset 创建了一个 LocalRelation logical query plan 。

    emptyDataset[T: Encoder]: Dataset[T]
    
    scala> val strings = spark.emptyDataset[String]
    strings: org.apache.spark.sql.Dataset[String] = [value: string]
    scala> strings.printSchema
    root
    |-- value: string (nullable = true)

    从本地Collection或RDD创建Dataset —— createDataset方法

    createDataset是一个实验性的API,用于从本地Scala集合(即Seq[T]、Java List[T]或分布式RDD[T])创建Dataset。

    createDataset[T : Encoder](data: Seq[T]): Dataset[T]
    createDataset[T : Encoder](data: RDD[T]): Dataset[T]
    
    scala> val one = spark.createDataset(Seq(1))
    one: org.apache.spark.sql.Dataset[Int] = [value: int]
    scala> one.show
    +-----+
    |value|
    +-----+
    | 1|
    +-----+

    createDataset 创建一个 LocalRelation(为输入的集合数据)或 LogicalRDD(为输入的 RDD[T])。

    提示:最好使用 scala implicits 和 toDS方法,而不是creatDataset创建数据集,这样就会自动进行类型转换

    val spark: SparkSession = ...
    import spark.implicits._
    scala> val one = Seq(1).toDS
    one: org.apache.spark.sql.Dataset[Int] = [value: int]

    在内部,createDataset首先在范围内查找隐式表达式Encoder以访问schema的AttributeReference。目前只支持未解决的表达式Encoder。

    表达式编码器被用来将(输入的Seq[T]的)元素映射到 InternalRows 集合中。 有了引用和行,createDataset返回一个带有LocalRelation逻辑查询计划的Dataset。

    创建单长列的Dataset——range算子

    range(end: Long): Dataset[java.lang.Long]
    range(start: Long, end: Long): Dataset[java.lang.Long]
    range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
    range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]

    range方法族创建Long类型数值的Dataset

    scala> spark.range(start = 0, end = 4, step = 2, numPartitions = 5).show
    +---+
    | id|
    +---+
    | 0|
    | 2|
    +---+

    在内部,range 创建了一个新的带有logical plan和Encoder.LONG encoder 的 Dataset[Long]。

    创建空的DataFrame—— emptyDataFrame方法

    emptyDataFrame: DataFrame

    它调用了 createDataFrame方法,传入空的RDD[Row]和空schema的StructType(Nil)。它创建了一个没有行和列的空DataFrame

    通过局部Collections或RDD来创建DataFrame——createDataFrame方法

    createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame
    createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame
    createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
    // private[sql]
    createDataFrame(rowRDD: RDD[Row], schema: StructType, needsConversion: Boolean): DataFrame

    它使用RDD[Row]和输入的schema来创建DataFrame,假定rowRDD中的行都匹配这个schema。

    执行SQL查询(SQL模式)——sql方法

    sql(sqlText: String): DataFrame

    sql方法执行sqlText这个SQL语句块,并创建DataFrame。

    sql在spark-shell中已经被import了,可以直接使用。

    scala> sql("SHOW TABLES").show
    +---------+-----------+
    |tableName|isTemporary|
    +---------+-----------+
    | testdata| false|
    +---------+-----------+

    在内部,sql 请求当前的 ParserInterface去执行一个SQL查询,给出LogicalPlan。sql使用 SessionState访问当前的 ParserInterface。

    然后,sql使用当前的 SparkSession和这个LogicalPlan创建 DataFrame。

    访问UDF注册接口—— udf 属性

    udf: UDFRegistration

    udf 属性可以用来访问UDFRegistration,UDFRegistration允许为基于SQL的查询注册user-defined function(用户自定义函数)。

    val spark: SparkSession = ...
    spark.udf.register("myUpper", (s: String) => s.toUpperCase)
    val strs = ('a' to 'c').map(_.toString).toDS
    strs.registerTempTable("strs")
    scala> sql("SELECT *, myUpper(value) UPPER FROM strs").show
    +-----+-----+
    |value|UPPER|
    +-----+-----+
    | a| A|
    | b| B|
    | c| C|
    +-----+-----+

    在内部,udf 仅仅是SessionState.udfRegistration的别名。

    // In org.apache.spark.sql.SparkSession.scala
    def udf: UDFRegistration = sessionState.udfRegistration

    为表创建DataFrame(把表加载成DataFrame)—— table 方法

    table(tableName: String): DataFrame (1)
    table(tableIdent: TableIdentifier): DataFrame // private[sql]

    方法1 把 tableName 解析成为一个 TableIdentifier,然后调用另一个table 方法。

    table方法把输入的表tableName(仅在session catalog会话目录中可用)封装成一个DataFrame

    scala> spark.version
    res0: String = 2.3.0
    scala> spark.catalog.tableExists("t1")
    res1: Boolean = true
    // t1 exists in the catalog
    // let's load it
    val t1 = spark.table("t1")

    访问元数据存储—— catalog 属性

    catalog: Catalog

    catalog 属性是一个惰性接口,它可以访问当前元数据存储,例如(关系型实体如数据库、表、函数、表的列和视图)的 data catalog。

    通过catalog,用户可以create、drop、alter或query数据库、表、函数。

    scala> spark.catalog.listTables.show
    +------------------+--------+-----------+---------+-----------+
    | name|database|description|tableType|isTemporary|
    +------------------+--------+-----------+---------+-----------+
    |my_permanent_table| default| null| MANAGED| false|
    | strs| null| null|TEMPORARY| true|
    +------------------+--------+-----------+---------+-----------+

    在内部,catalog创建了一个使用当前SparkSession的CatalogImpl对象

    @transient lazy val catalog: Catalog = new CatalogImpl(self)

    访问 DataFrameReader —— read 方法

    read 方法返回一个 DataFrameReader,DataFrameReader用来从外部存储系统读取数据并把数据加载成一个 DataFrame

    read: DataFrameReader
    
    val spark: SparkSession = // create instance
    val dfReader: DataFrameReader = spark.read

    运行时配置 —— conf 属性

    @transient lazy val conf: RuntimeConfig = new RuntimeConfig(sessionState.conf)

    conf 返回当前运行时的配置(作为一个RuntimeConfig对象),这个配置封装了 SQLConf。

    readStream 方法返回一个 DataStreamReader

    streams 属性

    def stream: StreamingQueryManager = sessionState.streamingQueryManager

    streams 属性通过 SessionState访问StreamingQueryManager,返回的StreamingQueryManager可以激活所有的StreamingQuery

    val spark: SparkSession = ...
    spark.streams.active.foreach(println)

    experimentalMethods 属性

    def experimental: ExperimentalMethods = sessionState.experimentalMethods

    experimentalMethods是一个带有 ExperimentalMethods 的扩展点,它是每个会话中额外策略和Rule[LogicalPlan]的集合。

    experimental 在 SparkPlanner和 SparkOptimizer中被使用。Hive和Structured Streaming将其用于自己的额外策略和优化规则。

    newSession 方法

    // Start a new session with isolated SQL configurations, temporary tables, registered  functions are isolated, 
    // but sharing the underlying `SparkContext` and cached data.
    def newSession(): SparkSession = {
        new SparkSession(sparkContext, Some(sharedState), parentSessionState = None, extensions)
    }

    newSession 使用当前的SparkContext和SharedState 创建(启动)一个新的 SparkSession。

    scala> println(sc.version)
    2.3.0
    scala> val newSession = spark.newSession
    newSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@122f58a

    关停 SparkSession —— stop 方法

      /**
       * Stop the underlying `SparkContext`.
       *
       * @since 2.0.0
       */
      def stop(): Unit = {
        sparkContext.stop()
      }

    从 BaseRelation 创建DataFrame —— baseRelationToDataFrame 方法

    /**
       * Convert a `BaseRelation` created for external data sources into a `DataFrame`.
       *
       * @since 2.0.0
       */
      def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
        Dataset.ofRows(self, LogicalRelation(baseRelation))
      }

    此方法将输入的BaseRelation对象封装到LogicalRelation内部,并通过这个LogicalRelation对象创建 DataFrame。

    LogicalRelation是 BaseRelation 的一个逻辑计划适配器,所以 BaseRelation可以是一个逻辑计划的一部分。

    baseRelationToDataFrame被使用的场景:

    1. DataFrameReader:从一个支持多路径的数据源中加载数据

    2. DataFrameReader:使用jdbc从一个外部表中加载数据

    3. TextInputCSVDataSource:创建一个String的 Dataset

    4. TextInputJsonDataSource:创建一个String的 Dataset

    构造 SessionState ——  instantiateSessionState 内部方法(私有方法)

    /**
       * Helper method to create an instance of `SessionState` based on `className` from conf.
       * The result is either `SessionState` or a Hive based `SessionState`.
       */
      private def instantiateSessionState(
          className: String,
          sparkSession: SparkSession): SessionState = {...}

    instantiateSessionState 利用 className 创建和构造一个BaseSessionStateBuilder,初始化SessionState类时可能会抛出IllegalArgumentException。

    sessionStateClassName 内部方法

    private def sessionStateClassName(conf: SparkConf): String = {
        conf.get(CATALOG_IMPLEMENTATION) match {
          case "hive" => HIVE_SESSION_STATE_BUILDER_CLASS_NAME
          case "in-memory" => classOf[SessionStateBuilder].getCanonicalName
        }
      }

    对于每个spark.sql.catalogImplementation,sessionStateClassName都给出了SessionState的类名:

    hive -  org.apache.spark.sql.hive.HiveSessionStateBuilder

    in-memory - org.apache.spark.sql.internal.SessionStateBuilder

    从内部二进制行的RDD创建DataFrame —— internalCreateDataFrame 内部方法

      /**
       * Creates a `DataFrame` from an `RDD[InternalRow]`.
       */
      private[sql] def internalCreateDataFrame(
          catalystRows: RDD[InternalRow],
          schema: StructType,
          isStreaming: Boolean = false): DataFrame = {
        // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
        // schema differs from the existing schema on any field data type.
        val logicalPlan = LogicalRDD(
          schema.toAttributes,
          catalystRows,
          isStreaming = isStreaming)(self)
        Dataset.ofRows(self, logicalPlan)
      }

    此方法使用 LogicalRDD创建DataFrame。使用情景为:

    1. DataFrameReader 被请求以JSON或CSV的Dataset创建一个DataFrame

    2. SparkSession 被请求 以行的RDD创建一个DataFrame

    3. InsertIntoDataSourceCommand 被执行

  • 相关阅读:
    IBM Personal Communications 软件:精简绿色版TN3270终端模拟器:经测试可以在 (winxp、win2003、win764)上运行
    virtualbox谨记:续....
    Eclipse连接MySQL数据库
    shell几种字符串加密解密的方法
    表达式语言引擎:Apache Commons JEXL 2.1 发布
    一种表达式语言的解析引擎JEXL简单使用
    Java 实现String语句的执行(Jexl)
    JUnit4
    EL表达式
    Looping through the content of a file in Bash
  • 原文地址:https://www.cnblogs.com/sunspeedzy/p/9438377.html
Copyright © 2011-2022 走看看