在Spark应用中,外部系统经常需要使用到Spark DStream处理后的数据,因此,需要采用输出操作把DStream的数据输出到数据库或者文件系统中。
一、把DStream输出到文本文件中
请在NetworkWordCountStateful.scala代码文件中输入以下内容:
package org.apache.spark.examples.streaming import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.storage.StorageLevel object NetworkWordCountStateful { def main(args: Array[String]) { //定义状态更新函数 val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.foldLeft(0)(_ + _) val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } StreamingExamples.setStreamingLogLevels() //设置log4j日志级别 val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful") val sc = new StreamingContext(conf, Seconds(5)) sc.checkpoint("file:///usr/local/spark/mycode/streaming/dstreamoutput/") //设置检查点,检查点具有容错机制 val lines = sc.socketTextStream("localhost", 9999) val words = lines.flatMap(_.split(" ")) val wordDstream = words.map(x => (x, 1)) val stateDstream = wordDstream.updateStateByKey[Int](updateFunc) stateDstream.print() //下面是新增的语句,把DStream保存到文本文件中 stateDstream.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/dstreamoutput/output.txt") sc.start() sc.awaitTermination() } }
sbt打包编译后,使用如下命令运行程序:
程序运行以后,屏幕上就会显示类似下面的程序运行信息:
打开另外一个终端,作为单词产生的源头,提供给NetworkWordCountStateful程序进行词频统计:
运行NetworkWordCountStateful程序的监听窗口,就可以看到类似下面的词频统计结果:
这些词频结果被成功地输出到“/usr/local/spark/mycode/streaming/dstreamoutput/output.txt”文件中
可以发现,在这个目录下,生成了很多文本文件,如下:
output.txt的命名看起来像一个文件,但是,实际上,spark会生成名称为output.txt的目录,而不是文件。
二、把DStream写入到MySQL数据库中
启动MySQL数据库,并完成数据库和表的创建:
在此前已经创建好的“spark”数据库中创建一个名称为“wordcount”的表:
在NetworkWordCountStateful.scala文件中加入下面代码:
把partition里面的每条记录用func函数写到底层的MySQL数据库当中去
完整代码:
package org.apache.spark.examples.streaming import java.sql.{PreparedStatement, Connection, DriverManager} import java.util.concurrent.atomic.AtomicInteger import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.storage.StorageLevel object NetworkWordCountStateful { def main(args: Array[String]) { //定义状态更新函数 val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.foldLeft(0)(_ + _) val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } StreamingExamples.setStreamingLogLevels() //设置log4j日志级别 val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful") val sc = new StreamingContext(conf, Seconds(5)) sc.checkpoint("file:///usr/local/spark/mycode/streaming/dstreamoutput/") //设置检查点,检查点具有容错机制 val lines = sc.socketTextStream("localhost", 9999) val words = lines.flatMap(_.split(" ")) val wordDstream = words.map(x => (x, 1)) val stateDstream = wordDstream.updateStateByKey[Int](updateFunc) stateDstream.print() //下面是新增的语句,把DStream保存到MySQL数据库中 stateDstream.foreachRDD(rdd => { //内部函数 def func(records: Iterator[(String,Int)]) { var conn: Connection = null var stmt: PreparedStatement = null try { val url = "jdbc:mysql://localhost:3306/spark" val user = "root" val password = "hadoop" //数据库密码是hadoop conn = DriverManager.getConnection(url, user, password) records.foreach(p => { val sql = "insert into wordcount(word,count) values (?,?)" stmt = conn.prepareStatement(sql); stmt.setString(1, p._1.trim) # 第一个字段,去掉空格的字符串 stmt.setInt(2,p._2.toInt) # 第二个字段 stmt.executeUpdate() }) } catch { case e: Exception => e.printStackTrace() } finally { if (stmt != null) { stmt.close() } if (conn != null) { conn.close() } } } val repartitionedRDD = rdd.repartition(3) repartitionedRDD.foreachPartition(func) }) sc.start() sc.awaitTermination() } }