zoukankan      html  css  js  c++  java
  • Spark踩坑记——数据库(Hbase+Mysql)

    前言

    在使用Spark Streaming的过程中对于计算产生结果的进行持久化时,我们往往需要操作数据库,去统计或者改变一些值。最近一个实时消费者处理任务,在使用spark streaming进行实时的数据流处理时,我需要将计算好的数据更新到hbase和mysql中,所以本文对spark操作hbase和mysql的内容进行总结,并且对自己踩到的一些坑进行记录。

    Spark Streaming持久化设计模式

    DStreams输出操作

    • print:打印driver结点上每个Dstream中的前10个batch元素,常用于开发和调试
    • saveAsTextFiles(prefix, [suffix]):将当前Dstream保存为文件,每个interval batch的文件名命名规则基于prefix和suffix:"prefix-TIME_IN_MS[.suffix]".
    • saveAsObjectFiles(prefix, [suffix]):将当前的Dstream内容作为Java可序列化对象的序列化文件进行保存,每个interval batch的文件命名规则基于prefix和suffix:: "prefix-TIME_IN_MS[.suffix]".
    • saveAsHadoopFiles(prefix, [suffix]):将Dstream以hadoop文件的形式进行保存,每个interval batch的文件命名规则基于prefix和suffix:: "prefix-TIME_IN_MS[.suffix]".
    • foreachRDD(func):最通用的输出操作,可以对从数据流中产生的每一个RDD应用函数_fun_。通常_fun_会将每个RDD中的数据保存到外部系统,如:将RDD保存到文件,或者通过网络连接保存到数据库。值得注意的是:_fun_执行在跑应用的driver进程中,并且通常会包含RDD action以促使数据流RDD开始计算。

    使用foreachRDD的设计模式

    dstream.foreachRDD对于开发而言提供了很大的灵活性,但在使用时也要避免很多常见的坑。我们通常将数据保存到外部系统中的流程是:建立远程连接->通过连接传输数据到远程系统->关闭连接。针对这个流程我们很直接的想到了下面的程序代码:

    dstream.foreachRDD { rdd =>
      val connection = createNewConnection()  // executed at the driver
      rdd.foreach { record =>
        connection.send(record) // executed at the worker
      }
    }
    

    spark踩坑记——初试中,对spark的worker和driver进行了整理,我们知道在集群模式下,上述代码中的connection需要通过序列化对象的形式从driver发送到worker,但是connection是无法在机器之间传递的,即connection是无法序列化的,这样可能会引起_serialization errors (connection object not serializable)_的错误。为了避免这种错误,我们将conenction在worker当中建立,代码如下:

    dstream.foreachRDD { rdd =>
      rdd.foreach { record =>
        val connection = createNewConnection()
        connection.send(record)
        connection.close()
      }
    }
    

    似乎这样问题解决了?但是细想下,我们在每个rdd的每条记录当中都进行了connection的建立和关闭,这会导致不必要的高负荷并且降低整个系统的吞吐量。所以一个更好的方式是使用_rdd.foreachPartition_即对于每一个rdd的partition建立唯一的连接(注:每个partition是内的rdd是运行在同一worker之上的),代码如下:

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        val connection = createNewConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        connection.close()
      }
    }
    

    这样我们降低了频繁建立连接的负载,通常我们在连接数据库时会使用连接池,把连接池的概念引入,代码优化如下:

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // ConnectionPool is a static, lazily initialized pool of connections
        val connection = ConnectionPool.getConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
      }
    }
    

    通过持有一个静态连接池对象,我们可以重复利用connection而进一步优化了连接建立的开销,从而降低了负载。另外值得注意的是,同数据库的连接池类似,我们这里所说的连接池同样应该是lazy的按需建立连接,并且及时的收回超时的连接。
    另外值得注意的是:

    • 如果在spark streaming中使用了多次foreachRDD,它们之间是按照程序顺序向下执行的
    • Dstream对于输出操作的执行策略是lazy的,所以如果我们在foreachRDD中不添加任何RDD action,那么系统仅仅会接收数据然后将数据丢弃。

    Spark访问Hbase

    上面我们阐述了将spark streaming的Dstream输出到外部系统的基本设计模式,这里我们阐述如何将Dstream输出到Hbase集群。

    Hbase通用连接类

    Scala连接Hbase是通过zookeeper获取信息,所以在配置时需要提供zookeeper的相关信息,如下:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Connection
    import org.apache.hadoop.hbase.HConstants
    import org.apache.hadoop.hbase.client.ConnectionFactory
    
    object HbaseUtil extends Serializable {
      private val conf = HBaseConfiguration.create()
      private val para = Conf.hbaseConfig // Conf为配置类,获取hbase的配置
      conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, para.get("port").getOrElse("2181"))
      conf.set(HConstants.ZOOKEEPER_QUORUM, para.get("quorum").getOrElse("127-0-0-1"))  // hosts
      private val connection = ConnectionFactory.createConnection(conf)
    
      def getHbaseConn: Connection = connection
    }
    

    根据网上资料,Hbase的连接的特殊性我们并没有使用连接池

    Hbase输出操作

    我们以put操作为例,演示将上述设计模式应用到Hbase输出操作当中:

    dstream.foreachRDD(rdd => {
      if (!rdd.isEmpty) {
        rdd.foreachPartition(partitionRecords => {
            val connection = HbaseUtil.getHbaseConn // 获取Hbase连接
            partitionRecords.foreach(data => {
                val tableName = TableName.valueOf("tableName")
                val t = connection.getTable(tableName)
                try {
                  val put = new Put(Bytes.toBytes(_rowKey_)) // row key
                  // column, qualifier, value
                  put.addColumn(_column_.getBytes, _qualifier_.getBytes, _value_.getBytes)
                  Try(t.put(put)).getOrElse(t.close())
                  // do some log(显示在worker上)
                } catch {
                  case e: Exception =>
                    // log error
                    e.printStackTrace()
                } finally {
                  t.close()
                }
          })
        })
        // do some log(显示在driver上)
      }
    })
    

    关于Hbase的其他操作可以参考Spark 下操作 HBase(1.0.0 新 API)

    填坑记录

    重点记录在连接Hbase过程中配置HConstants.ZOOKEEPER_QUORUM的问题:

    • 由于Hbase的连接不能直接使用ip地址进行访问,往往需要配置hosts,例如我在上述代码段中127-0-0-1(任意),我们在hosts中需要配置
    127-0-0-1 127.0.0.1
    
    • 在单机情况下,我们只需要配置一台zookeeper所在Hbase的hosts即可,但是当切换到Hbase集群是遇到一个诡异的bug
      问题描述:在foreachRDD中将Dstream保存到Hbase时会卡住,并且没有任何错误信息爆出(没错!它就是卡住,没反应)
      问题分析:由于Hbase集群有多台机器,而我们只配置了一台Hbase机器的hosts,这样导致Spark集群在访问Hbase时不断的去寻找但却找不到就卡在那里
      解决方式:对每个worker上的hosts配置了所有hbase的节点ip,问题解决

    Spark访问Mysql

    同访问Hbase类似,我们也需要有一个可序列化的类来建立Mysql连接,这里我们利用了Mysql的C3P0连接池

    MySQL通用连接类

    import java.sql.Connection
    import java.util.Properties
    
    import com.mchange.v2.c3p0.ComboPooledDataSource
    
    class MysqlPool extends Serializable {
      private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true)
      private val conf = Conf.mysqlConfig
      try {
        cpds.setJdbcUrl(conf.get("url").getOrElse("jdbc:mysql://127.0.0.1:3306/test_bee?useUnicode=true&characterEncoding=UTF-8"));
        cpds.setDriverClass("com.mysql.jdbc.Driver");
        cpds.setUser(conf.get("username").getOrElse("root"));
        cpds.setPassword(conf.get("password").getOrElse(""))
        cpds.setMaxPoolSize(200)
        cpds.setMinPoolSize(20)
        cpds.setAcquireIncrement(5)
        cpds.setMaxStatements(180)
      } catch {
        case e: Exception => e.printStackTrace()
      }
      def getConnection: Connection = {
        try {
          return cpds.getConnection();
        } catch {
          case ex: Exception =>
            ex.printStackTrace()
            null
        }
      }
    }
    object MysqlManager {
      var mysqlManager: MysqlPool = _
      def getMysqlManager: MysqlPool = {
        synchronized {
          if (mysqlManager == null) {
            mysqlManager = new MysqlPool
          }
        }
        mysqlManager
      }
    }
    

    我们利用c3p0建立Mysql连接池,然后访问的时候每次从连接池中取出连接用于数据传输。

    Mysql输出操作

    同样利用之前的foreachRDD设计模式,将Dstream输出到mysql的代码如下:

    dstream.foreachRDD(rdd => {
        if (!rdd.isEmpty) {
          rdd.foreachPartition(partitionRecords => {
            //从连接池中获取一个连接
            val conn = MysqlManager.getMysqlManager.getConnection
            val statement = conn.createStatement
            try {
              conn.setAutoCommit(false)
              partitionRecords.foreach(record => {
                val sql = "insert into table..." // 需要执行的sql操作
                statement.addBatch(sql)
              })
              statement.executeBatch
              conn.commit
            } catch {
              case e: Exception =>
                // do some log
            } finally {
              statement.close()
              conn.close()
            }
          })
        }
    })
    

    值得注意的是:

    • 我们在提交Mysql的操作的时候,并不是每条记录提交一次,而是采用了批量提交的形式,所以需要将conn.setAutoCommit(false),这样可以进一步提高mysql的效率。
    • 如果我们更新Mysql中带索引的字段时,会导致更新速度较慢,这种情况应想办法避免,如果不可避免,那就硬上吧(T^T)

    部署

    提供一下Spark连接Mysql和Hbase所需要的jar包的maven配置:

    <dependency><!-- Hbase -->
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.0.0</version>
    </dependency>
    
    <dependency><!-- Mysql -->
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.31</version>
    </dependency>
    <dependency>
        <groupId>c3p0</groupId>
        <artifactId>c3p0</artifactId>
        <version>0.9.1.2</version>
    </dependency>
    

    参考文献:

    1. Spark Streaming Programming Guide
    2. HBase介绍
    3. Spark 下操作 HBase(1.0.0 新 API)
    4. Spark开发快速入门
    5. kafka->spark->streaming->mysql(scala)实时数据处理示例
    6. Spark Streaming 中使用c3p0连接池操作mysql数据库
  • 相关阅读:
    13、文件修改及函数的基本使用
    12、文件处理 b模式
    作业3月16号
    作业3月13号
    11、文件处理 t模式
    10、数据类型内置之集合
    作业3月11号
    9、基础类型之列表、元组、字典
    作业3月10号
    8、for循环以及数字类型和字符串类型的内置方法
  • 原文地址:https://www.cnblogs.com/xlturing/p/spark.html
Copyright © 2011-2022 走看看