zoukankan      html  css  js  c++  java
  • Spark学习进度8-SparkSQL

    SparkSQL简介

    SparkSQL 的出现契机

    SparkSQL
    解决的问题
    • Spark SQL 使用 Hive 解析 SQL 生成 AST 语法树, 将其后的逻辑计划生成, 优化, 物理计划都自己完成, 而不依赖 Hive

    • 执行计划和优化交给优化器 Catalyst

    • 内建了一套简单的 SQL 解析器, 可以不使用 HQL, 此外, 还引入和 DataFrame 这样的 DSL API, 完全可以不依赖任何 Hive 的组件

    • Shark 只能查询文件, Spark SQL 可以直接降查询作用于 RDD, 这一点是一个大进步

    新的问题

    对于初期版本的 SparkSQL, 依然有挺多问题, 例如只能支持 SQL 的使用, 不能很好的兼容命令式, 入口不够统一等

    Dataset

    SparkSQL 在 2.0 时代, 增加了一个新的 API, 叫做 DatasetDataset 统一和结合了 SQL 的访问和命令式 API 的使用, 这是一个划时代的进步

    在 Dataset 中可以轻易的做到使用 SQL 查询并且筛选数据, 然后使用命令式 API 进行探索式分析

     
    重要性
    9b1db9d54c796e0eb6769cafd2ef19ac

    SparkSQL 不只是一个 SQL 引擎, SparkSQL 也包含了一套对 结构化数据的命令式 API, 事实上, 所有 Spark 中常见的工具, 都是依赖和依照于 SparkSQL 的 API 设计的

    总结: SparkSQL 是什么

    SparkSQL 是一个为了支持 SQL 而设计的工具, 但同时也支持命令式的 API

    SparkSQL 的适用场景

    定义特点举例

    结构化数据

    有固定的 Schema

    有预定义的 Schema

    关系型数据库的表

    半结构化数据

    没有固定的 Schema, 但是有结构

    没有固定的 Schema, 有结构信息, 数据一般是自描述的

    指一些有结构的文件格式, 例如 JSON

    非结构化数据

    没有固定 Schema, 也没有结构

    没有固定 Schema, 也没有结构

    指文档图片之类的格式

    结构化数据

    一般指数据有固定的 Schema, 例如在用户表中, name 字段是 String 型, 那么每一条数据的 name 字段值都可以当作 String 来使用

    +----+--------------+---------------------------+-------+---------+
    | id | name         | url                       | alexa | country |
    +----+--------------+---------------------------+-------+---------+
    | 1  | Google       | https://www.google.cm/    | 1     | USA     |
    | 2  | 淘宝          | https://www.taobao.com/   | 13    | CN      |
    | 3  | 菜鸟教程      | http://www.runoob.com/    | 4689  | CN      |
    | 4  | 微博          | http://weibo.com/         | 20    | CN      |
    | 5  | Facebook     | https://www.facebook.com/ | 3     | USA     |
    +----+--------------+---------------------------+-------+---------+
    半结构化数据

    一般指的是数据没有固定的 Schema, 但是数据本身是有结构的

    {
         "firstName": "John",
         "lastName": "Smith",
         "age": 25,
         "phoneNumber":
         [
             {
               "type": "home",
               "number": "212 555-1234"
             },
             {
               "type": "fax",
               "number": "646 555-4567"
             }
         ]
     }
    没有固定 Schema

    指的是半结构化数据是没有固定的 Schema 的, 可以理解为没有显式指定 Schema
    比如说一个用户信息的 JSON 文件, 第一条数据的 phone_num 有可能是 String, 第二条数据虽说应该也是 String, 但是如果硬要指定为 BigInt, 也是有可能的
    因为没有指定 Schema, 没有显式的强制的约束

    SparkSQL 处理什么数据的问题?
    • Spark 的 RDD 主要用于处理 非结构化数据 和 半结构化数据

    • SparkSQL 主要用于处理 结构化数据

    SparkSQL 相较于 RDD 的优势在哪?
    • SparkSQL 提供了更好的外部数据源读写支持

      • 因为大部分外部数据源是有结构化的, 需要在 RDD 之外有一个新的解决方案, 来整合这些结构化数据源

    • SparkSQL 提供了直接访问列的能力

      • 因为 SparkSQL 主要用做于处理结构化数据, 所以其提供的 API 具有一些普通数据库的能力

    总结: SparkSQL 适用于什么场景?

    SparkSQL 适用于处理结构化数据的场景

    SparkSQL 初体验

    命令式 API 的入门案例

    @Test
      def dsIntro(): Unit ={
        val spark=new SparkSession.Builder()
          .appName("ds intro")
          .master("local[6]")
          .getOrCreate()
        import spark.implicits._
        val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15), Person("wangwu", 30)))
        val personDS = sourceRDD.toDS()
        val result=personDS.where('age>10)
          .where('age<20)
          .select('name)
          .as[String]
        result.show()
      }

    SparkSQL 中有一个新的入口点, 叫做 SparkSessionSparkSQL 中有一个新的类型叫做 DatasetSparkSQL 有能力直接通过字段名访问数据集, 说明 SparkSQL 的 API 中是携带 Schema 信息的

    DataFrame & Dataset

    eca0d2e1e2b5ce678161438d87707b61
    sparkSQL 对外提供的 API 有两类:
    一类是直接执行 SQL, 另外一类就是命令式. 
    SparkSQL 提供的命令式 API 就是 DataFrame 和 Dataset, 暂时也可以认为 DataFrame 就是 Dataset, 只是在不同的 API 中返回的是 Dataset 的不同表现形式
     
    例如:
    // RDD
    rdd.map { case Person(id, name, age) => (age, 1) }
      .reduceByKey {case ((age, count), (totalAge, totalCount)) => (age, count + totalCount)}
    
    // DataFrame
    df.groupBy("age").count("age")

    通过上面的代码, 可以清晰的看到, SparkSQL 的命令式操作相比于 RDD 来说, 可以直接通过 Schema 信息来访问其中某个字段, 非常的方便

    SQL版本方式

    @Test
      def dfIntro(): Unit ={
        val spark=new SparkSession.Builder()
          .appName("ds intro")
          .master("local[6]")
          .getOrCreate()
        import spark.implicits._
        val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15), Person("wangwu", 30)))
        val df = sourceRDD.toDF()
        df.createOrReplaceTempView("person")
        val result = spark.sql("select name from person where age>10 and age <20")
        result.show()
      }

    以往使用 SQL 肯定是要有一个表的, 在 Spark 中, 并不存在表的概念, 但是有一个近似的概念, 叫做 DataFrame, 所以一般情况下要先通过 DataFrame 或者 Dataset 注册一张临时表, 然后使用 SQL 操作这张临时表

    总结

    SparkSQL 提供了 SQL 和 命令式 API 两种不同的访问结构化数据的形式, 并且它们之间可以无缝的衔接

    命令式 API 由一个叫做 Dataset 的组件提供, 其还有一个变形, 叫做 DataFrame

    Dataset 的特点

    Dataset 是什么?

    @Test
      def dataset1(): Unit ={
        //1.创建SparkSession
        val spark=new SparkSession.Builder()
          .appName("dataset1")
          .master("local[6]")
          .getOrCreate()
        //2.导入隐式转换
        import spark.implicits._
        //3.演示
        val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15), Person("wangwu", 30)))
        val ds = sourceRDD.toDS()
        //DataSet  支持强类型的API
        ds.filter( item => item.age>10).show()
        //DataSet 支持弱类型的API
        ds.filter('age>10).show()
        ds.filter($"age">10).show()
        //DataSet 可以直接编写SQL 表达式
        ds.filter("age >10").show()
      }

    Dataset 是一个强类型, 并且类型安全的数据容器, 并且提供了结构化查询 API 和类似 RDD 一样的命令式 API

    可以获取 Dataset 对应的 RDD 表示:

    • 使用 Dataset.rdd 将 Dataset 转为 RDD 的形式
    • Dataset 的执行计划底层的 RDD

    DataFrame 的作用和常见操作

    DataFrame 是什么?

    DataFrame 中有 Schema 信息, 可以像操作表一样操作 DataFrame.

     DataFrame 由两部分构成, 一是 row 的集合, 每个 row 对象表示一个行, 二是描述 DataFrame 结构的 Schema.

     DataFrame 支持 SQL 中常见的操作, 例如: selectfilterjoingroupsortjoin 等

     @Test
      def dataframe1(): Unit ={
        //1.创建SparkSession
        val spark=SparkSession.builder()
          .appName("datafreame1")
          .master("local[6]")
          .getOrCreate()
        //2.创建DataFrame
        import spark.implicits._
        val dataFrame = Seq(Person("zhangsan", 15), Person("lisi", 20)).toDF()
        //3.DataFrame花样
        dataFrame.where('age>10)
          .select('name)
          .show()
      }

    通过集合创建 DataFrame 的时候, 集合中不仅可以包含样例类, 也可以只有普通数据类型, 后通过指定列名来创建

    //1.创建SparkSession
        val spark=SparkSession.builder()
          .appName("datafreame1")
          .master("local[6]")
          .getOrCreate()
        //2.创建DataFrame
        import spark.implicits._
        val personList = Seq(Person("zhangsan", 15), Person("lisi", 20))
    
        //1.toDF
        val df1=personList.toDF()
        val df2=spark.sparkContext.parallelize(personList).toDF()

    在 DataFrame 上可以使用的常规操作

    Step 1: 首先可以打印 DataFrame 的 Schema, 查看其中所包含的列, 以及列的类型
    val spark: SparkSession = new sql.SparkSession.Builder()
      .appName("hello")
      .master("local[6]")
      .getOrCreate()
    
    val df = spark.read
      .option("header", true)
      .csv("dataset/BeijingPM20100101_20151231.csv")
    
    df.printSchema()

    Step 2: 对于大部分计算来说, 可能不会使用所有的列, 所以可以选择其中某些重要的列

    df.select('year, 'month, 'PM_Dongsi)

    Step 3: 可以针对某些列进行分组, 后对每组数据通过函数做聚合

    df.select('year, 'month, 'PM_Dongsi)
      .where('PM_Dongsi =!= "Na")
      .groupBy('year, 'month)
      .count()
      .show()

    使用 SQL 操作 DataFrame

    @Test
      def dataframe3(): Unit ={
        //1.创建SparkSession
        val spark=SparkSession.builder()
          .appName("datafreame1")
          .master("local[6]")
          .getOrCreate()
        //2.读取数据集
        import spark.implicits._
        val dataFrame = spark.read
          .option("header",value = true)
          .csv("dataset/BeijingPM20100101_20151231.csv")
        //3.处理
        //处理一
    //    dataFrame.select('year,'month,'PM_Dongsi)
    //      .where('PM_Dongsi =!= "NA")
    //      .groupBy('year,'month)
    //      .count()
    //      .show()
        //处理二
        dataFrame.createOrReplaceTempView("pm")
    
        val result=spark.sql("select year,month,count(PM_Dongsi) from pm where PM_Dongsi != 'NA' group by year,month")
        result.show()
        spark.stop()
        //4.得出结论
      }
    总结
    1. DataFrame 是一个类似于关系型数据库表的函数式组件

    2. DataFrame 一般处理结构化数据和半结构化数据

    3. DataFrame 具有数据对象的 Schema 信息

    4. 可以使用命令式的 API 操作 DataFrame, 同时也可以使用 SQL 操作 DataFrame

    5. DataFrame 可以由一个已经存在的集合直接创建, 也可以读取外部的数据源来创建

    Dataset 和 DataFrame 的异同

    DataFrame 就是 Dataset

    第一点: DataFrame 表达的含义是一个支持函数式操作的 , 而 Dataset 表达是是一个类似 RDD 的东西, Dataset 可以处理任何对象

    第二点: DataFrame 中所存放的是 Row 对象, 而 Dataset 中可以存放任何类型的对象
    第三点: DataFrame 的操作方式和 Dataset 是一样的, 但是对于强类型操作而言, 它们处理的类型不同
    第四点: DataFrame 只能做到运行时类型检查, Dataset 能做到编译和运行时都有类型检查
    @Test
      def qubie(): Unit ={
        //1.创建SparkSession
        val spark=SparkSession.builder()
          .appName("datafreame1")
          .master("local[6]")
          .getOrCreate()
        //2.创建DataFrame
        import spark.implicits._
        val personList = Seq(Person("zhangsan", 15), Person("lisi", 20))
    
        //DataFrame 是弱类型
        val df=personList.toDF()
        df.map( (row:Row) =>Row(row.get(0),row.getAs[Int](1)*2) )(RowEncoder.apply(df.schema))
          .show()
    
        //DataFrame 所代表的弱类型操作编译不安全
        df.groupBy("name,age")
    
        //Dataset 是强类型
        val ds=personList.toDS()
        ds.map( (person:Person) => Person(person.name,person.age*2) )
          .show()
        //Dataset 所代表的强类型操作编译运行都安全
    //    ds.filter(person =>person.age)
      }

    Row 是什么?

    Row 对象表示的是一个 

    Row 的操作类似于 Scala 中的 Map 数据类型

    @Test
      def row(): Unit ={
        //1.row如何创建,他是什么
        val p=Person("zhangsan",14)
        val row=Row("zhangsan",14)
        //2.如何从row中获取数据
        println(row.getString(0))
        println(row.getInt(1))
        //3.Row是样例类
        row match {
          case Row(name,age) => println(name,age)
        }

    DataFrame 和 Dataset 之间可以非常简单的相互转换

    val spark: SparkSession = new sql.SparkSession.Builder()
      .appName("hello")
      .master("local[6]")
      .getOrCreate()
    
    import spark.implicits._
    
    val df: DataFrame = Seq(People("zhangsan", 15), People("lisi", 15)).toDF()
    val ds_fdf: Dataset[People] = df.as[People]
    
    val ds: Dataset[People] = Seq(People("zhangsan", 15), People("lisi", 15)).toDS()
    val df_fds: DataFrame = ds.toDF()

    总结

    1. DataFrame 就是 Dataset, 他们的方式是一样的, 也都支持 API 和 SQL 两种操作方式

    2. DataFrame 只能通过表达式的形式, 或者列的形式来访问数据, 只有 Dataset 支持针对于整个对象的操作

    3. DataFrame 中的数据表示为 Row, 是一个行的概念

    数据读写

    DataFrameReader

    DataFrameReader 由如下几个组件组成

    组件解释

    schema

    结构信息, 因为 Dataset 是有结构的, 所以在读取数据的时候, 就需要有 Schema 信息, 有可能是从外部数据源获取的, 也有可能是指定的

    option

    连接外部数据源的参数, 例如 JDBC 的 URL, 或者读取 CSV 文件是否引入 Header 等

    format

    外部数据源的格式, 例如 csvjdbcjson 等

     @Test
      def reader1(): Unit ={
    
        //1.创建SparkSession
        val spark=SparkSession.builder()
          .appName("reader1")
          .master("local[6]")
          .getOrCreate()
    
        //2.框架
        val read: DataFrameReader = spark.read
      }

    DataFrameReader 有两种访问方式, 一种是使用 load 方法加载, 使用 format 指定加载格式, 还有一种是使用封装方法, 类似 csvjsonjdbc 等

    @Test
      def reader2(): Unit ={
    
        //1.创建SparkSession
        val spark=SparkSession.builder()
          .appName("reader1")
          .master("local[6]")
          .getOrCreate()
    
        //2.第一种形式
          spark.read
          .format("csv")
          .option("header",value = true)
          .option("inferSchema",value = true)
          .load("dataset/BeijingPM20100101_20151231.csv")
          .show()
    
        //3.第二种形式
        spark.read
          .option("header",value = true)
          .option("inferSchema",value = true)
          .csv("dataset/BeijingPM20100101_20151231.csv")
          .show(10)
      }

    DataFrameWriter

    DataFrameWriter 中由如下几个部分组成

    组件解释

    source

    写入目标, 文件格式等, 通过 format 方法设定

    mode

    写入模式, 例如一张表已经存在, 如果通过 DataFrameWriter 向这张表中写入数据, 是覆盖表呢, 还是向表中追加呢? 通过 mode 方法设定

    extraOptions

    外部参数, 例如 JDBC 的 URL, 通过 optionsoption 设定

    partitioningColumns

    类似 Hive 的分区, 保存表的时候使用, 这个地方的分区不是 RDD 的分区, 而是文件的分区, 或者表的分区, 通过 partitionBy 设定

    bucketColumnNames

    类似 Hive 的分桶, 保存表的时候使用, 通过 bucketBy 设定

    sortColumnNames

    用于排序的列, 通过 sortBy 设定

    mode 指定了写入模式, 例如覆盖原数据集, 或者向原数据集合中尾部添加等

    Scala 对象表示字符串表示解释

    SaveMode.ErrorIfExists

    "error"

    将 DataFrame 保存到 source 时, 如果目标已经存在, 则报错

    SaveMode.Append

    "append"

    将 DataFrame 保存到 source 时, 如果目标已经存在, 则添加到文件或者 Table 中

    SaveMode.Overwrite

    "overwrite"

    将 DataFrame 保存到 source 时, 如果目标已经存在, 则使用 DataFrame 中的数据完全覆盖目标

    SaveMode.Ignore

    "ignore"

    将 DataFrame 保存到 source 时, 如果目标已经存在, 则不会保存 DataFrame 数据, 并且也不修改目标数据集, 类似于 CREATE TABLE IF NOT EXISTS

     

    DataFrameWriter 也有两种使用方式, 一种是使用 format 配合 save, 还有一种是使用封装方法, 例如 csvjsonsaveAsTable 等
    @Test
      def writer1(): Unit = {
        //2.读取数据集
        val df=spark.read.option("header",true).csv("dataset/BeijingPM20100101_20151231.csv")
    
        //3.写入数据集
        df.write.json("dataset/bejing_pm.json")
        df.write.format("json").save("dataset/beijing_pm2.json")
      }

    读写 Parquet 格式文件

    读写 Parquet 文件:

    @Test
      def parquet(): Unit = {
        //2.读取数据集
        val df = spark.read.option("header", true).csv("dataset/BeijingPM20100101_20151231.csv")
        df.write
          .mode(SaveMode.Overwrite)
          .format("parquet")
          .save("dataset/beijing_pm3")
    
        spark.read
          .load("dataset/beijing_pm3")
          .show()
      }

    写入 Parquet 的时候可以指定分区

    Spark 在写入文件的时候是支持分区的, 可以像 Hive 一样设置某个列为分区列

    @Test
      def parquetPartitions(): Unit ={
        val df = spark.read.option("header", true).csv("dataset/BeijingPM20100101_20151231.csv")
    
        //2.写文件  表分区
        //写分区的时候,分区列不会包含在生成的文件中
        //直接通过文件来读取,分区信息会丢失
        //spark sql 会进行自动分区发现
        df.write
          .partitionBy("year","month")
          .save("dataset/beijing_pm4")
    
        //3.读文件,自动发现分区
        spark.read
          .parquet("dataset/beijing_pm4")
          .printSchema()
    
      }

    读写 JSON 格式文件

    @Test
      def json(){
        val df = spark.read.option("header", true).csv("dataset/BeijingPM20100101_20151231.csv")
    
        df.write
          .json("dataset/beijing_pm5.json")
    
        spark.read
          .json("dataset/beijing_pm5.json")
          .show()
      }

    Spark 可以从一个保存了 JSON 格式字符串的 Dataset[String] 中读取 JSON 信息, 转为 DataFrame

    @Test
      def json2(): Unit ={
        val df = spark.read.option("header", true).csv("dataset/BeijingPM20100101_20151231.csv")
        //直接从rdd读取json的DataFrame
        val jsonrdd = df.toJSON.rdd
        spark.read.json(jsonrdd).show()
      }

    访问 Hive

    启动 Hive MetaStore

    nohup /export/servers/hive/bin/hive --service metastore 2>&1 >> /var/log.log &

    访问 Hive 表

    在 Hive 中创建表

    第一步, 需要先将文件上传到集群中, 使用如下命令上传到 HDFS 中

    hdfs dfs -mkdir -p /dataset
    hdfs dfs -put studenttabl10k /dataset/

    第二步, 使用 Hive 或者 Beeline 执行如下 SQL

    CREATE DATABASE IF NOT EXISTS spark_integrition;
    
    USE spark_integrition;
    
    CREATE EXTERNAL TABLE student
    (
      name  STRING,
      age   INT,
      gpa   string
    )
    ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '	'
      LINES TERMINATED BY '
    '
    STORED AS TEXTFILE
    LOCATION '/dataset/hive';

    第三步:将hdfs上的文件,加载到该表

    LOAD DATA INPATH '/dataset/studenttab10k' OVERWRITE INTO TABLE student;

    通过 SparkSQL 查询 Hive 的表

    scala> spark.sql("use spark_integrition")
    scala> val resultDF = spark.sql("select * from student limit 10")
    scala> resultDF.show()

    通过 SparkSQL 创建 Hive 表

    val createTableStr =
      """
        |CREATE EXTERNAL TABLE student
        |(
        |  name  STRING,
        |  age   INT,
        |  gpa   string
        |)
        |ROW FORMAT DELIMITED
        |  FIELDS TERMINATED BY '	'
        |  LINES TERMINATED BY '
    '
        |STORED AS TEXTFILE
        |LOCATION '/dataset/hive'
      """.stripMargin
    
    spark.sql("CREATE DATABASE IF NOT EXISTS spark03")
    spark.sql("USE spark03")
    spark.sql(createTableStr)
    spark.sql("LOAD DATA INPATH '/dataset/studenttab10k' OVERWRITE INTO TABLE student")
    spark.sql("select * from student limit 100").show()

    使用 SparkSQL 处理数据并保存进 Hive 表

    package cn.itcast.spark.sql
    
    import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.sql.types.{FloatType, IntegerType, StringType, StructField, StructType}
    
    object HiveAccess {
    
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("hive example")
          .config("spark.sql.warehouse.dir", "hdfs://hadoop101:8020/dataset/hive")
          .config("hive.metastore.uris", "thrift://hadoop101:9083")
          .enableHiveSupport()
          .getOrCreate()
    
        import spark.implicits._
    
        //读取数据
        val schema = StructType(
          List(
            StructField("name", StringType),
            StructField("age", IntegerType),
            StructField("gpa", FloatType)
          )
        )
    
        val studentDF = spark.read
          .option("delimiter", "	")
          .schema(schema)
          .csv("hdfs://hadoop101:8020/dataset/studenttab10k")
    
        val resultDF = studentDF.where('age < 50)
    
        //写入
        resultDF.write.mode(SaveMode.Overwrite).saveAsTable("spark03.student")
    
    
    
    
      }
    
    }

    打包后:通过spark-submit来运行

    bin/spark-submit --class cn.itcast.spark.rdd.WordCount --master spark://hadoop101:7077 ~/original-spark-1.0-SNAPSHOT.jar

    然后去hive中查看:

    JDBC

    package cn.itcast.spark.sql
    
    import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.sql.types.{FloatType, IntegerType, StringType, StructField, StructType}
    
    object Mysql {
    
      def main(args: Array[String]): Unit = {
    
        val spark = SparkSession
          .builder()
          .appName("mysql example")
          .master("local[6]")
          .getOrCreate()
    
        val schema = StructType(
          List(
            StructField("name", StringType),
            StructField("age", IntegerType),
            StructField("gpa", FloatType)
          )
        )
    
        val studentDF = spark.read
          //分隔符:制表符
          .option("delimiter", "	")
          .schema(schema)
          .csv("dataset/studenttab10k")
    
        studentDF.write
          .format("jdbc")
          .mode(SaveMode.Overwrite)
          .option("url", "jdbc:mysql://hadoop101:3306/spark02")
          .option("dbtable", "student")
          .option("user", "spark")
          .option("password", "fengge666")
          .save()
    
      }
    
    
    }

    SparkSQL 中并没有直接提供按照 SQL 进行筛选读取数据的 API 和参数, 但是可以通过 dbtable 来曲线救国, dbtable 指定目标表的名称, 但是因为 dbtable 中可以编写 SQL, 所以使用子查询即可做到

    属性含义

    partitionColumn

    指定按照哪一列进行分区, 只能设置类型为数字的列, 一般指定为 ID

    lowerBoundupperBound

    确定步长的参数, lowerBound - upperBound 之间的数据均分给每一个分区, 小于 lowerBound 的数据分给第一个分区, 大于 upperBound 的数据分给最后一个分区

    numPartitions

    分区数量

    spark.read.format("jdbc")
      .option("url", "jdbc:mysql://node01:3306/spark_test")
      .option("dbtable", "(select name, age from student where age > 10 and age < 20) as stu")
      .option("user", "spark")
      .option("password", "Spark123!")
      .option("partitionColumn", "age")
      .option("lowerBound", 1)
      .option("upperBound", 60)
      .option("numPartitions", 10)
      .load()
      .show()

  • 相关阅读:
    面试总结
    java高级开发
    idea的快捷键
    微服务面试题目
    windows下jenkins的安装与配置
    微服务面试集合
    springlcoud中使用consul作为注册中心
    乐观锁和悲观锁
    volatile与synchronized的区别
    Java CAS 和ABA问题
  • 原文地址:https://www.cnblogs.com/xiaofengzai/p/14269334.html
Copyright © 2011-2022 走看看