zoukankan      html  css  js  c++  java
  • spark jdbc读取并发度优化

    很多人在spark中使用默认提供的jdbc方法时,在数据库数据较大时经常发现任务 hang 住,其实是单线程任务过重导致,这时候需要提高读取的并发度。 
    下文以 mysql 为例进行说明。

    在spark中使用jdbc

    在 spark-env.sh 文件中加入:

    export SPARK_CLASSPATH=/path/mysql-connector-java-5.1.34.jar

    任务提交时加入:

    --jars /path/mysql-connector-java-5.1.34.jar

    1. 单partition(无并发)

    调用函数

    def jdbc(url: String, table: String, properties: Properties): DataFrame

    使用:

    val url = "jdbc:mysql://mysqlHost:3306/database"
    val tableName = "table"
    
    // 设置连接用户&密码
    val prop = new java.util.Properties
    prop.setProperty("user","username")
    prop.setProperty("password","pwd")
    
    // 取得该表数据
    val jdbcDF = sqlContext.read.jdbc(url,tableName,prop)
    
    // 一些操作
    ....

    查看并发度

    jdbcDF.rdd.partitions.size # 结果返回 1

    该操作的并发度为1,你所有的数据都会在一个partition中进行操作,意味着无论你给的资源有多少,只有一个task会执行任务,执行效率可想而之,并且在稍微大点的表中进行操作分分钟就会OOM。

    更直观的说法是,达到千万级别的表就不要使用该操作,count操作就要等一万年,no zuo no die ,don’t to try !

    WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 56, spark047219):
     java.lang.OutOfMemoryError: GC overhead limit exceeded
    at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3380)

    2. 根据Long类型字段分区

    调用函数

      def jdbc(
      url: String,
      table: String,
      columnName: String,    # 根据该字段分区,需要为整形,比如id等
      lowerBound: Long,      # 分区的下界
      upperBound: Long,      # 分区的上界
      numPartitions: Int,    # 分区的个数
      connectionProperties: Properties): DataFrame

    使用:

    val url = "jdbc:mysql://mysqlHost:3306/database"
    val tableName = "table"
    
    val columnName = "colName"
    val lowerBound = 1,
    val upperBound = 10000000,
    val numPartitions = 10,
    
    // 设置连接用户&密码
    val prop = new java.util.Properties
    prop.setProperty("user","username")
    prop.setProperty("password","pwd")
    
    // 取得该表数据
    val jdbcDF = sqlContext.read.jdbc(url,tableName,columnName,lowerBound,upperBound,numPartitions,prop)
    
    // 一些操作
    ....

    查看并发度

    jdbcDF.rdd.partitions.size # 结果返回 10

    该操作将字段 colName 中1-10000000条数据分到10个partition中,使用很方便,缺点也很明显,只能使用整形数据字段作为分区关键字。

    3000w数据的表 count 跨集群操作只要2s。

    3. 根据任意类型字段分区

    调用函数

    jdbc(
      url: String,
      table: String,
      predicates: Array[String],
      connectionProperties: Properties): DataFrame

    下面以使用最多的时间字段分区为例:

    val url = "jdbc:mysql://mysqlHost:3306/database"
    val tableName = "table"
    
    // 设置连接用户&密码
    val prop = new java.util.Properties
    prop.setProperty("user","username")
    prop.setProperty("password","pwd")
    
    /**
    * 将9月16-12月15三个月的数据取出,按时间分为6个partition
    * 为了减少事例代码,这里的时间都是写死的
    * modified_time 为时间字段
    */
    
    
    val predicates =
        Array(
          "2015-09-16" -> "2015-09-30",
          "2015-10-01" -> "2015-10-15",
          "2015-10-16" -> "2015-10-31",
          "2015-11-01" -> "2015-11-14",
          "2015-11-15" -> "2015-11-30",
          "2015-12-01" -> "2015-12-15"
        ).map {
          case (start, end) =>
            s"cast(modified_time as date) >= date '$start' " + s"AND cast(modified_time as date) <= date '$end'"
        }
    
    // 取得该表数据
    val jdbcDF = sqlContext.read.jdbc(url,tableName,predicates,prop)
    
    // 一些操作
    ....

    查看并发度

    jdbcDF.rdd.partitions.size # 结果返回 6

    该操作的每个分区数据都由该段时间的分区组成,这种方式适合各种场景,较为推荐。

    结语
    以 mysql 3000W 数据量表为例,单分区count,僵死若干分钟报OOM。

    分成5-20个分区后,count 操作只需要 2s

    高并发度可以大幅度提高读取以及处理数据的速度,但是如果设置过高(大量的partition同时读取)也可能会将数据源数据库弄挂。

  • 相关阅读:
    5.Hiveguigun滚(ノ`Д)ノ竟然竞争谨慎谨慎谨慎哈喇子罢工八公
    4.HadoopMapRe程序设计
    3.MapReduce原理和Yarn
    java注解
    各种操作系统远程windows服务器
    protocol-buffers
    反向生成实体类
    java 反射
    web api 返回数据
    Newtonsoft.Json
  • 原文地址:https://www.cnblogs.com/itboys/p/12881976.html
Copyright © 2011-2022 走看看