zoukankan      html  css  js  c++  java
  • Spark jdbc postgresql数据库连接和写入操作源码解读

    概述:Spark postgresql jdbc 数据库连接和写入操作源码解读,详细记录了SparkSQL对数据库的操作,通过java程序,在本地开发和运行。整体为,Spark建立数据库连接,读取数据,将DataFrame数据写入另一个数据库表中。附带完整项目源码(完整项目源码github)。

    这里写图片描述

    1.首先在postgreSQL中创建一张测试表,并插入数据。(完整项目源码Github)

    1.1. 在postgreSQL中的postgres用户下,创建 products

    CREATE TABLE products (
        product_no integer,
        name text,
        price numeric
    );

    1.2. 在 products 插入数据

    INSERT INTO products (product_no, name, price) VALUES
        (1, 'Cheese', 9.99),
        (2, 'Bread', 1.99),
        (3, 'Milk', 2.99);

    查看数据库写入结果。

    这里写图片描述

    2.编写SPARK程序。(完整项目源码Github

    2.1.读取Postgresql某一张表的数据为DataFrame(完整项目源码Github

    SparkPostgresqlJdbc.java
    Properties connectionProperties = new Properties();
    
    
    //增加数据库的用户名(user)密码(password),指定postgresql驱动(driver)
    connectionProperties.put("user","postgres");
    connectionProperties.put("password","123456");
    connectionProperties.put("driver","org.postgresql.Driver");
    
    //SparkJdbc读取Postgresql的products表内容
    Dataset<Row> jdbcDF = spark.read()
            .jdbc("jdbc:postgresql://localhost:5432/postgres","products",connectionProperties).select("name","price");
    
    //显示jdbcDF数据内容
    jdbcDF.show();

    2.2.写入Postgresql某张表中

    //将jdbcDF数据新建并写入newproducts,append模式是连接模式,默认的是"error"模式。
    jdbcDF.write().mode("append")
            .jdbc("jdbc:postgresql://localhost:5432/postgres","newproducts",connectionProperties);

    3.运行程序,并查看结果(如果在IDEA中开发不熟练,可以看我另一篇博文spark (java API) 在Intellij IDEA中开发并运行)。

    3.1.直接在intellij IDEA(社区版)中运行。

    a.在运行按钮的“Edit Configeration”中的VM option中添加“-Dspark.master=local”

    这里写图片描述

    3.2.在终端(Terminal)中运行。

    /opt/spark-2.1.0-bin-hadoop2.7/bin/spark-submit 
      --class "SparkPostgresqlJdbc" 
      --master local[4] 
      --driver-class-path /home/xiaolei/.m2/repository/org/postgresql/postgresql/9.4.1212/postgresql-9.4.1212.jar 
      target/SparkPostgresqlJdbc-1.0-SNAPSHOT.jar

    其中 --driver-class-path 指定下载的postgresql JDBC数据
    库驱动路径,命令执行要在项目的根目录中(/home/xiaolei/Data/GS/Spark/SparkPostgresqlJdbc)。

    这里写图片描述

    查看Spark写入数据库中的数据

    这里写图片描述

    4.以下为项目中主要源码(完整项目源码Github):

    4.1.项目配置源码pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>wangxiaolei</groupId>
        <artifactId>SparkPostgresqlJdbc</artifactId>
        <version>1.0-SNAPSHOT</version>
        <dependencies>
            <dependency> <!-- Spark dependency -->
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.1.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.1.0</version>
            </dependency>
            <dependency>
                <groupId>org.postgresql</groupId>
                <artifactId>postgresql</artifactId>
                <version>9.4.1212</version>
            </dependency>
        </dependencies>
    </project>

    4.2.java源码SparkPostgresqlJdbc.java

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    
    import java.util.Properties;
    
    /**
     * MIT.
     * Author: wangxiaolei(王小雷).
     * Date:17-2-9.
     * Project:SparkPostgresqlJdbc.
     */
    public class SparkPostgresqlJdbc {
        public static void main (String[] args) {
    
            SparkSession spark = SparkSession
                    .builder()
                    .appName("SparkPostgresqlJdbc")
                    .config("spark.some.config.option","some-value")
                    .getOrCreate();
        //启动runSparkPostgresqlJdbc程序
            runSparkPostgresqlJdbc(spark);
    
            spark.stop();
    
        }
    
        private static void runSparkPostgresqlJdbc(SparkSession spark){
            //new一个属性
            System.out.println("确保数据库已经开启,并创建了products表和插入了数据");
            Properties connectionProperties = new Properties();
    
    
            //增加数据库的用户名(user)密码(password),指定postgresql驱动(driver)
            System.out.println("增加数据库的用户名(user)密码(password),指定postgresql驱动(driver)");
            connectionProperties.put("user","postgres");
            connectionProperties.put("password","123456");
            connectionProperties.put("driver","org.postgresql.Driver");
    
    
    
            //SparkJdbc读取Postgresql的products表内容
            System.out.println("SparkJdbc读取Postgresql的products表内容");
            Dataset<Row> jdbcDF = spark.read()
                    .jdbc("jdbc:postgresql://localhost:5432/postgres","products",connectionProperties).select("name","price");
            //显示jdbcDF数据内容
            jdbcDF.show();
    
    
    
            //将jdbcDF数据新建并写入newproducts,append模式是连接模式,默认的是"error"模式。
            jdbcDF.write().mode("append")
                    .jdbc("jdbc:postgresql://localhost:5432/postgres","newproducts",connectionProperties);
    
        }
    }

    完整项目源码Github

  • 相关阅读:
    同步、异步、阻塞、非阻塞
    prolog 阶段总结
    prolog 规则
    prolog 内部谓词
    prolog --寻找neni (2)
    寻找 nani (1)
    一、prolog简介
    [ 转 ] scrapy 中解决 xpath 中的中文编码问题
    【转载】字符串编码问题
    二叉树 —— 创建 + 先序、中序、后序遍历(递归+非递归)
  • 原文地址:https://www.cnblogs.com/lanzhi/p/6467643.html
Copyright © 2011-2022 走看看