zoukankan      html  css  js  c++  java
  • Spark使用jdbc时的并行度

    Spark SQL支持数据源使用JDBC从其他数据库读取数据。 与使用JdbcRDD相比,应优先使用此功能。 这是因为结果以DataFrame的形式返回,并且可以轻松地在Spark SQL中进行处理或与其他数据源合并。 JDBC数据源也更易于从Java或Python使用,因为它不需要用户提供ClassTag。 (请注意,这与Spark SQL JDBC服务器不同,后者允许其他应用程序使用Spark SQL运行查询)。

    首先,您需要在spark类路径上包含特定数据库的JDBC驱动程序。

    例如,要从Spark Shell连接到postgres,您可以运行以下命令:

    bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
    
    • Spark读取关系型数据库,官方有API接口,如下:
      ①、SparkSession.read.jdbc(url, table, properties)
      ②、SparkSession.read.jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, connectionProperties)
      ③、SparkSession.read.jdbc(url, table, predicates, connectionProperties)
    1. 单partition方式:使用如下函数
    def jdbc(url: String, table: String, properties: Properties): DataFrame
    

    例子:

    val url = "jdbc:mysql://mysqlHost:3306/database"
    val tableName = "table"
    
    // 设置连接用户&密码
    val prop = new java.util.Properties
    prop.setProperty("user","username")
    prop.setProperty("password","pwd")
    
    // 取得该表数据
    val jdbcDF = spark.read.jdbc(url,tableName,prop)
    
    // 一些操作
    jdbcDF.write.mode..
    

    查看并发度

    jdbcDF.rdd.partitions.size # 结果返回 1
    

    该操作的并发度为1,你所有的数据都会在一个partition中进行操作,意味着无论你给的资源有多少,只有一个task会执行任务,执行效率可想而之,并且在稍微大点的表中进行操作分分钟就会OOM

    更直观的说法是,达到千万级别的表就不要使用该操作,count操作就要等一万年,亲测4个小时 !

    1. 根据Long类型字段分区
      调用函数为
     def jdbc(
      url: String,
      table: String,
      columnName: String,    # 根据该字段分区,需要为整形,比如id等
      lowerBound: Long,      # 分区的下界
      upperBound: Long,      # 分区的上界
      numPartitions: Int,    # 分区的个数
      connectionProperties: Properties): DataFrame
    

    例子:

    val url = "jdbc:mysql://mysqlHost:3306/database"
    val tableName = "table"
    
    val columnName = "colName"
    val lowerBound = 1,
    val upperBound = 10000000,
    val numPartitions = 10,
    
    // 设置连接用户&密码
    val prop = new java.util.Properties
    prop.setProperty("user","username")
    prop.setProperty("password","pwd")
    
    // 取得该表数据
    val jdbcDF = spark.read.jdbc(url,tableName,columnName,lowerBound,upperBound,numPartitions,prop)
    
    // 一些操作
    ....
    

    查看并发度

    jdbcDF.rdd.partitions.size # 结果返回 10
    该操作将字段 colName 中1-10000000条数据分到10个partition中,使用很方便,缺点也很明显,只能使用整形数据字段作为分区关键字。
    
    1. 根据任意类型字段分区
      调用函数为
    jdbc(
      url: String,
      table: String,
      predicates: Array[String],
      connectionProperties: Properties): DataFrame
    

    例子:

    val url = "jdbc:mysql://localhost:3306/db"
    val tableName = "tablename"
    
    // 设置连接用户&密码
    val prop = new java.util.Properties
    prop.setProperty("user","mysql")
    prop.setProperty("password","123456")
    
    
    val predicates =
      Array(
        "2018-10-01" -> "2018-11-01",
        "2018-11-02" -> "2018-12-01",
        "2018-12-02" -> "2019-01-01",
        "2019-02-02" -> "2019-03-01",
        "2019-03-02" -> "2019-04-01",
        "2019-04-02" -> "2019-05-01",
        "2019-05-02" -> "2019-06-01",
        "2019-06-02" -> "2019-07-01",
        "2019-07-02" -> "2019-08-01",
        "2019-08-02" -> "2019-09-01",
        "2019-09-02" -> "2019-10-01",
        "2019-10-02" -> "2019-11-01"
      ).map {
        case (start, end) =>
          s"cast(txntime as date) >= date '$start' " + s"AND cast(txntime as date) <= date '$end'"
      }
    
    // 取得该表数据
    val jdbcDF = spark.read.jdbc(url, tableName, predicates, prop)
    // 写入到hive表
    jdbcDF.write.partitionBy().mode("overwrite").format("orc")
      .saveAsTable("db.tableName")
    

    一千万级别数据实测2.4min左右导入完成。

    1. limit分页分区

      依旧采用上述函数,但是partitions做了修改,例子:

    val url = "jdbc:mysql://localhost:3306/db"
    val tableName = "tablename"
    
    // 设置连接用户&密码
    val prop = new java.util.Properties
    prop.setProperty("user","mysql")
    prop.setProperty("password","123456")
    
    def getPartition(count:Int) = {
      val step = count / 10
      Range(0, count, step).map(x =>{
        (x, step)
      }).toArray
    }
    val partitions = getPartition(10000000)
      .map {
        case (start,end) => s"1=1 limit ${start},${end}"
      }
    
    // 取得该表数据
    val jdbcDF = spark.read.jdbc(url, tableName, partitions, prop)
    // 写入到hive表
    jdbcDF.write.partitionBy().mode("overwrite").format("orc")
      .saveAsTable("db.tableName")
    

    实际测试效果和上面的差不多,区别是这里不需要字段有特殊的要求,对行数做处理就行啦。

  • 相关阅读:
    整数的二进制表示中1的个数
    最长公共子序列
    关于使浏览器崩溃的代码尝试
    wp7 独立存储
    动态引用样式表
    锋利的jQuery小记……
    DataGridVidw添加CheckBox。并通过一个 CheckBox来控制其全选。
    全选按钮的使用。winfrom程序中,对全选按钮的理解,欢迎拍砖!
    Ul li 超出文本显示省略号JS右键控制 本人QQ:267307031 空间更多技术咨询
    FileUpload控件
  • 原文地址:https://www.cnblogs.com/Kaivenblog/p/12622008.html
Copyright © 2011-2022 走看看