package SparkDemo

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamToMySQL {
  // State update function: add the counts of the current batch to the running total
  def updateFunc(newValues: Seq[Int], state: Option[Int]): Option[Int] = {
    val currentCount = newValues.foldLeft(0)(_ + _)
    val previousCount = state.getOrElse(0)
    Some(currentCount + previousCount)
  }

  def main(args: Array[String]): Unit = {
    // Create the StreamingContext
    val conf = new SparkConf().setAppName("DStreamToMySQL")
    val ssc = new StreamingContext(conf, Seconds(1))
    // Set the log level
    StreamingLoggingExample.setStreamingLogLevels()
    // updateStateByKey requires a checkpoint directory (path here is just an example)
    ssc.checkpoint("/tmp/yuhang.zhang/checkpoint")

    val lines = ssc.textFileStream("/tmp/yuhang.zhang/data")
    val words = lines.flatMap(_.split(" "))
    val pairWord = words.map((_, 1))
    // Maintain a cumulative count across batches
    val stateWordCount = pairWord.updateStateByKey[Int](updateFunc)

    // Persist stateWordCount to the database.
    // stateWordCount consists of a series of RDDs, and every record in every RDD
    // must be processed and stored.
    stateWordCount.foreachRDD(rdd => {
      // Each RDD holds records of type (String, Int).
      // We take a whole partition as an Iterator so one connection can serve all
      // records of that partition.
      def func(records: Iterator[(String, Int)]): Unit = {
        // Note: conn and stmt must be var, not val, because they are assigned
        // inside try and released in finally
        var conn: Connection = null
        var stmt: PreparedStatement = null
        try {
          // Connect to the database
          val url = "jdbc:mysql://localhost:3306/spark" // host + database
          val user = "root"
          val password = ""
          conn = DriverManager.getConnection(url, user, password)
          // wordcount is the table name; word and count are the columns to insert
          val sql = "insert into wordcount(word,count) values(?,?)"
          // Prepare the statement once and reuse it for every record
          stmt = conn.prepareStatement(sql)
          records.foreach(p => {
            stmt.setString(1, p._1.trim)
            stmt.setInt(2, p._2)
            stmt.executeUpdate()
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (stmt != null) stmt.close()
          if (conn != null) conn.close()
        }
      }
      val repartitionedRDD = rdd.repartition(3)  // repartition each RDD
      repartitionedRDD.foreachPartition(func)    // run func on each partition
    })

    ssc.start()            // start the streaming job
    ssc.awaitTermination() // wait for termination
  }
}
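
Opening the JDBC connection inside foreachPartition (rather than on the driver) already avoids shipping a non-serializable connection to the executors. For higher throughput you can additionally batch the inserts instead of calling executeUpdate once per record. Below is a minimal sketch of such a partition handler using standard JDBC batching (addBatch/executeBatch); it assumes the same wordcount(word, count) table and connection settings as above, and the batch size of 500 is an arbitrary example value.

import java.sql.{Connection, DriverManager, PreparedStatement}

// Hypothetical variant of func: buffer rows with addBatch and flush them in
// chunks with executeBatch, reducing round-trips to MySQL.
def batchInsert(records: Iterator[(String, Int)]): Unit = {
  var conn: Connection = null
  var stmt: PreparedStatement = null
  try {
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "")
    conn.setAutoCommit(false) // commit once per flushed batch instead of per row
    stmt = conn.prepareStatement("insert into wordcount(word,count) values(?,?)")
    var pending = 0
    records.foreach { case (word, count) =>
      stmt.setString(1, word.trim)
      stmt.setInt(2, count)
      stmt.addBatch()
      pending += 1
      if (pending >= 500) {   // example batch size
        stmt.executeBatch()
        conn.commit()
        pending = 0
      }
    }
    if (pending > 0) {        // flush the remainder
      stmt.executeBatch()
      conn.commit()
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}

This sketch plugs into the same place as func above, i.e. repartitionedRDD.foreachPartition(batchInsert).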