数据库也是 spark 数据源创建 df 的一种方式,因为比较重要,所以单独算一节。
本文以 postgres 为例
安装 JDBC
首先需要 安装 postgres 的客户端驱动,即 JDBC 驱动,这是官方下载地址,JDBC,根据数据库版本下载对应的驱动
上传至 spark 目录下的 jars 目录
并设置环境变量
export SPARK_CLASSPATH = /usr/lib/spark/jars
编程模板
如何操作数据库,不同的版本方法不同,网上的教程五花八门,往往尝试不成功。
其实我们可以看 spark 自带的样例, 路径为 /usr/lib/spark/examples/src/main/python/sql 【编码时,sparkSession 需要声明 spark jars 的驱动路径,代码调用 API JDBC To Other Databases】
我从 datasource.py 中找到了基本的读写方法,其他自己可以看看
def jdbc_dataset_example(spark): # $example on:jdbc_dataset$ # Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods # Loading data from a JDBC source jdbcDF = spark.read .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .load() jdbcDF2 = spark.read .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) # Specifying dataframe column data types on read jdbcDF3 = spark.read .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .option("customSchema", "id DECIMAL(38, 0), name STRING") .load()
# Saving data to a JDBC source jdbcDF.write .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .save() jdbcDF2.write .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) # Specifying create table column data types on write jdbcDF.write .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) # $example off:jdbc_dataset$
实战案例
仅供参考,请确保 spark 能连接上数据库
from pyspark.sql import SparkSession import os # 获取 环境变量 SPARK_CLASSPATH, 当然需要你事先设定了 该变量 # 如果没有设定 SPARK_CLASSPATH, 得到 后面的值 /usr/lib/spark/jars/* sparkClassPath = os.getenv('SPARK_CLASSPATH', '/usr/lib/spark/jars/*') ### 创建 sparkSession # spark.driver.extraClassPath 设定了 jdbc 驱动的路径 spark = SparkSession .builder .appName("Python Spark SQL basic example") .master("local") .config("spark.driver.extraClassPath", sparkClassPath) .getOrCreate() ### 连接数据库并读取表 # airDF 已经是个 DataFrame airDF = spark.read .format("jdbc") .option("url", "jdbc:postgresql://172.16.89.80:5432/postgres") .option("driver", "org.postgresql.Driver") .option("dbtable", "road_point002") .option("user", "postgres") .option("password", "postgres") .load() ### 打印schema airDF.printSchema() # df 的表结构,我们看到的就是 列名即格式等 ### 只打印前20条 -- dsl 方式 airDF.select('id', 'road_number', 'speed_t').show() # id, road_number, speed_t 列名 ### 把 df 转成 table -- sql 方式 def func(x): print(x) airDF.registerTempTable('pg') spark.sql("select * from pg limit 20").foreach(func) ### 存储为 RDBMS、xml、json等格式 ## 存到数据库 airDF.write.jdbc("jdbc:postgresql://172.16.89.80:5432/postgres" , table = "test",mode="append", properties={"user": "postgres", "password": "postgres"}) # 写入数据库 ## 存为 json airDF.write.format('json').save('jsoin_path') # 存入分区文件 airDF.coalesce(1).write.format('json').save('filtered.json') # 存入单个文件,不建议使用