  • Spark JDBC To MySQL

    MySQL JDBC driver download:
    https://dev.mysql.com/downloads/connector/j/

    Using JDBC in Spark:
    1. Add the following to the spark-env.sh file:
    export SPARK_CLASSPATH=/path/mysql-connector-java-5.1.42.jar
    2. Or add the jar when submitting the job:
    --jars /path/mysql-connector-java-5.1.42.jar
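
    For example, a job submission might look like the following (the application class and jar names below are placeholders, shown only to illustrate where --jars goes):

    spark-submit --class com.example.MyApp \
      --jars /path/mysql-connector-java-5.1.42.jar \
      my-app.jar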


    Connecting to MySQL from the Spark shell:
    spark-shell --jars /path/mysql-connector-java-5.1.42.jar

    Tables from a remote database can be loaded as a DataFrame or Spark SQL temporary view using the Data Sources API. Users can specify JDBC connection properties in the data source options.
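
    A minimal sketch of the temporary-view route, using the same assumed test table (db.user_test on jdbc:mysql://ip:3306) as the examples below:

    val jdbcView = spark.read.format("jdbc")
          .option("driver", "com.mysql.jdbc.Driver")
          .option("url", "jdbc:mysql://ip:3306")
          .option("dbtable", "db.user_test")
          .option("user", "test")
          .option("password", "123456")
          .load()

    // Register the DataFrame as a Spark SQL temporary view and query it with SQL
    jdbcView.createOrReplaceTempView("user_test_view")
    spark.sql("SELECT gender, count(*) AS cnt FROM user_test_view GROUP BY gender").show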

    user and password are normally provided as connection properties for logging in to the data source. In addition to the connection properties, Spark also supports the following case-insensitive options:

    JDBC connection properties
    Property names and meanings:
    url: the JDBC URL to connect to, e.g. jdbc:mysql://ip:3306
    dbtable: the JDBC table to read. A subquery in parentheses can be used instead of a full table name.
    driver: the class name of the JDBC driver used to connect to this URL, e.g. com.mysql.jdbc.Driver

    partitionColumn, lowerBound, upperBound, numPartitions:
    These options apply only to reading and must all be specified together. They describe how to partition the table when reading data in parallel from multiple workers.
    partitionColumn must be a numeric column of the table.
    lowerBound and upperBound are used only to decide the partition stride, not to filter rows in the table,
    so all rows of the table will be partitioned and returned.

    fetchsize: applies only to reading. The JDBC fetch size, which determines how many rows are fetched per round trip. This can help the performance of JDBC drivers whose default fetch size is low (e.g. Oracle fetches 10 rows at a time).

    batchsize: applies only to writing. The JDBC batch size, which determines how many rows are inserted per round trip.
    This can help the performance of JDBC drivers. Defaults to 1000.

    isolationLevel: applies only to writing. The transaction isolation level to use for the current connection. It can be NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, or SERIALIZABLE, corresponding to the standard isolation levels defined by JDBC's Connection object. Defaults to READ_UNCOMMITTED. See the documentation for java.sql.Connection.

    truncate: applies only to writing. When SaveMode.Overwrite is enabled, this option makes Spark TRUNCATE the existing MySQL table instead of dropping and recreating it. This can be more efficient and prevents table metadata (e.g. indexes) from being removed. However, it does not work in some cases, for example when the new data has a different schema. Defaults to false.

    createTableOptions: applies only to writing. This option allows database-specific table and partition options to be set when a table is created (e.g. CREATE TABLE t (name string) ENGINE=InnoDB).
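
    As a rough illustration of the write-side options above (assuming an existing DataFrame df; the target table name and the isolationLevel/createTableOptions values here are only examples):

    df.write.mode("overwrite").format("jdbc")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://ip:3306")
      .option("dbtable", "db.some_table")          // illustrative table name
      .option("user", "test")
      .option("password", "123456")
      .option("batchsize", "1000")                 // rows per INSERT batch
      .option("isolationLevel", "READ_COMMITTED")  // transaction isolation for the write
      .option("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8")  // used only if Spark creates the table
      .save()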



    Spark JDBC read from MySQL

    val jdbcDF11 = spark.read.format("jdbc")
          .option("driver", "com.mysql.jdbc.Driver")
          .option("url", "jdbc:mysql://ip:3306")
          .option("dbtable", "db.user_test")
          .option("user", "test")
          .option("password", "123456")
          .option("fetchsize", "3")
          .load()
    jdbcDF11.show
    
    val jdbcDF12 = spark.read.format("jdbc").options(
          Map(
            "driver" -> "com.mysql.jdbc.Driver",
            "url" -> "jdbc:mysql://ip:3306",
            "dbtable" -> "db.user_test",
            "user" -> "test",
            "password" -> "123456",
            "fetchsize" -> "3")).load()
    jdbcDF12.show
    

    jdbc(url: String, table: String, properties: Properties): DataFrame

    //-----------------------------------
    
    import java.util.Properties
    
    // jdbc(url: String, table: String, properties: Properties): DataFrame
    
    val readConnProperties1 = new Properties()
    readConnProperties1.put("driver", "com.mysql.jdbc.Driver")
    readConnProperties1.put("user", "test")
    readConnProperties1.put("password", "123456")
    readConnProperties1.put("fetchsize", "3")
    
    val jdbcDF1 = spark.read.jdbc(
      "jdbc:mysql://ip:3306",
      "db.user_test",
      readConnProperties1)
    
    jdbcDF1.show
    +---+------+---+
    |uid|gender|age|
    +---+------+---+
    |  2|     2| 20|
    |  3|     1| 30|
    |  4|     2| 40|
    |  5|     1| 50|
    |  6|     2| 60|
    |  7|     1| 25|
    |  8|     2| 35|
    |  9|     1| 70|
    | 10|     2| 80|
    |  1|     1| 18|
    +---+------+---+
    
    
    // Default parallelism is 1
    jdbcDF1.rdd.partitions.size
    Int = 1
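
    If more downstream parallelism is needed after such a single-partition read, one simple workaround (besides the partitioning options shown below) is to repartition the DataFrame after loading; the read itself still goes through one connection:

    // Redistribute the rows across 4 partitions for subsequent stages
    val jdbcDF1Repart = jdbcDF1.repartition(4)
    jdbcDF1Repart.rdd.partitions.size
    // Int = 4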
    
    //-------------------------
        
    // jdbc(url: String, table: String, properties: Properties): DataFrame
    
    val readConnProperties4 = new Properties()
    readConnProperties4.put("driver", "com.mysql.jdbc.Driver")
    readConnProperties4.put("user", "test")
    readConnProperties4.put("password", "123456")
    readConnProperties4.put("fetchsize", "3")
    
    
    val jdbcDF4 = spark.read.jdbc(
      "jdbc:mysql://ip:3306",
      "(select * from db.user_test where gender=1) t",  // 注意括号和表别名,必须得有,这里可以过滤数据
      readConnProperties4)
      
    jdbcDF4.show
    +---+------+---+
    |uid|gender|age|
    +---+------+---+
    |  3|     1| 30|
    |  5|     1| 50|
    |  7|     1| 25|
    |  9|     1| 70|
    |  1|     1| 18|
    +---+------+---+
    

    jdbc(url: String, table: String,
         columnName: String, lowerBound: Long, upperBound: Long, numPartitions: Int,
         connectionProperties: Properties): DataFrame

    	 
    import java.util.Properties
    
    val readConnProperties2 = new Properties()
    readConnProperties2.put("driver", "com.mysql.jdbc.Driver")
    readConnProperties2.put("user", "test")
    readConnProperties2.put("password", "123456")
    readConnProperties2.put("fetchsize", "2")
    
    val columnName = "uid"
    val lowerBound = 1
    val upperBound = 6
    val numPartitions = 3
    
    val jdbcDF2 = spark.read.jdbc(
      "jdbc:mysql://ip:3306",
      "db.user_test",
      columnName,
      lowerBound,
      upperBound,
      numPartitions,
      readConnProperties2)
    
    jdbcDF2.show
    +---+------+---+
    |uid|gender|age|
    +---+------+---+
    |  2|     2| 20|
    |  1|     1| 18|
    |  3|     1| 30|
    |  4|     2| 40|
    |  5|     1| 50|
    |  6|     2| 60|
    |  7|     1| 25|
    |  8|     2| 35|
    |  9|     1| 70|
    | 10|     2| 80|
    +---+------+---+
    
    // Parallelism is 3, matching numPartitions
    jdbcDF2.rdd.partitions.size
    Int = 3
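
    Roughly speaking, Spark splits the range between lowerBound and upperBound into numPartitions contiguous slices of the partition column; the exact boundaries depend on the Spark version, but with uid, 1, 6 and 3 the generated per-partition WHERE clauses look approximately like this:

    // Approximate per-partition predicates (boundaries may differ slightly by Spark version):
    //   partition 0:  uid < 3 OR uid IS NULL
    //   partition 1:  uid >= 3 AND uid < 5
    //   partition 2:  uid >= 5
    // Rows outside [lowerBound, upperBound) are not filtered out;
    // they simply fall into the first or last partition.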
    

    jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame
    predicates: Condition in the WHERE clause for each partition.

    import java.util.Properties
    
    val readConnProperties3 = new Properties()
    readConnProperties3.put("driver", "com.mysql.jdbc.Driver")
    readConnProperties3.put("user", "test")
    readConnProperties3.put("password", "123456")
    readConnProperties3.put("fetchsize", "2")
    
    val arr = Array(
      (1, 50),
      (2, 60))
    
    // Each predicate becomes one partition, so these conditions both control the parallelism and filter the data
    val predicates = arr.map {
      case (gender, age) =>
        s" gender = $gender " + s" AND age < $age "
    }
    
    // An alternative set of predicates that partitions by date ranges
    // (assumes a create_time column; not used by the jdbcDF3 example below)
    val predicates1 =
      Array(
        "2017-05-01" -> "2017-05-20",
        "2017-06-01" -> "2017-06-05").map {
          case (start, end) =>
            s"cast(create_time as date) >= date '$start' " + s"AND cast(create_time as date) <= date '$end'"
        }
    
    val jdbcDF3 = spark.read.jdbc(
      "jdbc:mysql://ip:3306",
      "db.user_test",
      predicates,
      readConnProperties3)
    
    
    
    jdbcDF3.show
    +---+------+---+
    |uid|gender|age|
    +---+------+---+
    |  3|     1| 30|
    |  7|     1| 25|
    |  1|     1| 18|
    |  2|     2| 20|
    |  4|     2| 40|
    |  8|     2| 35|
    +---+------+---+
    
    // Parallelism is 2, matching the number of elements in arr (one partition per predicate)
    jdbcDF3.rdd.partitions.size
    Int = 2
    

    Spark JDBC write to MySQL

    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._
    
    val dataList: List[(Double, String, Double, Double, String, Double, Double, Double, Double)] = List(
      (0, "male", 37, 10, "no", 3, 18, 7, 4),
      (0, "female", 27, 4, "no", 4, 14, 6, 4),
      (0, "female", 32, 15, "yes", 1, 12, 1, 4),
      (0, "male", 57, 15, "yes", 5, 18, 6, 5),
      (0, "male", 22, 0.75, "no", 2, 17, 6, 3),
      (0, "female", 32, 1.5, "no", 2, 17, 5, 5),
      (0, "female", 22, 0.75, "no", 2, 12, 1, 3),
      (0, "male", 57, 15, "yes", 2, 14, 4, 4),
      (0, "female", 32, 15, "yes", 4, 16, 1, 2))
    
    val colArray: Array[String] = Array("affairs", "gender", "age", "yearsmarried", "children", "religiousness", "education", "occupation", "rating")
    
    val df = dataList.toDF(colArray: _*)
    
    df.write.mode("overwrite").format("jdbc").options(
      Map(
        "driver" -> "com.mysql.jdbc.Driver",
        "url" -> "jdbc:mysql://ip:3306",
        "dbtable" -> "db.affairs",
        "user" -> "test",
        "password" -> "123456",
        "batchsize" -> "1000",
        "truncate" -> "true")).save()
    