zoukankan      html  css  js  c++  java
  • Spark SQL 数据源(三)

    Spark SQL 可以从多种数据源读取数据,也可以将数据写入多种数据源,如:json、txt、hdfs、parquet、jdbc、hive

    1. 通用读取与保存

    读取

    // 方法一,若没有指定文件格式,则默认为 parquet,也可以通过修改 spark.sql.sources.default 来修改默认文件格式
    // 文件格式:json, parquet, jdbc, orc, libsvm, csv, text
    spark.read.format(文件格式).load(路径)
    
    // 读取 json 文件
    spark.read.format("json").load(路径)
    
    // 方法二:读取 json 文件
    spark.read.json(路径)
    

    保存

    // 方法一:保存为 json 文件
    df.write.format("json").save(路径)
    
    // 方法二:指定保存模式
    df.write.mode(SaveMode.Overwrite).csv("/tmp/spark_output/zipcodes")
    

    saveMode 模式

    Scala/Java Any Language Meaning
    SaveMode.ErrorIfExists(default) "error"(default) 如果文件已经存在则抛出异常
    SaveMode.Append "append" 如果文件已经存在则追加
    SaveMode.Overwrite "overwrite" 如果文件已经存在则覆盖
    SaveMode.Ignore "ignore" 如果文件已经存在则忽略

    也可以直接在文件上直接进行 select 查询:

    // json表示文件的格式. 后面的文件具体路径需要用反引号括起来
    spark.sql("select * from json.`examples/src/main/resources/people.json`")
    

    参考文章:Spark Read CSV file into DataFrame

    2. jdbc 读取保存

    Spark SQL 也支持使用 JDBC 从其他的关系型数据库中读取数据,得到直接就是一个 df,也支持将 df 回写数据库,依赖:

    <!--jdbc 驱动-->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.27</version>
    </dependency>
    

    注意:在 spark-shell 操作 jdbc,需要把相关的 jdbc 驱动 copyjars 目录下

    2.1 读取

    import org.apache.spark.sql.SparkSession
    import java.util.Properties
    
    object ReadJdbc {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder.appName("read jdbc").master("local[2]").getOrCreate()
        val sc = spark.sparkContext
    
    
        val url = "jdbc:mysql://hadoop201:3306/databaseName"
        val user = "root"
        val pwd = "pwd"
        val tableName = "user"
    
        // 方法一
        val df = spark.read.format("jdbc")
          .option("url", url)
          .option("user", user)
          .option("password", pwd)
          .option("dbtable", tableName)
          .load()
    
        // 方法二
        val props: Properties = new Properties()
        props.setProperty("user", user)
        props.setProperty("password", pwd)
        val df2 = spark.read.jdbc(url, tableName, props)
    
        df.show()
        sc.stop()
        spark.stop()
      }
    }
    

    2.2 保存

    import org.apache.spark.sql.{SaveMode, SparkSession}
    import java.util.Properties
    
    object WriteJdbc {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder.appName("create_rdd").master("local[2]").getOrCreate()
        val sc = spark.sparkContext
    
        val df = spark.read.json("users.json")
        val url = "jdbc:mysql://hadoop201:3306/databaseName"
        val user = "root"
        val pwd = "pwd"
        val tableName = "user"
    
        // 方法一
        df.write
          .format("jdbc")
          .option("url", url)
          .option("user", user)
          .option("password", pwd)
          .option("dbtable", tableName)
          .mode("append")
          .mode(SaveMode.Overwrite)
          .save()
    
        // 方法二
        val props = new Properties()
        props.put("user", user)
        props.put("password", pwd)
        df.write.jdbc(url, tableName, props)
    
        sc.stop()
        spark.stop()
      }
    }
    

    3. hive 存取

    HiveHadoop 上的 SQL 引擎,Spark SQL编译时可以包含 Hive 支持,也可以不包含

    包含 Hive 支持的 Spark SQL:支持 Hive 表访问、UDF、Hive 查询语言 HiveQL、HQL 等。若在 Spark SQL 中包含Hive 的库,并不需要事先安装 Hive。一般来说,最好还是在编译Spark SQL时引入Hive支持,这样就可以使用这些特性了。如果你下载的是二进制版本的 Spark,它应该已经在编译时添加了 Hive 支持

    • Spark SQL 可以连接到一个已经部署好的 Hive 上,即外接 Hive,推荐此种做法, 但是需要将 hive-sie.xml 配置文件复制到 $SPARK_HOME/conf
    • 若没有连接外接 Hive,也可以使用 Hive,会在当前 Spark SQL 工作目录创建 Hive 的元数据仓库 metastone_db,即内嵌 hive,不推荐;创建的表会被存储在默认的文件中,如:/user/hive/warehouse(若 classpath 有配置 hdfs-site.xml 则默认文件系统为 hdfs,否则为本地)

    内嵌 hive 一般不推荐使用,因此这里讨论的是外置 hive

    3.1 准备工作

    • Spark 要接管 Hive 需要把 hive-site.xml copyspark/conf 目录下
    • MySQL 驱动 copyspark/jars 目录下
    • 若无法访问 hdfs,还需 copy core-site.xml、hdfs-site.xmlhive/conf 目录下
    
    [hadoop@hadoop1 apps]$ cp hive/conf/hive-site.xml spark-2.2.0/conf/
    [hadoop@hadoop1 apps]$ ls spark-2.2.0/conf/
    docker.properties.template  hive-site.xml              metrics.properties.template  spark-defaults.conf           spark-env.sh
    fairscheduler.xml.template  log4j.properties.template  slaves                       spark-defaults.conf.template
    [hadoop@hadoop1 apps]$ cp hive/lib/mysql-connector-java-5.1.27-bin.jar spark-2.2.0/jars/
    [hadoop@hadoop1 apps]$ spark-shell
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    21/11/14 22:47:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    21/11/14 22:48:13 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
    Spark context Web UI available at http://192.168.131.137:4040
    Spark context available as 'sc' (master = local[*], app id = local-1636901264338).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
          /_/
    
    Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_261)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> spark.sql("show databases").show
    +------------+
    |databaseName|
    +------------+
    |     default|
    |      hive_1|
    +------------+
    
    
    scala> spark.sql("use hive_1").show
    ++
    ||
    ++
    ++
    
    
    scala> spark.sql("show tables").show
    +--------+---------+-----------+
    |database|tableName|isTemporary|
    +--------+---------+-----------+
    |  hive_1|  student|      false|
    +--------+---------+-----------+
    
  • 相关阅读:
    数据库索引概念与优化
    数据库查询效率分析
    C语言结构体与C++结构体之间的差异
    判断一个序列是否为栈的弹出序列
    C语言中的结构体
    C++ STL 中的 std::sort()
    Spring注入值到静态变量
    层次遍历二叉树
    计算二叉树的大小
    计算二叉树的高度
  • 原文地址:https://www.cnblogs.com/midworld/p/15647005.html
Copyright © 2011-2022 走看看