zoukankan      html  css  js  c++  java
  • [转]Spark 踩坑记:数据库(Hbase+Mysql)

    https://cloud.tencent.com/developer/article/1004820

    Spark 踩坑记:数据库(Hbase+Mysql)
    前言
    在使用Spark Streaming的过程中对于计算产生结果的进行持久化时,我们往往需要操作数据库,去统计或者改变一些值。

    最近一个实时消费者处理任务,在使用spark streaming进行实时的数据流处理时,我需要将计算好的数据更新到hbase和mysql中,所以本文对spark操作hbase和mysql的内容进行总结,并且对自己踩到的一些坑进行记录。

    Spark Streaming持久化设计模式
    DStreams输出操作
    print:打印driver结点上每个Dstream中的前10个batch元素,常用于开发和调试
    saveAsTextFiles(prefix, [suffix]):将当前Dstream保存为文件,每个interval batch的文件名命名规则基于 prefix 和 suffix :”prefix-TIME_IN_MS[.suffix]”.
    saveAsObjectFiles(prefix, [suffix]):将当前的Dstream内容作为Java可序列化对象的序列化文件进行保存,每个interval batch的文件命名规则基于prefix和suffix:: “prefix-TIME_IN_MS[.suffix]”.
    saveAsHadoopFiles(prefix, [suffix]):将Dstream以hadoop文件的形式进行保存,每个interval batch的文件命名规则基于prefix和suffix:: “prefix-TIME_IN_MS[.suffix]”.
    foreachRDD(func):最通用的输出操作,可以对从数据流中产生的每一个RDD应用函数fun。通常fun会将每个RDD中的数据保存到外部系统,如:将RDD保存到文件,或者通过网络连接保存到数据库。值得注意的是:fun执行在跑应用的driver进程中,并且通常会包含RDD action以促使数据流RDD开始计算。
    使用foreachRDD的设计模式
    dstream.foreachRDD对于开发而言提供了很大的灵活性,但在使用时也要避免很多常见的坑。我们通常将数据保存到外部系统中的流程是:建立远程连接->通过连接传输数据到远程系统->关闭连接。针对这个流程我们很直接的想到了下面的程序代码:

    dstream.foreachRDD { rdd =>
    val connection = createNewConnection() // executed at the driver
    rdd.foreach { record =>
    connection.send(record) // executed at the worker
    }
    }
    在上一篇文章《spark踩坑记——初试》中,对spark的worker和driver进行了整理,我们知道在集群模式下,上述代码中的connection需要通过序列化对象的形式从driver发送到worker,但是connection是无法在机器之间传递的,即connection是无法序列化的,这样可能会引起Cserialization errors (connection object not serializable)的错误。为了避免这种错误,我们将conenction在worker当中建立,代码如下:

    dstream.foreachRDD { rdd =>
    rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
    }
    }
    似乎这样问题解决了?但是细想下,我们在每个rdd的每条记录当中都进行了connection的建立和关闭,这会导致不必要的高负荷并且降低整个系统的吞吐量。

    所以一个更好的方式是使用rdd.foreachPartition即对于每一个rdd的partition建立唯一的连接(注:每个partition是内的rdd是运行在同一worker之上的),代码如下:

    dstream.foreachRDD { rdd =>
    rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
    }
    }
    这样我们降低了频繁建立连接的负载,通常我们在连接数据库时会使用连接池,把连接池的概念引入,代码优化如下:

    dstream.foreachRDD { rdd =>
    rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse
    }
    }
    通过持有一个静态连接池对象,我们可以重复利用connection而进一步优化了连接建立的开销,从而降低了负载。另外值得注意的是,同数据库的连接池类似,我们这里所说的连接池同样应该是lazy的按需建立连接,并且及时的收回超时的连接。

    另外值得注意的是:

    如果在spark streaming中使用了多次foreachRDD,它们之间是按照程序顺序向下执行的
    Dstream对于输出操作的执行策略是lazy的,所以如果我们在foreachRDD中不添加任何RDD action,那么系统仅仅会接收数据然后将数据丢弃。
    Spark访问Hbase
    上面我们阐述了将spark streaming的Dstream输出到外部系统的基本设计模式,这里我们阐述如何将Dstream输出到Hbase集群。

    Hbase通用连接类
    Scala连接Hbase是通过zookeeper获取信息,所以在配置时需要提供zookeeper的相关信息,如下:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Connection
    import org.apache.hadoop.hbase.HConstants
    import org.apache.hadoop.hbase.client.ConnectionFactory

    object HbaseUtil extends Serializable {
    private val conf = HBaseConfiguration.create()
    private val para = Conf.hbaseConfig // Conf为配置类,获取hbase的配置
    conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, para.get("port").getOrElse("2181"))
    conf.set(HConstants.ZOOKEEPER_QUORUM, para.get("quorum").getOrElse("127-0-0-1")) // hosts
    private val connection = ConnectionFactory.createConnection(conf)

    def getHbaseConn: Connection = connection
    }
    根据网上资料,Hbase的连接的特殊性我们并没有使用连接池

    Hbase输出操作
    我们以put操作为例,演示将上述设计模式应用到Hbase输出操作当中:

    dstream.foreachRDD(rdd => {
    if (!rdd.isEmpty) {
    rdd.foreachPartition(partitionRecords => {
    val connection = HbaseUtil.getHbaseConn // 获取Hbase连接
    partitionRecords.foreach(data => {
    val tableName = TableName.valueOf("tableName")
    val t = connection.getTable(tableName)
    try {
    val put = new Put(Bytes.toBytes(rowKey)) // row key
    // column, qualifier, value
    put.addColumn(column.getBytes, qualifier.getBytes, value.getBytes)
    Try(t.put(put)).getOrElse(t.close())
    // do some log(显示在worker上)
    } catch {
    case e: Exception =>
    // log error
    e.printStackTrace()
    } finally {
    t.close()
    }
    })
    })
    // do some log(显示在driver上)
    }
    })
    关于Hbase的其他操作可以参考Spark 下操作 HBase(1.0.0 新 API)

    填坑记录
    重点记录在连接Hbase过程中配置HConstants.ZOOKEEPER_QUORUM的问题:

    由于Hbase的连接不能直接使用ip地址进行访问,往往需要配置hosts,例如我在上述代码段中127-0-0-1(任意),我们在hosts中需要配置
    127-0-0-1 127.0.0.1
    在单机情况下,我们只需要配置一台zookeeper所在Hbase的hosts即可,但是当切换到Hbase集群是遇到一个诡异的bug
    问题描述:在foreachRDD中将Dstream保存到Hbase时会卡住,并且没有任何错误信息爆出(没错!它就是卡住,没反应)

    问题分析:由于Hbase集群有多台机器,而我们只配置了一台Hbase机器的hosts,这样导致Spark集群在访问Hbase时不断的去寻找但却找不到就卡在那里

    解决方式:对每个worker上的hosts配置了所有hbase的节点ip,问题解决

    Spark访问Mysql
    同访问Hbase类似,我们也需要有一个可序列化的类来建立Mysql连接,这里我们利用了Mysql的C3P0连接池

    MySQL通用连接类
    import java.sql.Connection
    import java.util.Properties

    import com.mchange.v2.c3p0.ComboPooledDataSource

    class MysqlPool extends Serializable {
    private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true)
    private val conf = Conf.mysqlConfig
    try {
    cpds.setJdbcUrl(conf.get("url").getOrElse("jdbc:mysql://127.0.0.1:3306/test_bee?useUnicode=true&characterEncoding=UTF-8"));
    cpds.setDriverClass("com.mysql.jdbc.Driver");
    cpds.setUser(conf.get("username").getOrElse("root"));
    cpds.setPassword(conf.get("password").getOrElse(""))
    cpds.setMaxPoolSize(200)
    cpds.setMinPoolSize(20)
    cpds.setAcquireIncrement(5)
    cpds.setMaxStatements(180)
    } catch {
    case e: Exception => e.printStackTrace()
    }
    def getConnection: Connection = {
    try {
    return cpds.getConnection();
    } catch {
    case ex: Exception =>
    ex.printStackTrace()
    null
    }
    }
    }
    object MysqlManager {
    var mysqlManager: MysqlPool = _
    def getMysqlManager: MysqlPool = {
    synchronized {
    if (mysqlManager == null) {
    mysqlManager = new MysqlPool
    }
    }
    mysqlManager
    }
    }
    我们利用c3p0建立Mysql连接池,然后访问的时候每次从连接池中取出连接用于数据传输。

    Mysql输出操作
    同样利用之前的foreachRDD设计模式,将Dstream输出到mysql的代码如下:

    dstream.foreachRDD(rdd => {
    if (!rdd.isEmpty) {
    rdd.foreachPartition(partitionRecords => {
    //从连接池中获取一个连接
    val conn = MysqlManager.getMysqlManager.getConnection
    val statement = conn.createStatement
    try {
    conn.setAutoCommit(false)
    partitionRecords.foreach(record => {
    val sql = "insert into table..." // 需要执行的sql操作
    statement.addBatch(sql)
    })
    statement.executeBatch
    conn.commit
    } catch {
    case e: Exception =>
    // do some log
    } finally {
    statement.close()
    conn.close()
    }
    })
    }
    })
    值得注意的是:

    我们在提交Mysql的操作的时候,并不是每条记录提交一次,而是采用了批量提交的形式,所以需要将conn.setAutoCommit(false),这样可以进一步提高mysql的效率。
    如果我们更新Mysql中带索引的字段时,会导致更新速度较慢,这种情况应想办法避免,如果不可避免,那就硬上吧(T^T)
    部署
    提供一下Spark连接Mysql和Hbase所需要的jar包的maven配置:


    org.apache.hbase
    hbase-client
    1.0.0


    org.apache.hbase
    hbase-common
    1.0.0


    org.apache.hbase
    hbase-server
    1.0.0


    mysql
    mysql-connector-java
    5.1.31


    c3p0
    c3p0
    0.9.1.2

    参考文献:

  • 相关阅读:
    HDU5418.Victor and World(状压DP)
    POJ2686 Traveling by Stagecoach(状压DP)
    POJ3254Corn Fields(状压DP)
    HDU5407.CRB and Candies(数论)
    CodeForces 352D. Jeff and Furik
    CodeForces 352C. Jeff and Rounding(贪心)
    LightOj 1282 Leading and Trailing
    Ural 1057. Amount of Degrees(数位DP)
    HDU 2089 不要62 (数位DP)
    HDU5366 The mook jong (DP)
  • 原文地址:https://www.cnblogs.com/freebird92/p/8948304.html
Copyright © 2011-2022 走看看