zoukankan      html  css  js  c++  java
  • spark 系列之五 SparkStreaming数据源之文件流

    上一讲提到,spark是一个数据处理的框架,用来处理大量的数据,因为在传统的使用中,我们的文件可能只存在一台物理机器上。

    但是随着互联网的发展,由于业务需要处理的数据越来越多,单台机器无法处理大量的数据,spark框架应运而生。

    实际业务中,有些数据是需要实时监控处理计算的,比如网站的用户行为数据,舆情数据等等,SparkStreaming框架应运而生,SparkStreaming主要用来处理实时的流式数据。

    比如说文件的实时变化,监控socket接口,kafka数据的实时监控,flume数据的实时监控等。

    本篇主要讲,监控文件的实时变化。

    首先我们先来模拟一个实时生成文件流的程序。主要用来在某个目录下面不断的生成带数据的文件。有一点需要注意,在windows中,直接在某个文件下粘贴,复制文件是不行的,因为这种形式产生的不是文件流,因此SpeakStreaming无法监控。

    产生文件流的程序如下:

    import java.io.FileWriter
    import scala.util.Random
    
    object DataCreater {
      //初始化地址值,循环次数,数据
    
      def Creater(): Unit ={
        val rand = new Random()
        var price = rand.nextInt(999)+1000
        val datapath = "D:/software_download/spark_text/streaming/logfile/score_"+price+".txt"
        val max_records = 20
        val brand = Array("手机", "笔记本", "小龙虾", "卫生纸", "吸尘器",  "苹果", "洗面奶", "保温杯")
        val writer: FileWriter = new FileWriter(datapath,true)
    
        // create age of data
        for(i <- 1 to max_records){
          //电器名称
          var phonePlus = brand(rand.nextInt(3))
          //电器价格
          var price = rand.nextInt(999)+1000
          //数据拼接
          writer.write( phonePlus + " " + price)
          writer.write(System.getProperty("line.separator"))
        }
        writer.flush()
        writer.close()
      }
      def main(args: Array[String]): Unit = {
        var i =0
        while (true){
          Creater()
          i+=1
          printf("====第%d批数据====",i)
          println()
          Thread.sleep(3000)
        }
    
        System.exit(1)
      }
    }

    需要先运行以上的程序不断的产生数据,控制台信息如下:

     然后会在程序中的目下持续的生成文件如下:

     启动sparkSteaming程序进行监听,程序如下:

    import org.apache.spark._
    import org.apache.spark.streaming._
    
    object SparkStreaming_localfile {
      def main(args: Array[String]): Unit = {
        /**
         * setAppName(“TestDStream”)是用来设置应用程序名称,这里我们取名为“TestDStream”。setMaster(“local[2]”)
         * 括号里的参数”local[2]’字符串表示运行在本地模式下,并且启动2个工作线程。
         */
        val conf = new SparkConf().setAppName("TestDStream").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(3))
        val lines = ssc.textFileStream("file:///D:/software_download/spark_text/streaming/logfile/")
        val words = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
        words.print()
        ssc.start()
        ssc.awaitTermination()
    
      }
    }

    运行情况如下:

     会实时的统计你所产生的文件。

    以上:)

  • 相关阅读:
    2019hdu多校1
    codefroce842C
    [codeforce686D]树的重心
    [codeforce1188C&D]
    Educational Codeforces Round 66
    [hdu4343]interval query
    Luogu 4234 最小差值生成树
    BZOJ 2594 水管局长
    Luogu 2173 [ZJOI2012]网络
    Luogu 2147 洞穴勘测
  • 原文地址:https://www.cnblogs.com/suzhenxiang/p/14217165.html
Copyright © 2011-2022 走看看