上一讲提到,spark是一个数据处理的框架,用来处理大量的数据,因为在传统的使用中,我们的文件可能只存在一台物理机器上。
但是随着互联网的发展,由于业务需要处理的数据越来越多,单台机器无法处理大量的数据,spark框架应运而生。
实际业务中,有些数据是需要实时监控处理计算的,比如网站的用户行为数据,舆情数据等等,SparkStreaming框架应运而生,SparkStreaming主要用来处理实时的流式数据。
比如说文件的实时变化,监控socket接口,kafka数据的实时监控,flume数据的实时监控等。
本篇主要讲,监控文件的实时变化。
首先我们先来模拟一个实时生成文件流的程序。主要用来在某个目录下面不断的生成带数据的文件。有一点需要注意,在windows中,直接在某个文件下粘贴,复制文件是不行的,因为这种形式产生的不是文件流,因此SpeakStreaming无法监控。
产生文件流的程序如下:
import java.io.FileWriter import scala.util.Random object DataCreater { //初始化地址值,循环次数,数据 def Creater(): Unit ={ val rand = new Random() var price = rand.nextInt(999)+1000 val datapath = "D:/software_download/spark_text/streaming/logfile/score_"+price+".txt" val max_records = 20 val brand = Array("手机", "笔记本", "小龙虾", "卫生纸", "吸尘器", "苹果", "洗面奶", "保温杯") val writer: FileWriter = new FileWriter(datapath,true) // create age of data for(i <- 1 to max_records){ //电器名称 var phonePlus = brand(rand.nextInt(3)) //电器价格 var price = rand.nextInt(999)+1000 //数据拼接 writer.write( phonePlus + " " + price) writer.write(System.getProperty("line.separator")) } writer.flush() writer.close() } def main(args: Array[String]): Unit = { var i =0 while (true){ Creater() i+=1 printf("====第%d批数据====",i) println() Thread.sleep(3000) } System.exit(1) } }
需要先运行以上的程序不断的产生数据,控制台信息如下:
然后会在程序中的目下持续的生成文件如下:
启动sparkSteaming程序进行监听,程序如下:
import org.apache.spark._ import org.apache.spark.streaming._ object SparkStreaming_localfile { def main(args: Array[String]): Unit = { /** * setAppName(“TestDStream”)是用来设置应用程序名称,这里我们取名为“TestDStream”。setMaster(“local[2]”) * 括号里的参数”local[2]’字符串表示运行在本地模式下,并且启动2个工作线程。 */ val conf = new SparkConf().setAppName("TestDStream").setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(3)) val lines = ssc.textFileStream("file:///D:/software_download/spark_text/streaming/logfile/") val words = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _) words.print() ssc.start() ssc.awaitTermination() } }
运行情况如下:
会实时的统计你所产生的文件。
以上:)