
                 SparkSQL Data Sources in Practice

                                         Author: Yin Zhengjie

    Copyright notice: this is original work; reproduction without permission is prohibited and may be pursued legally.

     

    I. Generic load/save methods

    1>. Test data shipped with Spark

    [root@hadoop101.yinzhengjie.org.cn ~]# ll /yinzhengjie/softwares/spark/examples/src/main/resources/            #This directory holds the sample files, in various formats, that ship with Spark
    total 44
    -rw-r--r-- 1 yinzhengjie yinzhengjie  130 May 30 08:02 employees.json
    -rw-r--r-- 1 yinzhengjie yinzhengjie  240 May 30 08:02 full_user.avsc
    -rw-r--r-- 1 yinzhengjie yinzhengjie 5812 May 30 08:02 kv1.txt
    -rw-r--r-- 1 yinzhengjie yinzhengjie   49 May 30 08:02 people.csv
    -rw-r--r-- 1 yinzhengjie yinzhengjie   73 May 30 08:02 people.json
    -rw-r--r-- 1 yinzhengjie yinzhengjie   32 May 30 08:02 people.txt
    -rw-r--r-- 1 yinzhengjie yinzhengjie  185 May 30 08:02 user.avsc
    -rw-r--r-- 1 yinzhengjie yinzhengjie  334 May 30 08:02 users.avro
    -rw-r--r-- 1 yinzhengjie yinzhengjie  547 May 30 08:02 users.orc
    -rw-r--r-- 1 yinzhengjie yinzhengjie  615 May 30 08:02 users.parquet
    [root@hadoop101.yinzhengjie.org.cn ~]# 

    2>. Manually specifying options

      The DataFrame interface in Spark SQL supports operating on a wide variety of data sources. A DataFrame can be manipulated with RDD-style operations, or it can be registered as a temporary view. Once a DataFrame is registered as a temporary view, you can run SQL queries against it.

      Spark SQL's default data source is the Parquet format. When the data source is a Parquet file, Spark SQL can perform all of its operations on it directly. The default data source format can be changed via the configuration property spark.sql.sources.default. When the source is not in Parquet format, the format must be specified manually.

      The data source format must be given by its fully qualified name (for example, org.apache.spark.sql.parquet); for built-in formats the short name is enough: json, parquet, jdbc, orc, libsvm, csv, or text. Data can be loaded generically with the read.load method provided by SparkSession and saved with write and save. In addition, SQL can be run directly on a file, as shown in the sketch after this session.
    [root@hadoop101.yinzhengjie.org.cn ~]# spark-shell 
    20/07/15 01:09:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    Spark context Web UI available at http://hadoop101.yinzhengjie.org.cn:4040
    Spark context available as 'sc' (master = local[*], app id = local-1594746601368).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.4.6
          /_/
             
    Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> val df = spark.read.load("file:///yinzhengjie/softwares/spark/examples/src/main/resources/users.parquet")      #No need to specify a format when loading a Parquet file, since Parquet is the default
    df: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]
    
    scala> df.show
    +------+--------------+----------------+
    |  name|favorite_color|favorite_numbers|
    +------+--------------+----------------+
    |Alyssa|          null|  [3, 9, 15, 20]|
    |   Ben|           red|              []|
    +------+--------------+----------------+
    
    
    scala> 
    scala> val df = spark.read.format("json").load("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")    #Load a JSON-format file
    df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
    scala> df.show
    +----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+
    
    
    scala> 
    scala> val df = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")        #A shorthand for the above: read the JSON file with the json method directly, no need to specify the format manually
    df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
    scala> df.show
    +----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+
    
    
    scala> 
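      The paragraph above also mentions that SQL can be run directly on a file, without loading it into a DataFrame first. A minimal sketch, reusing the bundled users.parquet path from the session above:

    // Run SQL directly on a file: the "format.`path`" table syntax is standard Spark SQL.
    val sqlDF = spark.sql("SELECT * FROM parquet.`file:///yinzhengjie/softwares/spark/examples/src/main/resources/users.parquet`")
    sqlDF.show()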

    3>. File save options

      Save operations can take a SaveMode, which defines how existing data at the target is handled. Note that these save modes do not use any locking and are not atomic.

      In addition, when Overwrite is used, the existing data is deleted before the new data is written. The SaveMode values are summarized below:

      Scala/Java                          Any Language          Meaning
      SaveMode.ErrorIfExists (default)    "error" (default)     Throw an error if the data already exists
      SaveMode.Append                     "append"              Append the data
      SaveMode.Overwrite                  "overwrite"           Overwrite the existing data
      SaveMode.Ignore                     "ignore"              Ignore the operation if the data already exists
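      The same modes can also be selected with the typed SaveMode enum instead of a string. A minimal sketch (the output path below is illustrative, not taken from the session):

    import org.apache.spark.sql.SaveMode

    // Equivalent to .mode("overwrite"): any existing data at the path is replaced.
    df.write.mode(SaveMode.Overwrite).parquet("file:///tmp/output_savemode_demo")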
    scala> val df = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")          #Read the JSON file via the spark session variable
    df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
    scala> df.write.save("file:///tmp/output")                    #Save the DataFrame to local disk; the default save format is still Parquet
    
    scala>
    
    
    [root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output/            #The file suffix on local disk is .parquet, confirming that Parquet is the default save format
    total 4
    -rw-r--r-- 1 root root 687 Jul 15 01:34 part-00000-00ce4157-82e7-438b-a0b6-bdbaa29d0f4f-c000.snappy.parquet
    -rw-r--r-- 1 root root   0 Jul 15 01:34 _SUCCESS
    [root@hadoop101.yinzhengjie.org.cn ~]# 
    [root@hadoop101.yinzhengjie.org.cn ~]# 
    scala> val df = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")
    df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
    scala> df.write.format("json").save("file:///tmp/output2")           #When saving, we can explicitly specify "json" as the output format
    
    scala> 
    
    
    [root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/             #As expected, the saved files are indeed in JSON format
    total 4
    -rw-r--r-- 1 root root 71 Jul 15 01:38 part-00000-a52115a6-aede-4a2e-aa20-d31495f4b1cb-c000.json
    -rw-r--r-- 1 root root  0 Jul 15 01:38 _SUCCESS
    [root@hadoop101.yinzhengjie.org.cn ~]# 
    scala> val df = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")
    df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
    scala> df.write.format("json").save("file:///tmp/output2")              #The first save succeeds
    
    scala> df.write.format("json").save("file:///tmp/output2")              #The second save to the same directory fails, because the path already exists
    org.apache.spark.sql.AnalysisException: path file:/tmp/output2 already exists.;
      at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:114)
      at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
      at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
      at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
      at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
      at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
      at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
      at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
      at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
      at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
      at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
      at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
      at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:677)
      at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:286)
      at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:272)
      at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
      ... 49 elided
    
    scala> df.write.format("json").mode("append").save("file:///tmp/output2")      #Specify append mode ("append") when saving, so writing to the same path as before no longer fails with a path-already-exists error
    
    scala> 
    
    
    
    [root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/                #Files after the first save
    total 4
    -rw-r--r-- 1 root root 71 Jul 15 01:38 part-00000-a52115a6-aede-4a2e-aa20-d31495f4b1cb-c000.json
    -rw-r--r-- 1 root root  0 Jul 15 01:38 _SUCCESS
    [root@hadoop101.yinzhengjie.org.cn ~]# 
    [root@hadoop101.yinzhengjie.org.cn ~]# 
    [root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/                 #Files after the second (successful) save
    total 8
    -rw-r--r-- 1 root root 71 Jul 15 01:38 part-00000-a52115a6-aede-4a2e-aa20-d31495f4b1cb-c000.json
    -rw-r--r-- 1 root root 71 Jul 15 01:42 part-00000-a668e03a-c098-4eb9-b44d-c195c6557ec0-c000.json
    -rw-r--r-- 1 root root  0 Jul 15 01:42 _SUCCESS
    [root@hadoop101.yinzhengjie.org.cn ~]# 
    [root@hadoop101.yinzhengjie.org.cn ~]# 
    scala> val df = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")
    df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
    scala> df.write.format("json").save("file:///tmp/output2")
    
    scala> df.write.format("json").mode("append").save("file:///tmp/output2")
    
    scala> df.write.format("json").mode("overwrite").save("file:///tmp/output2")      #Write to the path in overwrite mode; any data previously stored there is cleared
    
    scala> 
    
    
    
    [root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/        #After the first write
    total 4
    -rw-r--r-- 1 root root 71 Jul 15 01:38 part-00000-a52115a6-aede-4a2e-aa20-d31495f4b1cb-c000.json
    -rw-r--r-- 1 root root  0 Jul 15 01:38 _SUCCESS
    [root@hadoop101.yinzhengjie.org.cn ~]# 
    [root@hadoop101.yinzhengjie.org.cn ~]# 
    [root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/        #After the second (append) write
    total 8
    -rw-r--r-- 1 root root 71 Jul 15 01:38 part-00000-a52115a6-aede-4a2e-aa20-d31495f4b1cb-c000.json
    -rw-r--r-- 1 root root 71 Jul 15 01:42 part-00000-a668e03a-c098-4eb9-b44d-c195c6557ec0-c000.json
    -rw-r--r-- 1 root root  0 Jul 15 01:42 _SUCCESS
    [root@hadoop101.yinzhengjie.org.cn ~]# 
    [root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/        #After the third (overwrite) write
    total 4
    -rw-r--r-- 1 root root 71 Jul 15 02:01 part-00000-7ea74899-2f1d-43bd-8c63-4ba17032974b-c000.json
    -rw-r--r-- 1 root root  0 Jul 15 02:01 _SUCCESS
    [root@hadoop101.yinzhengjie.org.cn ~]# 
    [root@hadoop101.yinzhengjie.org.cn ~]# 

    II. JSON files

      Spark SQL can automatically infer the schema of a JSON dataset and load it as a Dataset[Row]. A JSON file can be loaded with SparkSession.read.json().
    
      Note:
        This is not a conventional JSON file: every line must be a complete JSON object.
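      If the file is instead a conventional, pretty-printed JSON document, it can still be read by enabling the multiLine option. A minimal sketch (the path is illustrative):

    // Parse one JSON document that may span multiple lines per file, instead of one object per line.
    val prettyDF = spark.read.option("multiLine", "true").json("file:///tmp/pretty_user.json")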
    [root@hadoop101.yinzhengjie.org.cn ~]# vim /tmp/user.json
    [root@hadoop101.yinzhengjie.org.cn ~]# 
    [root@hadoop101.yinzhengjie.org.cn ~]# cat /tmp/user.json
    {"name":"yinzhengjie","passwd":"2020","age":18}
    {"name":"Jason","passwd":"666666","age":27}
    {"name":"Liming","passwd":"123","age":49}
    {"name":"Jenny","passwd":"456","age":23}
    {"name":"Danny","passwd":"789","age":56} 
    [root@hadoop101.yinzhengjie.org.cn ~]# 
    [root@hadoop101.yinzhengjie.org.cn ~]# 
    scala> val path = "/tmp/user.json"
    path: String = /tmp/user.json
    
    scala> val userDF = spark.read.json(path)              #Read the JSON file
    userDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]
    
    scala> userDF.createOrReplaceTempView("users")            #Create a temporary view
    
    scala> val teenagerNamesDF = spark.sql("SELECT name FROM users WHERE age BETWEEN 13 AND 19")      #Run SQL against the view
    teenagerNamesDF: org.apache.spark.sql.DataFrame = [name: string]
    
    scala> teenagerNamesDF.show()                       #Show the query result
    +-----------+
    |       name|
    +-----------+
    |yinzhengjie|
    +-----------+
    
    
    scala> 
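      spark.read.json can also parse JSON that is already in memory as a Dataset[String], with one JSON object per element. A minimal sketch with made-up records:

    import spark.implicits._

    // Each element holds one JSON object, mirroring the one-object-per-line file format.
    val otherUsers = Seq("""{"name":"Tom","passwd":"abc","age":15}""").toDS()
    val otherUsersDF = spark.read.json(otherUsers)
    otherUsersDF.show()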

    III. Parquet files

      Parquet is a popular columnar storage format that can efficiently store records with nested fields.

      The Parquet format is widely used in the Hadoop ecosystem, and it supports all of Spark SQL's data types.

      Spark SQL provides methods for reading and writing Parquet files directly.
    scala> val peopleDF = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")      #Read the local JSON file
    peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
    scala> peopleDF.write.parquet("hdfs://hadoop101.yinzhengjie.org.cn:9000/yinzhengjie/spark/resources/people.parquet")      #Save the data to HDFS, specifying Parquet as the format
    
    scala> val parquetFileDF = spark.read.parquet("hdfs://hadoop101.yinzhengjie.org.cn:9000/yinzhengjie/spark/resources/people.parquet")    #Read the file back from HDFS
    parquetFileDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
    scala> parquetFileDF.createOrReplaceTempView("parquetFile")          #Create a temporary view
    
    scala> val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")      #Run a SQL query against the temporary view
    namesDF: org.apache.spark.sql.DataFrame = [name: string]
    
    scala> namesDF.map(attributes => "Name: " + attributes(0)).show()      #Show the query result
    +------------+
    |       value|
    +------------+
    |Name: Justin|
    +------------+
    
    
    scala> 
    
    
    [root@hadoop101.yinzhengjie.org.cn ~]# ll /yinzhengjie/softwares/spark/examples/src/main/resources/people.json     #This file ships with Spark
    -rw-r--r-- 1 yinzhengjie yinzhengjie 73 May 30 08:02 /yinzhengjie/softwares/spark/examples/src/main/resources/people.json
    [root@hadoop101.yinzhengjie.org.cn ~]# 
    [root@hadoop101.yinzhengjie.org.cn ~]# 
    [root@hadoop101.yinzhengjie.org.cn ~]# cat /yinzhengjie/softwares/spark/examples/src/main/resources/people.json     
    {"name":"Michael"}
    {"name":"Andy", "age":30}
    {"name":"Justin", "age":19}
    [root@hadoop101.yinzhengjie.org.cn ~]# 
    [root@hadoop101.yinzhengjie.org.cn ~]# 
    [root@hadoop101.yinzhengjie.org.cn ~]# hdfs dfs -ls /yinzhengjie/spark/resources/people.parquet              #The data is written to the HDFS cluster, so make sure your Hadoop cluster is running
    Found 2 items
    -rw-r--r--   3 root supergroup          0 2020-07-15 02:21 /yinzhengjie/spark/resources/people.parquet/_SUCCESS
    -rw-r--r--   3 root supergroup        687 2020-07-15 02:21 /yinzhengjie/spark/resources/people.parquet/part-00000-3ead21bf-d453-4161-8d6f-08c069d4cb50-c000.snappy.parquet
    [root@hadoop101.yinzhengjie.org.cn ~]# 
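      When writing Parquet, the compression codec can be chosen per write with the compression option (the session above produced snappy-compressed files, the default). A minimal sketch, assuming the peopleDF from the session above and an illustrative output path:

    // Write gzip-compressed Parquet instead of the default snappy codec.
    peopleDF.write
      .option("compression", "gzip")
      .parquet("hdfs://hadoop101.yinzhengjie.org.cn:9000/yinzhengjie/spark/resources/people_gzip.parquet")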

    IV. JDBC

      Spark SQL can create a DataFrame by reading data from a relational database over JDBC, and after a series of computations on that DataFrame the results can be written back to the relational database.
    
      Note:
        The relevant JDBC driver must be placed on Spark's classpath.

    1>. Create a MySQL database

    Install the MariaDB database:
    [root@hadoop101.yinzhengjie.org.cn ~]# yum -y install mariadb-server
    
    Enable the database to start automatically at boot:
    [root@hadoop101.yinzhengjie.org.cn ~]# systemctl enable mariadb
    Created symlink from /etc/systemd/system/multi-user.target.wants/mariadb.service to /usr/lib/systemd/system/mariadb.service.
    [root@hadoop101.yinzhengjie.org.cn ~]# 
    [root@hadoop101.yinzhengjie.org.cn ~]# systemctl start mariadb
    [root@hadoop101.yinzhengjie.org.cn ~]# 
    
    Log in to the database, create the spark database, and grant a user access:
    MariaDB [(none)]> CREATE SCHEMA IF NOT EXISTS spark DEFAULT CHARACTER SET = utf8mb4;
    Query OK, 1 row affected (0.00 sec)
    
    MariaDB [(none)]> 
    MariaDB [(none)]> CREATE USER jason@'172.200.%' IDENTIFIED BY 'yinzhengjie';
    Query OK, 0 rows affected (0.00 sec)
    
    MariaDB [(none)]> 
    MariaDB [(none)]> GRANT ALL ON spark.* TO jason@'172.200.%';
    Query OK, 0 rows affected (0.00 sec)
    
    MariaDB [(none)]> 
    [root@hadoop105.yinzhengjie.org.cn ~]# mysql -u jason -pyinzhengjie -h 172.200.4.101
    Welcome to the MariaDB monitor.  Commands end with ; or \g.
    Your MariaDB connection id is 7
    Server version: 5.5.65-MariaDB MariaDB Server
    
    Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.
    
    Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
    
    MariaDB [(none)]> show databases;
    +--------------------+
    | Database           |
    +--------------------+
    | information_schema |
    | spark              |
    | test               |
    +--------------------+
    3 rows in set (0.01 sec)
    
    MariaDB [(none)]> 
    MariaDB [(none)]> quit
    Bye
    [root@hadoop105.yinzhengjie.org.cn ~]# 

    2>. Put the JDBC driver on Spark's classpath

    [root@hadoop105.yinzhengjie.org.cn ~]# ll
    total 188288
    -rw-r--r-- 1 root root      8409 Dec 12  2018 jce_policy-8.zip
    -rw-r--r-- 1 root root 191817140 Mar 25  2019 jdk-8u201-linux-x64.tar.gz
    -rw-r--r-- 1 root root    972009 Mar  1 22:52 mysql-connector-java-5.1.36-bin.jar
    drwxrwxr-x 2 root root        24 Jan 21 01:36 UnlimitedJCEPolicyJDK8
    [root@hadoop105.yinzhengjie.org.cn ~]# 
    [root@hadoop105.yinzhengjie.org.cn ~]# cp mysql-connector-java-5.1.36-bin.jar /yinzhengjie/softwares/spark/jars/
    [root@hadoop105.yinzhengjie.org.cn ~]# 
    [root@hadoop105.yinzhengjie.org.cn ~]# ll /yinzhengjie/softwares/spark/jars/ | grep mysql
    -rw-r--r-- 1 root root   972009 Jul 15 03:33 mysql-connector-java-5.1.36-bin.jar
    [root@hadoop105.yinzhengjie.org.cn ~]# 
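      The users table read in the next subsection is assumed to already exist in the spark database. One way it could have been created and populated is by writing the /tmp/user.json test data over JDBC; a hedged sketch (the id column derived from monotonically_increasing_id is an assumption, not a step the author recorded):

    import org.apache.spark.sql.functions.monotonically_increasing_id

    // Load the JSON test data and add a sequential id column (assumed to mirror the table layout shown later).
    val usersDF = spark.read.json("file:///tmp/user.json")
      .withColumn("id", monotonically_increasing_id() + 1)

    // Write it to MySQL as the "users" table; with the default ErrorIfExists mode the table is created if it does not already exist.
    usersDF.write.format("jdbc")
      .option("url", "jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark")
      .option("dbtable", "users")
      .option("user", "jason")
      .option("password", "yinzhengjie")
      .save()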

    3>. Loading data from MySQL: method 1

    [root@hadoop105.yinzhengjie.org.cn ~]# spark-shell 
    20/07/15 03:33:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    Spark context Web UI available at http://hadoop105.yinzhengjie.org.cn:4040
    Spark context available as 'sc' (master = local[*], app id = local-1594755234548).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.4.6
          /_/
             
    Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark").option("dbtable", "users").option("user", "jason").option("password", "yinzhengjie").load()
    jdbcDF: org.apache.spark.sql.DataFrame = [id: bigint, name: string ... 2 more fields]
    
    scala> jdbcDF.show
    +---+-----------+---+------+
    | id|       name|age|passwd|
    +---+-----------+---+------+
    |  1|yinzhengjie| 18|  2020|
    |  2|      Jason| 27|666666|
    |  3|     Liming| 49|   123|
    |  4|      Jenny| 23|   456|
    |  5|      Danny| 56|   789|
    +---+-----------+---+------+
    
    
    scala> 
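      The generic format("jdbc") reader accepts additional options as well: a subquery can be pushed down as the dbtable value, and a numeric column can be used to parallelize the read. A minimal sketch (the option values are illustrative):

    // Push a filter down to MySQL and split the read into 4 partitions on the numeric id column.
    val partitionedDF = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark")
      .option("dbtable", "(SELECT id, name, age FROM users WHERE age >= 18) AS t")
      .option("user", "jason")
      .option("password", "yinzhengjie")
      .option("partitionColumn", "id")
      .option("lowerBound", "1")
      .option("upperBound", "5")
      .option("numPartitions", "4")
      .load()
    partitionedDF.show()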

    4>. Loading data from MySQL: method 2

    [root@hadoop105.yinzhengjie.org.cn ~]# spark-shell 
    20/07/15 03:39:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    Spark context Web UI available at http://hadoop105.yinzhengjie.org.cn:4040
    Spark context available as 'sc' (master = local[*], app id = local-1594755548606).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.4.6
          /_/
             
    Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> val connectionProperties = new java.util.Properties()
    connectionProperties: java.util.Properties = {}
    
    scala> connectionProperties.put("user", "jason")
    res0: Object = null
    
    scala> connectionProperties.put("password", "yinzhengjie")
    res1: Object = null
    
    scala> val jdbcDF2 = spark.read.jdbc("jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark", "users", connectionProperties)
    jdbcDF2: org.apache.spark.sql.DataFrame = [id: bigint, name: string ... 2 more fields]
    
    scala> jdbcDF2.show
    +---+-----------+---+------+
    | id|       name|age|passwd|
    +---+-----------+---+------+
    |  1|yinzhengjie| 18|  2020|
    |  2|      Jason| 27|666666|
    |  3|     Liming| 49|   123|
    |  4|      Jenny| 23|   456|
    |  5|      Danny| 56|   789|
    +---+-----------+---+------+
    
    
    scala> 
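      spark.read.jdbc also has an overload that takes an array of WHERE-clause predicates, each of which becomes one partition of the resulting DataFrame. A minimal sketch reusing the connectionProperties defined above:

    // Each predicate string produces one partition; together they should cover the rows you want.
    val predicates = Array("age < 30", "age >= 30")
    val jdbcDF3 = spark.read.jdbc("jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark", "users", predicates, connectionProperties)
    jdbcDF3.show()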

    5>. Writing data to MySQL: method 1

    Operate directly on the DataFrame that was read earlier:
    
    scala> jdbcDF.write.format("jdbc").option("url", "jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark").option("dbtable", "users2").option("user", "jason").option("password", "yinzhengjie").save()
    
    scala>
    The resulting changes in the MySQL database:
    [root@hadoop105.yinzhengjie.org.cn ~]# mysql -ujason -pyinzhengjie -h 172.200.4.101
    Welcome to the MariaDB monitor.  Commands end with ; or \g.
    Your MariaDB connection id is 20
    Server version: 5.5.65-MariaDB MariaDB Server
    
    Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.
    
    Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
    
    MariaDB [(none)]> use spark
    Reading table information for completion of table and column names
    You can turn off this feature to get a quicker startup with -A
    
    Database changed
    MariaDB [spark]> 
    MariaDB [spark]> show tables;
    +-----------------+
    | Tables_in_spark |
    +-----------------+
    | users           |
    +-----------------+
    1 row in set (0.00 sec)
    
    MariaDB [spark]> 
    MariaDB [spark]> 
    MariaDB [spark]> show tables;
    +-----------------+
    | Tables_in_spark |
    +-----------------+
    | users           |
    | users2          |
    +-----------------+
    2 rows in set (0.00 sec)
    
    MariaDB [spark]> select * from users2;
    +------+-------------+------+--------+
    | id   | name        | age  | passwd |
    +------+-------------+------+--------+
    |    1 | yinzhengjie |   18 | 2020   |
    |    2 | Jason       |   27 | 666666 |
    |    3 | Liming      |   49 | 123    |
    |    4 | Jenny       |   23 | 456    |
    |    5 | Danny       |   56 | 789    |
    +------+-------------+------+--------+
    5 rows in set (0.00 sec)
    
    MariaDB [spark]> 
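      By default the JDBC writer fails if the target table already exists; to add rows to an existing table, specify append mode just as with file sources. A minimal sketch (an illustration against the users2 table created above, not part of the recorded session):

    // Append rows to the existing users2 table instead of failing with a "table already exists" error.
    jdbcDF.write.format("jdbc")
      .option("url", "jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark")
      .option("dbtable", "users2")
      .option("user", "jason")
      .option("password", "yinzhengjie")
      .mode("append")
      .save()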

    6>. Writing data to MySQL: method 2

    Operate directly on the DataFrame that was read earlier:
    scala> jdbcDF2.write.jdbc("jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark", "users3", connectionProperties)
    
    scala>
    The resulting changes in the MySQL database:
    MariaDB [spark]> show tables;
    +-----------------+
    | Tables_in_spark |
    +-----------------+
    | users           |
    | users2          |
    +-----------------+
    2 rows in set (0.00 sec)
    
    MariaDB [spark]> select * from users2;
    +------+-------------+------+--------+
    | id   | name        | age  | passwd |
    +------+-------------+------+--------+
    |    1 | yinzhengjie |   18 | 2020   |
    |    2 | Jason       |   27 | 666666 |
    |    3 | Liming      |   49 | 123    |
    |    4 | Jenny       |   23 | 456    |
    |    5 | Danny       |   56 | 789    |
    +------+-------------+------+--------+
    5 rows in set (0.00 sec)
    
    MariaDB [spark]> 
    MariaDB [spark]> show tables;
    +-----------------+
    | Tables_in_spark |
    +-----------------+
    | users           |
    | users2          |
    | users3          |
    +-----------------+
    3 rows in set (0.00 sec)
    
    MariaDB [spark]> 
    MariaDB [spark]> select * from users3;
    +------+-------------+------+--------+
    | id   | name        | age  | passwd |
    +------+-------------+------+--------+
    |    1 | yinzhengjie |   18 | 2020   |
    |    2 | Jason       |   27 | 666666 |
    |    3 | Liming      |   49 | 123    |
    |    4 | Jenny       |   23 | 456    |
    |    5 | Danny       |   56 | 789    |
    +------+-------------+------+--------+
    5 rows in set (0.00 sec)

    V. SparkSQL data source: the Hive database

      Recommended reading:
        https://www.cnblogs.com/yinzhengjie2020/p/13216504.html