zoukankan      html  css  js  c++  java
  • spark教程(九)-操作数据库

    数据库也是 spark 数据源创建 df 的一种方式,因为比较重要,所以单独算一节。

    本文以 postgres 为例

    安装 JDBC

    首先需要 安装 postgres 的客户端驱动,即 JDBC 驱动,这是官方下载地址,JDBC,根据数据库版本下载对应的驱动

    上传至 spark 目录下的 jars 目录

     并设置环境变量

    export SPARK_CLASSPATH = /usr/lib/spark/jars

    编程模板

    如何操作数据库,不同的版本方法不同,网上的教程五花八门,往往尝试不成功。

    其实我们可以看 spark 自带的样例, 路径为 /usr/lib/spark/examples/src/main/python/sql    【编码时,sparkSession 需要声明 spark jars 的驱动路径,代码调用 API JDBC To Other Databases

    我从 datasource.py 中找到了基本的读写方法,其他自己可以看看

    def jdbc_dataset_example(spark):
        # $example on:jdbc_dataset$
        # Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
        # Loading data from a JDBC source
        jdbcDF = spark.read 
            .format("jdbc") 
            .option("url", "jdbc:postgresql:dbserver") 
            .option("dbtable", "schema.tablename") 
            .option("user", "username") 
            .option("password", "password") 
            .load()
    
        jdbcDF2 = spark.read 
            .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
                  properties={"user": "username", "password": "password"})
    
        # Specifying dataframe column data types on read
        jdbcDF3 = spark.read 
            .format("jdbc") 
            .option("url", "jdbc:postgresql:dbserver") 
            .option("dbtable", "schema.tablename") 
            .option("user", "username") 
            .option("password", "password") 
            .option("customSchema", "id DECIMAL(38, 0), name STRING") 
            .load()
    
    # Saving data to a JDBC source jdbcDF.write .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .save() jdbcDF2.write .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) # Specifying create table column data types on write jdbcDF.write .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) # $example off:jdbc_dataset$

    实战案例

    仅供参考,请确保 spark 能连接上数据库

    from pyspark.sql import SparkSession
    import os
    
    # 获取 环境变量 SPARK_CLASSPATH, 当然需要你事先设定了 该变量
    # 如果没有设定 SPARK_CLASSPATH, 得到 后面的值 /usr/lib/spark/jars/*
    sparkClassPath = os.getenv('SPARK_CLASSPATH', '/usr/lib/spark/jars/*')
    
    ### 创建 sparkSession
    # spark.driver.extraClassPath 设定了 jdbc 驱动的路径
    spark = SparkSession 
        .builder 
        .appName("Python Spark SQL basic example") 
        .master("local") 
        .config("spark.driver.extraClassPath", sparkClassPath) 
        .getOrCreate()
    
    ### 连接数据库并读取表
    # airDF 已经是个 DataFrame
    airDF = spark.read 
        .format("jdbc") 
        .option("url", "jdbc:postgresql://172.16.89.80:5432/postgres") 
        .option("driver", "org.postgresql.Driver") 
        .option("dbtable", "road_point002") 
        .option("user", "postgres") 
        .option("password", "postgres") 
        .load()
    
    ### 打印schema
    airDF.printSchema()     # df 的表结构,我们看到的就是 列名即格式等
    
    ### 只打印前20条 -- dsl 方式
    airDF.select('id', 'road_number', 'speed_t').show() # id, road_number, speed_t 列名
    
    ### 把 df 转成 table -- sql 方式
    def func(x):
        print(x)
    
    airDF.registerTempTable('pg')
    spark.sql("select * from pg limit 20").foreach(func)
    
    
    ### 存储为 RDBMS、xml、json等格式
    ## 存到数据库
    airDF.write.jdbc("jdbc:postgresql://172.16.89.80:5432/postgres" ,
                     table = "test",mode="append", properties={"user": "postgres", "password": "postgres"})       # 写入数据库
    
    ## 存为 json
    airDF.write.format('json').save('jsoin_path')       # 存入分区文件
    airDF.coalesce(1).write.format('json').save('filtered.json')  # 存入单个文件,不建议使用
  • 相关阅读:
    拉格朗日乘数法
    线性判别分析(Linear Discriminant Analysis)
    有监督学习 和 无监督学习
    Jenkins入门知识
    vs技巧总结
    jenkins 神奇变量
    ubuntu12.04 折腾流水
    ubuntu 12.04 右上角的网络连接图标突然消失不见
    ubuntu 12.04 上网体验
    JIRA 初体验
  • 原文地址:https://www.cnblogs.com/yanshw/p/11697289.html
Copyright © 2011-2022 走看看