  • [Spark Streaming] Saving a DStream to MySQL

    package SparkDemo
    
    import java.sql.{Connection, DriverManager, PreparedStatement}
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object DStreamToMySQL {
        // State update function: add this batch's counts to the running total
        def updateFunc(newValues: Seq[Int], state: Option[Int]): Option[Int] = {
            val currentCount = newValues.foldLeft(0)(_ + _)
            val previousCount = state.getOrElse(0)
            Some(currentCount + previousCount)
        }
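        // For example, if the current batch contains ("spark", 1) three times
        // and the previous state for "spark" is Some(2), then
        // updateFunc(Seq(1, 1, 1), Some(2)) evaluates to Some(5).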
        def main(args: Array[String]): Unit = {
            // Create the StreamingContext with 1-second batches
            val conf = new SparkConf().setAppName("DStreamToMySQL")
            val ssc = new StreamingContext(conf, Seconds(1))
            // Set the log level (helper object from this blog series)
            StreamingLoggingExample.setStreamingLogLevels()
            // updateStateByKey requires a checkpoint directory to persist
            // state across batches; the path here is illustrative
            ssc.checkpoint("/tmp/yuhang.zhang/checkpoint")

            val lines = ssc.textFileStream("/tmp/yuhang.zhang/data")
            val words = lines.flatMap(_.split(" "))
            val pairWord = words.map((_, 1))
            // Cumulative word count across batches
            val stateWordCount = pairWord.updateStateByKey[Int](updateFunc)
    
            // Save stateWordCount to the database.
            // stateWordCount is a stream of RDDs, and every record in
            // every RDD has to be written out.
            stateWordCount.foreachRDD(rdd => {
                // Each RDD holds records of type (String, Int).
                // func receives one partition's records as an Iterator,
                // so a single connection can serve the whole partition.
                def func(records: Iterator[(String, Int)]): Unit = {
                    // conn and stmt must be vars, not vals: they are
                    // reassigned inside the try block
                    var conn: Connection = null
                    var stmt: PreparedStatement = null
                    try {
                        // Connect to the database
                        val url = "jdbc:mysql://localhost:3306/spark" // host + database name
                        val user = "root"
                        val password = ""
                        conn = DriverManager.getConnection(url, user, password)
                        // wordcount is the table; word and count are its columns.
                        // Prepare the statement once and reuse it for every record.
                        val sql = "insert into wordcount(word,count) values(?,?)"
                        stmt = conn.prepareStatement(sql)
                        // Insert each record in this partition
                        records.foreach(p => {
                            stmt.setString(1, p._1.trim)
                            stmt.setInt(2, p._2)
                            stmt.executeUpdate()
                        })
                    } catch {
                        case e: Exception => e.printStackTrace()
                    } finally {
                        if(stmt != null)
                            stmt.close()
                        if(conn != null)
                            conn.close()
                    }
                }
            val repartitionedRDD = rdd.repartition(3) // repartition each RDD so writes spread across 3 tasks
            repartitionedRDD.foreachPartition(func) // run func once per partition
            })
            ssc.start() // start the streaming computation
            ssc.awaitTermination() // wait for termination
        }
    
    }
    
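The insert statement above assumes a wordcount(word, count) table already exists in the spark database. The post does not show the schema, so the sketch below is one plausible way to create it over the same JDBC connection details; the column types are assumptions.

    import java.sql.DriverManager

    object CreateWordCountTable {
        def main(args: Array[String]): Unit = {
            // Connection details copied from the streaming job; adjust as needed
            val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "")
            try {
                val stmt = conn.createStatement()
                // Column types are assumptions; `count` is backquoted
                // defensively, since COUNT is also a SQL keyword
                stmt.executeUpdate(
                    """CREATE TABLE IF NOT EXISTS wordcount(
                      |    word VARCHAR(100) NOT NULL,
                      |    `count` INT NOT NULL
                      |)""".stripMargin)
                stmt.close()
            } finally {
                conn.close()
            }
        }
    }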

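Calling executeUpdate per record also costs one database round trip per word. A common refinement, not in the original post but sketched here under the same table and schema assumptions, is to prepare the statement once per partition and batch the inserts:

    import java.sql.DriverManager
    import org.apache.spark.streaming.dstream.DStream

    object BatchedMySQLSink {
        // Write each partition of each RDD with a single batched round trip
        def save(counts: DStream[(String, Int)],
                 url: String, user: String, password: String): Unit = {
            counts.foreachRDD(rdd => {
                rdd.foreachPartition(records => {
                    val conn = DriverManager.getConnection(url, user, password)
                    val stmt = conn.prepareStatement("insert into wordcount(word,count) values(?,?)")
                    try {
                        records.foreach { case (word, count) =>
                            stmt.setString(1, word.trim)
                            stmt.setInt(2, count)
                            stmt.addBatch() // queue locally instead of writing per record
                        }
                        stmt.executeBatch() // one round trip for the whole partition
                    } finally {
                        stmt.close()
                        conn.close()
                    }
                })
            })
        }
    }

In main, BatchedMySQLSink.save(stateWordCount, url, user, password) would then replace the foreachRDD block, with url, user, and password hoisted out of func.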