SparkSQL Data Sources in Practice
Author: Yin Zhengjie
Copyright notice: this is an original work; reproduction without permission is prohibited, and violators will be held legally responsible.
I. Generic load/save methods
1>. Test data shipped with the official Spark distribution
[root@hadoop101.yinzhengjie.org.cn ~]# ll /yinzhengjie/softwares/spark/examples/src/main/resources/    # This directory holds sample files in various formats shipped with the official Spark distribution
total 44
-rw-r--r-- 1 yinzhengjie yinzhengjie  130 May 30 08:02 employees.json
-rw-r--r-- 1 yinzhengjie yinzhengjie  240 May 30 08:02 full_user.avsc
-rw-r--r-- 1 yinzhengjie yinzhengjie 5812 May 30 08:02 kv1.txt
-rw-r--r-- 1 yinzhengjie yinzhengjie   49 May 30 08:02 people.csv
-rw-r--r-- 1 yinzhengjie yinzhengjie   73 May 30 08:02 people.json
-rw-r--r-- 1 yinzhengjie yinzhengjie   32 May 30 08:02 people.txt
-rw-r--r-- 1 yinzhengjie yinzhengjie  185 May 30 08:02 user.avsc
-rw-r--r-- 1 yinzhengjie yinzhengjie  334 May 30 08:02 users.avro
-rw-r--r-- 1 yinzhengjie yinzhengjie  547 May 30 08:02 users.orc
-rw-r--r-- 1 yinzhengjie yinzhengjie  615 May 30 08:02 users.parquet
[root@hadoop101.yinzhengjie.org.cn ~]#
2>. Manually specifying options
Spark SQL's DataFrame interface supports operating on a variety of data sources. A DataFrame can be manipulated with RDD-style operations, or it can be registered as a temporary view. Once a DataFrame is registered as a temporary view, SQL queries can be run against it.
Spark SQL's default data source is the Parquet format. When the data source is a Parquet file, Spark SQL can perform all of its operations on it without extra configuration. The default can be changed through the configuration property spark.sql.sources.default; when the data source is not a Parquet file, the format must be specified manually.
The data source format must be given by its fully qualified name (for example org.apache.spark.sql.parquet); for built-in formats, the short name is enough: json, parquet, jdbc, orc, libsvm, csv, or text.
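If you prefer a different default, spark.sql.sources.default can be changed at runtime; a minimal sketch that switches the default to JSON purely as an illustration (the property name is standard, the value is just an example):

// Change the default data source format for the current SparkSession.
// Subsequent spark.read.load(...) calls will then expect JSON instead of Parquet.
spark.conf.set("spark.sql.sources.default", "json")

The same property can also be supplied when launching the shell, e.g. spark-shell --conf spark.sql.sources.default=json.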
SparkSession's read.load method provides a generic way to load data, and write/save is used to persist it. In addition, SQL can be run directly on files (see the example further below).
[root@hadoop101.yinzhengjie.org.cn ~]# spark-shell
20/07/15 01:09:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop101.yinzhengjie.org.cn:4040
Spark context available as 'sc' (master = local[*], app id = local-1594746601368).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.6
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val df = spark.read.load("file:///yinzhengjie/softwares/spark/examples/src/main/resources/users.parquet")    # Loading a Parquet file needs no explicit format, because Parquet is the default
df: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]

scala> df.show
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+


scala>
scala> val df = spark.read.format("json").load("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json") #加载json格式文件 df: org.apache.spark.sql.DataFrame = [age: bigint, name: string] scala> df.show +----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+ scala>
scala> val df = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")    # Shorthand for the above: the json method reads JSON files directly, no explicit format needed
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


scala>
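As mentioned above, SQL can also be run directly on a file without loading it into a DataFrame first; a minimal sketch against the same sample Parquet file (only the query text itself is new here):

// Query a Parquet file directly by embedding its path in the FROM clause.
// The prefix before the path ("parquet" here) selects the data source format.
val sqlDF = spark.sql("SELECT * FROM parquet.`file:///yinzhengjie/softwares/spark/examples/src/main/resources/users.parquet`")
sqlDF.show()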
3>. File save options
Storage operations can be performed with a SaveMode, which defines how existing data at the target is handled. Note that these save modes do not use any locking, and the operations are not atomic.
In addition, when the Overwrite mode is used, the existing data is deleted before the new data is written out. The SaveMode options are summarized in the table below:

Scala/Java                         Any Language         Meaning
SaveMode.ErrorIfExists (default)   "error" (default)    Throw an error if data already exists at the target
SaveMode.Append                    "append"             Append the new data to the existing data
SaveMode.Overwrite                 "overwrite"          Overwrite the existing data
SaveMode.Ignore                    "ignore"             Silently skip the write if data already exists
scala> val df = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")    # Read the JSON file through the spark session variable
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.write.save("file:///tmp/output")    # Save the DataFrame to local disk; the default save format is still Parquet

scala>

[root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output/    # The file suffix on local disk is .parquet, confirming that Parquet is the default save format
total 4
-rw-r--r-- 1 root root 687 Jul 15 01:34 part-00000-00ce4157-82e7-438b-a0b6-bdbaa29d0f4f-c000.snappy.parquet
-rw-r--r-- 1 root root   0 Jul 15 01:34 _SUCCESS
[root@hadoop101.yinzhengjie.org.cn ~]#
scala> val df = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.write.format("json").save("file:///tmp/output2")    # When saving, the output format can be specified explicitly, here "json"

scala>

[root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/    # As expected, the saved files are indeed JSON
total 4
-rw-r--r-- 1 root root 71 Jul 15 01:38 part-00000-a52115a6-aede-4a2e-aa20-d31495f4b1cb-c000.json
-rw-r--r-- 1 root root  0 Jul 15 01:38 _SUCCESS
[root@hadoop101.yinzhengjie.org.cn ~]#
scala> val df = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.write.format("json").save("file:///tmp/output2")    # The first save succeeds

scala> df.write.format("json").save("file:///tmp/output2")    # The second save to the same directory fails, because the path already exists
org.apache.spark.sql.AnalysisException: path file:/tmp/output2 already exists.;
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:114)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:677)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:286)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:272)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
  ... 49 elided

scala> df.write.format("json").mode("append").save("file:///tmp/output2")    # Specifying the "append" mode lets the save succeed even though the target path already exists

scala>

[root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/    # Files after the first successful save
total 4
-rw-r--r-- 1 root root 71 Jul 15 01:38 part-00000-a52115a6-aede-4a2e-aa20-d31495f4b1cb-c000.json
-rw-r--r-- 1 root root  0 Jul 15 01:38 _SUCCESS
[root@hadoop101.yinzhengjie.org.cn ~]#
[root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/    # Files after the second (append) save
total 8
-rw-r--r-- 1 root root 71 Jul 15 01:38 part-00000-a52115a6-aede-4a2e-aa20-d31495f4b1cb-c000.json
-rw-r--r-- 1 root root 71 Jul 15 01:42 part-00000-a668e03a-c098-4eb9-b44d-c195c6557ec0-c000.json
-rw-r--r-- 1 root root  0 Jul 15 01:42 _SUCCESS
[root@hadoop101.yinzhengjie.org.cn ~]#
scala> val df = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.write.format("json").save("file:///tmp/output2")

scala> df.write.format("json").mode("append").save("file:///tmp/output2")

scala> df.write.format("json").mode("overwrite").save("file:///tmp/output2")    # Write to the path in overwrite mode; any data previously stored there is removed first

scala>

[root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/    # After the first write
total 4
-rw-r--r-- 1 root root 71 Jul 15 01:38 part-00000-a52115a6-aede-4a2e-aa20-d31495f4b1cb-c000.json
-rw-r--r-- 1 root root  0 Jul 15 01:38 _SUCCESS
[root@hadoop101.yinzhengjie.org.cn ~]#
[root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/    # After the second (append) write
total 8
-rw-r--r-- 1 root root 71 Jul 15 01:38 part-00000-a52115a6-aede-4a2e-aa20-d31495f4b1cb-c000.json
-rw-r--r-- 1 root root 71 Jul 15 01:42 part-00000-a668e03a-c098-4eb9-b44d-c195c6557ec0-c000.json
-rw-r--r-- 1 root root  0 Jul 15 01:42 _SUCCESS
[root@hadoop101.yinzhengjie.org.cn ~]#
[root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/    # After the third (overwrite) write
total 4
-rw-r--r-- 1 root root 71 Jul 15 02:01 part-00000-7ea74899-2f1d-43bd-8c63-4ba17032974b-c000.json
-rw-r--r-- 1 root root  0 Jul 15 02:01 _SUCCESS
[root@hadoop101.yinzhengjie.org.cn ~]#
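Besides the string mode names used above, Scala code can also pass the typed SaveMode enum; a minimal sketch reusing the same df, with a hypothetical output path /tmp/output3 used only for illustration:

import org.apache.spark.sql.SaveMode

// Equivalent to .mode("overwrite"): any existing data under the target path is replaced.
df.write
  .format("json")
  .mode(SaveMode.Overwrite)
  .save("file:///tmp/output3")   // hypothetical path for this example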
II. JSON files
Spark SQL can automatically infer the schema of a JSON dataset and load it as a Dataset[Row]. A JSON file can be loaded with SparkSession.read.json().
Note:
This JSON file is not a conventional JSON document: each line must be a complete, self-contained JSON object.
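If you do need to read a conventional, pretty-printed JSON document that spans multiple lines, the multiLine option can be enabled; a minimal sketch with a hypothetical file path:

// Reading a standard multi-line JSON document requires the multiLine option;
// without it, Spark expects one JSON object per line.
val multiLineDF = spark.read
  .option("multiLine", "true")
  .json("file:///tmp/standard.json")   // hypothetical path used only for illustration
multiLineDF.show()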
[root@hadoop101.yinzhengjie.org.cn ~]# vim /tmp/user.json
[root@hadoop101.yinzhengjie.org.cn ~]#
[root@hadoop101.yinzhengjie.org.cn ~]# cat /tmp/user.json
{"name":"yinzhengjie","passwd":"2020","age":18}
{"name":"Jason","passwd":"666666","age":27}
{"name":"Liming","passwd":"123","age":49}
{"name":"Jenny","passwd":"456","age":23}
{"name":"Danny","passwd":"789","age":56}
[root@hadoop101.yinzhengjie.org.cn ~]#
scala> val path = "/tmp/user.json"
path: String = /tmp/user.json

scala> val userDF = spark.read.json(path)    # Read the JSON file
userDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]

scala> userDF.createOrReplaceTempView("users")    # Create a temporary view

scala> val teenagerNamesDF = spark.sql("SELECT name FROM users WHERE age BETWEEN 13 AND 19")    # Run SQL against the view
teenagerNamesDF: org.apache.spark.sql.DataFrame = [name: string]

scala> teenagerNamesDF.show()    # Display the query result
+-----------+
|       name|
+-----------+
|yinzhengjie|
+-----------+


scala>
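spark.read.json can also consume an in-memory Dataset[String] in which each element is one JSON object, which is handy for quick tests; a minimal sketch whose sample records are made up purely for illustration:

import spark.implicits._

// Each string mirrors the one-object-per-line file format shown above.
val otherUsersDS = spark.createDataset(Seq(
  """{"name":"Tom","passwd":"111","age":15}""",
  """{"name":"Jerry","passwd":"222","age":35}"""
))
val otherUsersDF = spark.read.json(otherUsersDS)   // the schema is inferred from the JSON strings
otherUsersDF.show()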
III. Parquet files
Parquet is a popular columnar storage format that can efficiently store records with nested fields.
Parquet is widely used across the Hadoop ecosystem, and it supports all of Spark SQL's data types.
Spark SQL provides methods for reading and writing Parquet files directly.
scala> val peopleDF = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")    # Read the local JSON file
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> peopleDF.write.parquet("hdfs://hadoop101.yinzhengjie.org.cn:9000/yinzhengjie/spark/resources/people.parquet")    # Save the data to HDFS in Parquet format

scala> val parquetFileDF = spark.read.parquet("hdfs://hadoop101.yinzhengjie.org.cn:9000/yinzhengjie/spark/resources/people.parquet")    # Read the file back from HDFS
parquetFileDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> parquetFileDF.createOrReplaceTempView("parquetFile")    # Create a temporary view

scala> val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")    # Run a SQL query against the temporary view
namesDF: org.apache.spark.sql.DataFrame = [name: string]

scala> namesDF.map(attributes => "Name: " + attributes(0)).show()    # Display the query result
+------------+
|       value|
+------------+
|Name: Justin|
+------------+


scala>

[root@hadoop101.yinzhengjie.org.cn ~]# ll /yinzhengjie/softwares/spark/examples/src/main/resources/people.json    # This file ships with the official Spark distribution
-rw-r--r-- 1 yinzhengjie yinzhengjie 73 May 30 08:02 /yinzhengjie/softwares/spark/examples/src/main/resources/people.json
[root@hadoop101.yinzhengjie.org.cn ~]#
[root@hadoop101.yinzhengjie.org.cn ~]# cat /yinzhengjie/softwares/spark/examples/src/main/resources/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
[root@hadoop101.yinzhengjie.org.cn ~]#
[root@hadoop101.yinzhengjie.org.cn ~]# hdfs dfs -ls /yinzhengjie/spark/resources/people.parquet    # The data is written to HDFS, so make sure your Hadoop cluster is running
Found 2 items
-rw-r--r-- 3 root supergroup   0 2020-07-15 02:21 /yinzhengjie/spark/resources/people.parquet/_SUCCESS
-rw-r--r-- 3 root supergroup 687 2020-07-15 02:21 /yinzhengjie/spark/resources/people.parquet/part-00000-3ead21bf-d453-4161-8d6f-08c069d4cb50-c000.snappy.parquet
[root@hadoop101.yinzhengjie.org.cn ~]#
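Parquet output can also be partitioned by a column, which lays the files out in one subdirectory per partition value; a minimal sketch reusing peopleDF, with a hypothetical output path:

// Partition the Parquet output by the "age" column.
// Each distinct age gets its own subdirectory, e.g. .../people_by_age.parquet/age=19/.
peopleDF.write
  .partitionBy("age")
  .parquet("hdfs://hadoop101.yinzhengjie.org.cn:9000/yinzhengjie/spark/resources/people_by_age.parquet")   // hypothetical path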
IV. JDBC
Spark SQL can create a DataFrame by reading data from a relational database over JDBC; after a series of computations on the DataFrame, the data can be written back to the relational database.
Note:
The relevant database driver must be placed on Spark's classpath.
1>. Create the MySQL database
Install the MariaDB database:
[root@hadoop101.yinzhengjie.org.cn ~]# yum -y install mariadb-server

Enable the database to start at boot and start it:
[root@hadoop101.yinzhengjie.org.cn ~]# systemctl enable mariadb
Created symlink from /etc/systemd/system/multi-user.target.wants/mariadb.service to /usr/lib/systemd/system/mariadb.service.
[root@hadoop101.yinzhengjie.org.cn ~]#
[root@hadoop101.yinzhengjie.org.cn ~]# systemctl start mariadb
[root@hadoop101.yinzhengjie.org.cn ~]#

Log in to the database, create the spark schema, and grant a user access to it:
MariaDB [(none)]> CREATE SCHEMA IF NOT EXISTS spark DEFAULT CHARACTER SET = utf8mb4;
Query OK, 1 row affected (0.00 sec)

MariaDB [(none)]> CREATE USER jason@'172.200.%' IDENTIFIED BY 'yinzhengjie';
Query OK, 0 rows affected (0.00 sec)

MariaDB [(none)]> GRANT ALL ON spark.* TO jason@'172.200.%';
Query OK, 0 rows affected (0.00 sec)

MariaDB [(none)]>
[root@hadoop105.yinzhengjie.org.cn ~]# mysql -u jason -pyinzhengjie -h 172.200.4.101
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MariaDB connection id is 7
Server version: 5.5.65-MariaDB MariaDB Server

Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

MariaDB [(none)]> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| spark              |
| test               |
+--------------------+
3 rows in set (0.01 sec)

MariaDB [(none)]> quit
Bye
[root@hadoop105.yinzhengjie.org.cn ~]#
2>. Place the database driver on Spark's classpath
[root@hadoop105.yinzhengjie.org.cn ~]# ll
total 188288
-rw-r--r-- 1 root root      8409 Dec 12  2018 jce_policy-8.zip
-rw-r--r-- 1 root root 191817140 Mar 25  2019 jdk-8u201-linux-x64.tar.gz
-rw-r--r-- 1 root root    972009 Mar  1 22:52 mysql-connector-java-5.1.36-bin.jar
drwxrwxr-x 2 root root        24 Jan 21 01:36 UnlimitedJCEPolicyJDK8
[root@hadoop105.yinzhengjie.org.cn ~]#
[root@hadoop105.yinzhengjie.org.cn ~]# cp mysql-connector-java-5.1.36-bin.jar /yinzhengjie/softwares/spark/jars/
[root@hadoop105.yinzhengjie.org.cn ~]#
[root@hadoop105.yinzhengjie.org.cn ~]# ll /yinzhengjie/softwares/spark/jars/ | grep mysql
-rw-r--r-- 1 root root 972009 Jul 15 03:33 mysql-connector-java-5.1.36-bin.jar
[root@hadoop105.yinzhengjie.org.cn ~]#
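Copying the connector into Spark's jars directory makes it available to every application on this node. As an alternative (a sketch, not part of the original setup), the driver jar can instead be supplied per invocation on the command line:

spark-shell --jars /root/mysql-connector-java-5.1.36-bin.jar --driver-class-path /root/mysql-connector-java-5.1.36-bin.jar

Both are standard spark-shell/spark-submit options: --jars ships the jar to the executors, and --driver-class-path puts it on the driver's classpath.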
3>. Loading data from MySQL: method one
[root@hadoop105.yinzhengjie.org.cn ~]# spark-shell
20/07/15 03:33:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop105.yinzhengjie.org.cn:4040
Spark context available as 'sc' (master = local[*], app id = local-1594755234548).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.6
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark").option("dbtable", "users").option("user", "jason").option("password", "yinzhengjie").load()
jdbcDF: org.apache.spark.sql.DataFrame = [id: bigint, name: string ... 2 more fields]

scala> jdbcDF.show
+---+-----------+---+------+
| id|       name|age|passwd|
+---+-----------+---+------+
|  1|yinzhengjie| 18|  2020|
|  2|      Jason| 27|666666|
|  3|     Liming| 49|   123|
|  4|      Jenny| 23|   456|
|  5|      Danny| 56|   789|
+---+-----------+---+------+


scala>
4>. Loading data from MySQL: method two
[root@hadoop105.yinzhengjie.org.cn ~]# spark-shell
20/07/15 03:39:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop105.yinzhengjie.org.cn:4040
Spark context available as 'sc' (master = local[*], app id = local-1594755548606).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.6
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val connectionProperties = new java.util.Properties()
connectionProperties: java.util.Properties = {}

scala> connectionProperties.put("user", "jason")
res0: Object = null

scala> connectionProperties.put("password", "yinzhengjie")
res1: Object = null

scala> val jdbcDF2 = spark.read.jdbc("jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark", "users", connectionProperties)
jdbcDF2: org.apache.spark.sql.DataFrame = [id: bigint, name: string ... 2 more fields]

scala> jdbcDF2.show
+---+-----------+---+------+
| id|       name|age|passwd|
+---+-----------+---+------+
|  1|yinzhengjie| 18|  2020|
|  2|      Jason| 27|666666|
|  3|     Liming| 49|   123|
|  4|      Jenny| 23|   456|
|  5|      Danny| 56|   789|
+---+-----------+---+------+


scala>
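For larger tables, the JDBC read can be split across executors by describing how to partition a numeric column; a minimal sketch reusing the same connection settings (the bounds and partition count are illustrative only):

val jdbcPartitionedDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark")
  .option("dbtable", "users")
  .option("user", "jason")
  .option("password", "yinzhengjie")
  .option("partitionColumn", "id")   // must be a numeric, date, or timestamp column
  .option("lowerBound", "1")         // illustrative bounds; they only control how the id range is split
  .option("upperBound", "5")
  .option("numPartitions", "2")      // number of parallel JDBC connections
  .load()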
5>. Writing data to MySQL: method one
Operate directly on the DataFrame that was read earlier:
scala> jdbcDF.write.format("jdbc").option("url", "jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark").option("dbtable", "users2").option("user", "jason").option("password", "yinzhengjie").save()

scala>
Observe the changes in the MySQL database:
[root@hadoop105.yinzhengjie.org.cn ~]# mysql -ujason -pyinzhengjie -h 172.200.4.101
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MariaDB connection id is 20
Server version: 5.5.65-MariaDB MariaDB Server

Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

MariaDB [(none)]> use spark
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
MariaDB [spark]> show tables;
+-----------------+
| Tables_in_spark |
+-----------------+
| users           |
+-----------------+
1 row in set (0.00 sec)

MariaDB [spark]> show tables;
+-----------------+
| Tables_in_spark |
+-----------------+
| users           |
| users2          |
+-----------------+
2 rows in set (0.00 sec)

MariaDB [spark]> select * from users2;
+------+-------------+------+--------+
| id   | name        | age  | passwd |
+------+-------------+------+--------+
|    1 | yinzhengjie |   18 |   2020 |
|    2 | Jason       |   27 | 666666 |
|    3 | Liming      |   49 |    123 |
|    4 | Jenny       |   23 |    456 |
|    5 | Danny       |   56 |    789 |
+------+-------------+------+--------+
5 rows in set (0.00 sec)

MariaDB [spark]>
6>. Writing data to MySQL: method two
Operate directly on the DataFrame that was read earlier:
scala> jdbcDF2.write.jdbc("jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark", "users3", connectionProperties)

scala>
Observe the changes in the MySQL database:
MariaDB [spark]> show tables;
+-----------------+
| Tables_in_spark |
+-----------------+
| users           |
| users2          |
+-----------------+
2 rows in set (0.00 sec)

MariaDB [spark]> select * from users2;
+------+-------------+------+--------+
| id   | name        | age  | passwd |
+------+-------------+------+--------+
|    1 | yinzhengjie |   18 |   2020 |
|    2 | Jason       |   27 | 666666 |
|    3 | Liming      |   49 |    123 |
|    4 | Jenny       |   23 |    456 |
|    5 | Danny       |   56 |    789 |
+------+-------------+------+--------+
5 rows in set (0.00 sec)

MariaDB [spark]> show tables;
+-----------------+
| Tables_in_spark |
+-----------------+
| users           |
| users2          |
| users3          |
+-----------------+
3 rows in set (0.00 sec)

MariaDB [spark]> select * from users3;
+------+-------------+------+--------+
| id   | name        | age  | passwd |
+------+-------------+------+--------+
|    1 | yinzhengjie |   18 |   2020 |
|    2 | Jason       |   27 | 666666 |
|    3 | Liming      |   49 |    123 |
|    4 | Jenny       |   23 |    456 |
|    5 | Danny       |   56 |    789 |
+------+-------------+------+--------+
5 rows in set (0.00 sec)
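As with file sources, a save mode can be supplied when writing over JDBC; without one, writing to an already existing table fails. A minimal sketch that appends the same rows to the users3 table created above:

import org.apache.spark.sql.SaveMode

// Append to the existing users3 table instead of failing because it already exists.
jdbcDF2.write
  .mode(SaveMode.Append)
  .jdbc("jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark", "users3", connectionProperties)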
V. SparkSQL data source: the Hive database
Recommended further reading by the author: https://www.cnblogs.com/yinzhengjie2020/p/13216504.html