zoukankan      html  css  js  c++  java
  • Spark jdbc postgresql数据库连接和写入操作源代码解读

    概述:Spark postgresql jdbc 数据库连接和写入操作源代码解读。具体记录了SparkSQL对数据库的操作,通过java程序。在本地开发和执行。总体为,Spark建立数据库连接,读取数据。将DataFrame数据写入还有一个数据库表中。附带完整项目源代码(完整项目源代码github)。

    这里写图片描写叙述

    1.首先在postgreSQL中创建一张測试表,并插入数据。(完整项目源代码Github)

    1.1. 在postgreSQL中的postgres用户下,创建 products

    CREATE TABLE products (
        product_no integer,
        name text,
        price numeric
    );

    1.2. 在 products 插入数据

    INSERT INTO products (product_no, name, price) VALUES
        (1, 'Cheese', 9.99),
        (2, 'Bread', 1.99),
        (3, 'Milk', 2.99);

    查看数据库写入结果。

    这里写图片描写叙述

    2.编写SPARK程序。(完整项目源代码Github

    2.1.读取Postgresql某一张表的数据为DataFrame(完整项目源代码Github

    SparkPostgresqlJdbc.java
    Properties connectionProperties = new Properties();
    
    
    //添加数据库的username(user)密码(password),指定postgresql驱动(driver)
    connectionProperties.put("user","postgres");
    connectionProperties.put("password","123456");
    connectionProperties.put("driver","org.postgresql.Driver");
    
    //SparkJdbc读取Postgresql的products表内容
    Dataset<Row> jdbcDF = spark.read()
            .jdbc("jdbc:postgresql://localhost:5432/postgres","products",connectionProperties).select("name","price");
    
    //显示jdbcDF数据内容
    jdbcDF.show();

    2.2.写入Postgresql某张表中

    //将jdbcDF数据新建并写入newproducts,append模式是连接模式,默认的是"error"模式。
    jdbcDF.write().mode("append")
            .jdbc("jdbc:postgresql://localhost:5432/postgres","newproducts",connectionProperties);

    3.执行程序。并查看结果(假设在IDEA中开发不熟练。能够看我还有一篇博文spark (java API) 在Intellij IDEA中开发并执行)。

    3.1.直接在intellij IDEA(社区版)中执行。

    a.在执行button的“Edit Configeration”中的VM option中加入“-Dspark.master=local”

    这里写图片描写叙述

    3.2.在终端(Terminal)中执行。

    /opt/spark-2.1.0-bin-hadoop2.7/bin/spark-submit 
      --class "SparkPostgresqlJdbc" 
      --master local[4] 
      --driver-class-path /home/xiaolei/.m2/repository/org/postgresql/postgresql/9.4.1212/postgresql-9.4.1212.jar 
      target/SparkPostgresqlJdbc-1.0-SNAPSHOT.jar

    当中 --driver-class-path 指定下载的postgresql JDBC数据
    库驱动路径。命令执行要在项目的根文件夹中(/home/xiaolei/Data/GS/Spark/SparkPostgresqlJdbc)。

    这里写图片描写叙述

    查看Spark写入数据库中的数据

    这里写图片描写叙述

    4.下面为项目中主要源代码(完整项目源代码Github):

    4.1.项目配置源代码pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>wangxiaolei</groupId>
        <artifactId>SparkPostgresqlJdbc</artifactId>
        <version>1.0-SNAPSHOT</version>
        <dependencies>
            <dependency> <!-- Spark dependency -->
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.1.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.1.0</version>
            </dependency>
            <dependency>
                <groupId>org.postgresql</groupId>
                <artifactId>postgresql</artifactId>
                <version>9.4.1212</version>
            </dependency>
        </dependencies>
    </project>

    4.2.java源代码SparkPostgresqlJdbc.java

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    
    import java.util.Properties;
    
    /**
     * MIT.
     * Author: wangxiaolei(王小雷).
     * Date:17-2-9.
     * Project:SparkPostgresqlJdbc.
     */
    public class SparkPostgresqlJdbc {
        public static void main (String[] args) {
    
            SparkSession spark = SparkSession
                    .builder()
                    .appName("SparkPostgresqlJdbc")
                    .config("spark.some.config.option","some-value")
                    .getOrCreate();
        //启动runSparkPostgresqlJdbc程序
            runSparkPostgresqlJdbc(spark);
    
            spark.stop();
    
        }
    
        private static void runSparkPostgresqlJdbc(SparkSession spark){
            //new一个属性
            System.out.println("确保数据库已经开启,并创建了products表和插入了数据");
            Properties connectionProperties = new Properties();
    
    
            //添加数据库的username(user)密码(password),指定postgresql驱动(driver)
            System.out.println("添加数据库的username(user)密码(password),指定postgresql驱动(driver)");
            connectionProperties.put("user","postgres");
            connectionProperties.put("password","123456");
            connectionProperties.put("driver","org.postgresql.Driver");
    
    
    
            //SparkJdbc读取Postgresql的products表内容
            System.out.println("SparkJdbc读取Postgresql的products表内容");
            Dataset<Row> jdbcDF = spark.read()
                    .jdbc("jdbc:postgresql://localhost:5432/postgres","products",connectionProperties).select("name","price");
            //显示jdbcDF数据内容
            jdbcDF.show();
    
    
    
            //将jdbcDF数据新建并写入newproducts,append模式是连接模式。默认的是"error"模式。

    jdbcDF.write().mode("append") .jdbc("jdbc:postgresql://localhost:5432/postgres","newproducts",connectionProperties); } }

    完整项目源代码Github

  • 相关阅读:
    Android服务之bindService源代码分析
    [iOS 高级] iOS远程推送与本地推送大致流程
    redis集群
    面试你之前,我希望在简历上看到这些!
    解决安卓出现导入第三方包反复的错误
    【PA2012】【BZOJ4289】Tax
    Spark jdbc postgresql数据库连接和写入操作源代码解读
    Java中hashcode的理解
    CvArr、Mat、CvMat、IplImage、BYTE转换
    CSDN日报20170413 ——《天天写业务代码的那些年,我们是怎样成长过来的》
  • 原文地址:https://www.cnblogs.com/zhchoutai/p/8677027.html
Copyright © 2011-2022 走看看