Spark JDBC Series: Four Ways to Read Data



    Original source for sections 1–4:

    Jianshu: wuli_小博 – Spark JDBC Series: Four Ways to Read Data



    1. Single-partition mode

    Function:

    def jdbc(url: String, table: String, properties: Properties): DataFrame
    

    Usage example:

    val url = "jdbc:mysql://mysqlHost:3306/database"
    val tableName = "table"
    
    // Set the connection user & password
    val prop = new java.util.Properties
    prop.setProperty("user","username")
    prop.setProperty("password","pwd")
    
    // Read the table into a DataFrame
    val jdbcDF = sqlContext.read.jdbc(url,tableName,prop)
    
    // Further operations
    ....
    

    As the parameters show, all that is needed is the JDBC URL, the table name, and a Properties object with the credentials. But checking the partition count of the resulting DataFrame reveals why this convenient form is dangerous: the resulting parallelism is 1.

    jdbcDF.rdd.partitions.size=1
    

    When reading a large table this way, the query Spark sends to MySQL is effectively a full-table select * from table;, and the single partition concentrates every row on one executor. On a sizable dataset this produces unreasonable resource usage: MySQL may hang and the Spark job may OOM, so this mode is not recommended for production.
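    If the single-partition API must be used (for example on a small dimension table), one way to keep the scan small is to push a filter down to MySQL by passing a parenthesized, aliased subquery in place of the table name, which the JDBC source accepts as the table argument. A minimal sketch, reusing the url and prop from the example above (the column and date filter are purely illustrative):

    // Hypothetical filter: read only one day's rows instead of the whole table
    val query = "(SELECT * FROM table WHERE gmt_create >= '2015-09-17' AND gmt_create < '2015-09-18') AS t"
    val smallDF = sqlContext.read.jdbc(url, query, prop)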

    2. Partitioning on a Long-typed column

    Function:

    def jdbc(
      url: String,
      table: String,
      columnName: String,
      lowerBound: Long,
      upperBound: Long,
      numPartitions: Int,
      connectionProperties: Properties): DataFrame
    

    Example using id as the partition column:

    val url = "jdbc:mysql://mysqlHost:3306/database"
    val tableName = "table"
    val columnName = "id"
    val lowerBound = getMinId()
    val upperBound = getMaxId()
    val numPartitions = 200
    
    // Set the connection user & password
    val prop = new java.util.Properties
    prop.setProperty("user","username")
    prop.setProperty("password","pwd")
    
    // Read the table into a DataFrame
    val jdbcDF = sqlContext.read.jdbc(url,tableName, columnName, lowerBound, upperBound,numPartitions,prop)
    
    // Further operations
    ....
    

    As the parameters show, specifying the numeric column id as the partition key, together with its minimum and maximum values and the desired number of partitions, lets Spark read the table concurrently. Does the partition count always equal whatever is passed as numPartitions? Not quite; reading the source code shows:

    if upperBound-lowerBound >= numPartitions:
        jdbcDF.rdd.partitions.size = numPartitions
    else
        jdbcDF.rdd.partitions.size = upperBound-lowerBound
    

    When pulling the data, Spark splits the range between the minimum and maximum ID evenly into numPartitions slices, issues the queries concurrently, and assembles the results into an RDD. For example:

    Input parameters:
    lowerBound=1, upperBound=1000, numPartitions=10
    
    The corresponding set of partition predicates is:
    JDBCPartition(id < 101 or id is null,0), 
    JDBCPartition(id >= 101 AND id < 201,1), 
    JDBCPartition(id >= 201 AND id < 301,2), 
    JDBCPartition(id >= 301 AND id < 401,3), 
    JDBCPartition(id >= 401 AND id < 501,4), 
    JDBCPartition(id >= 501 AND id < 601,5), 
    JDBCPartition(id >= 601 AND id < 701,6), 
    JDBCPartition(id >= 701 AND id < 801,7), 
    JDBCPartition(id >= 801 AND id < 901,8), 
    JDBCPartition(id >= 901,9)
    

    When using this mode, estimate numPartitions carefully so that no single slice becomes too large, and check that the partition column is indexed, otherwise the generated range queries turn into slow SQL and drag down the read.
    If the column values are sparse or unevenly distributed, extra handling is needed to avoid many empty partitions and unnecessary data skew; see Spark JDBC Series – Read Optimization for details.
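    As a rough illustration of how the range is split, here is a simplified sketch modeled on Spark's internal JDBCRelation.columnPartition logic (an approximation for explanation, not the exact source code):

    // Simplified illustration of how the WHERE clauses for mode 2 are generated;
    // integer division keeps the slice boundaries aligned.
    def columnPartition(column: String, lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[String] = {
      val stride = upperBound / numPartitions - lowerBound / numPartitions
      var current = lowerBound
      (0 until numPartitions).map { i =>
        val lower = current
        current += stride
        if (i == 0) s"$column < $current or $column is null"        // first slice also picks up NULLs
        else if (i == numPartitions - 1) s"$column >= $lower"       // last slice is open-ended
        else s"$column >= $lower AND $column < $current"
      }
    }
    
    // columnPartition("id", 1, 1000, 10) reproduces the JDBCPartition clauses listed above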

    3. Custom predicate partitioning

    Function:

    def jdbc(
      url: String,
      table: String,
      predicates: Array[String],
      connectionProperties: Properties): DataFrame
    

    Example using a custom array of date-range predicates:

    import java.text.SimpleDateFormat
    import java.util.Calendar
    import scala.collection.mutable.ArrayBuffer
    
    /**
     * Read the last 90 days of data, one partition per day, e.g.
     * Array(
     *   "2015-09-17" -> "2015-09-18",
     *   "2015-09-18" -> "2015-09-19",
     *   ...)
     **/
    def getPredicates: Array[String] = {
      val cal = Calendar.getInstance()
      cal.add(Calendar.DATE, -90)
      val array = ArrayBuffer[(String, String)]()
      for (i <- 0 until 90) {
        val start = new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime())
        cal.add(Calendar.DATE, +1)
        val end = new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime())
        array += start -> end
      }
      // one WHERE clause per day
      array.map {
        case (start, end) => s"gmt_create >= '$start' AND gmt_create < '$end'"
      }.toArray
    }
    
    val predicates = getPredicates
    // connect and read
    ...
    

    As the signature shows, the predicate array is a set of custom WHERE clauses executed in parallel, one per partition, so the partition count equals the size of the array:

    jdbcDF.rdd.partitions.size = predicates.size
    

    When using this mode, estimate predicates.size carefully so that no single slice becomes too large, and make sure each custom WHERE clause runs efficiently, otherwise slow SQL will drag down the read.
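    Putting it together, a minimal sketch of the read call for this mode, reusing the url, tableName, and prop from the earlier examples:

    // One task per predicate: each element of the array becomes the WHERE clause of one partition
    val predicates = getPredicates
    val jdbcDF = sqlContext.read.jdbc(url, tableName, predicates, prop)
    
    // jdbcDF.rdd.partitions.size == predicates.length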

    4. Custom option parameters

    Example:

    val jdbcDF = sparkSession.sqlContext.read.format("jdbc")
      .option("url", url)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "table")
      .option("user", "user")
      .option("password", "pwd")
      .option("partitionColumn", "id")
      .option("lowerBound", "1")
      .option("upperBound", "10000")
      // numPartitions must accompany partitionColumn/lowerBound/upperBound
      .option("numPartitions", "10")
      .option("fetchsize", "100")
      .option("xxx", "xxx")
      .load()
    

    As the example shows, the option interface is an open entry point: based on the specific options provided, Spark decides which of the three read modes above to use.
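    A quick sanity check on which mode the options actually triggered is to inspect the partition count of the result (a sketch following the example above):

    // With partitionColumn/lowerBound/upperBound/numPartitions all set, this prints the requested
    // number of partitions; drop those options and the read falls back to a single partition.
    println(jdbcDF.rdd.getNumPartitions)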


    5. JDBC To Other Databases

    Spark official API documentation:
    JDBC To Other Databases

    5.1 Scala

    // Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
    // Loading data from a JDBC source
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .load()
    
    val connectionProperties = new Properties()
    connectionProperties.put("user", "username")
    connectionProperties.put("password", "password")
    val jdbcDF2 = spark.read
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    // Specifying the custom data types of the read schema
    connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
    val jdbcDF3 = spark.read
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    
    // Saving data to a JDBC source
    jdbcDF.write
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .save()
    
    jdbcDF2.write
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    
    // Specifying create table column data types on write
    jdbcDF.write
      .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    

    5.2 Java

    // Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
    // Loading data from a JDBC source
    Dataset<Row> jdbcDF = spark.read()
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .load();
    
    Properties connectionProperties = new Properties();
    connectionProperties.put("user", "username");
    connectionProperties.put("password", "password");
    Dataset<Row> jdbcDF2 = spark.read()
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
    
    // Saving data to a JDBC source
    jdbcDF.write()
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .save();
    
    jdbcDF2.write()
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
    
    // Specifying create table column data types on write
    jdbcDF.write()
      .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
    

    5.3 Python

    # Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
    # Loading data from a JDBC source
    jdbcDF = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql:dbserver") \
        .option("dbtable", "schema.tablename") \
        .option("user", "username") \
        .option("password", "password") \
        .load()
    
    jdbcDF2 = spark.read \
        .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
              properties={"user": "username", "password": "password"})
    
    # Specifying dataframe column data types on read
    jdbcDF3 = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql:dbserver") \
        .option("dbtable", "schema.tablename") \
        .option("user", "username") \
        .option("password", "password") \
        .option("customSchema", "id DECIMAL(38, 0), name STRING") \
        .load()
    
    # Saving data to a JDBC source
    jdbcDF.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql:dbserver") \
        .option("dbtable", "schema.tablename") \
        .option("user", "username") \
        .option("password", "password") \
        .save()
    
    jdbcDF2.write \
        .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
              properties={"user": "username", "password": "password"})
    
    # Specifying create table column data types on write
    jdbcDF.write \
        .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
        .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
              properties={"user": "username", "password": "password"})
    
Original post: https://www.cnblogs.com/aixing/p/13327277.html