zoukankan      html  css  js  c++  java
  • 7.4 基本输入源

    一、文件流

    1.在spark-shell中创建文件流

    进入spark-shell创建文件流。另外打开一个终端窗口,启动进入spark-shell 

    上面在spark-shell中执行的程序,一旦你输入ssc.start()以后,程序就开始自动进入循环监听状态,屏幕上会显示一堆的信息,如下:

    在“/usr/local/spark/mycode/streaming/logfile”目录下新建一个log.txt文件,就可以在监听窗口中显示词频统计结果

    2.采用独立应用程序方式创建文件流

    在当前streaming下创建三级子目录,因为只有把代码放到src/main/scala目录下,sbt打包编译工具才能够正确运行。

    用vim编辑器新建一个TestStreaming.scala代码文件,请在里面输入以下代码:

    import org.apache.spark._ 
    import org.apache.spark.streaming._
    object WordCountStreaming {  
      def main(args: Array[String]) {  
        val sparkConf = new SparkConf().setAppName("WordCountStreaming").setMaster("local[2]")//设置为本地运行模式,2个线程,一个监听,另一个处理数据    
        val ssc = new StreamingContext(sparkConf, Seconds(2))// 时间间隔为2秒    
        val lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")  //这里采用本地文件,当然你也可以采用HDFS文件
        val words = lines.flatMap(_.split(" "))  
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)  
        wordCounts.print()  
        ssc.start()  
        ssc.awaitTermination()  
      }  
    } 

    在simple.sbt文件中输入以下代码: 

    执行sbt打包编译的命令如下:

     打包成功以后,就可以输入以下命令启动这个程序:

    1. 执行上面命令后,就进入了监听状态(我们把运行这个监听程序的窗口称为监听窗口)
    2. 切换到另外一个Shell窗口,在"/usr/local/spark/mycode/streaming/logfile"目录下再新建一个log2.txt文件,文件里面随便输入一些单词,保存好文件退出vim编辑器
    3. 再次切换回“监听窗口”,等待20秒以后,按键盘Ctrl+C或者Ctrl+D停止监听程序,就可以看到监听窗口的屏幕上会打印出单词统计信息

    二、套接字流

    Spark Streaming可以通过Socket端口监听并接收数据,然后进行相应处理

    1.Socket工作原理

    2.使用套接字流作为数据源

     

    (1)客户端

    请在NetworkWordCount.scala文件中输入如下内容:(客户端向服务端发起连接,需要告诉它向哪个主机哪个端口发起连接)

    package org.apache.spark.examples.streaming
    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.storage.StorageLevel
    
    object NetworkWordCount {
      def main(args: Array[String]) {
        if (args.length < 2) {
          System.err.println("Usage: NetworkWordCount <hostname> <port>")
          System.exit(1)
        }
        StreamingExamples.setStreamingLogLevels() # 设置日志显示级别   
        val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]") #生成sparkConf对象
        val ssc = new StreamingContext(sparkConf, Seconds(1)) #生成一个StreamingContext对象
       # 1.定义输入数据流,args(0)是TCP服务端的主机名,args(1)是TCP服务端的端口号(字符串转整数) val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)#保存数据方式 val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() } }

    在相同目录下再新建另外一个代码文件StreamingExamples.scala,文件内容如下:

    package org.apache.spark.examples.streaming
    import org.apache.spark.internal.Logging
    import org.apache.log4j.{Level, Logger}
    /** Utility functions for Spark Streaming examples. */
    object StreamingExamples extends Logging { # 单例对象,不需要实例化,直接用它的静态方法
      /** Set reasonable logging levels for streaming if the user has not configured log4j. */
      def setStreamingLogLevels() {
        val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
        if (!log4jInitialized) {
          // We first log something to initialize Spark's default logging, then we override the
          // logging level.
          logInfo("Setting log level to [WARN] for streaming example." +
            " To override add a custom log4j.properties to the classpath.")
          Logger.getRootLogger.setLevel(Level.WARN)
        }
      }
    }

    编译打包

    在simple.sbt文件中输入以下代码

    对streaming整体进行sbt编译打包

    打包成功以后,就可以输入以下命令启动这个程序

    (2)服务端

    新打开一个窗口作为nc窗口,启动nc程序:

    可以在nc窗口中随意输入一些单词,监听窗口就会自动获得单词数据流信息,在监听窗口每隔1秒就会打印出词频统计信息,大概会在屏幕上出现类似如下的结果:

    3.使用Socket编程实现自定义数据源

    下面我们再前进一步,把数据源头的产生方式修改一下,不要使用nc程序,而是采用自己编写的程序产生Socket数据源(服务端),源源不断的产生数据。

    与词频统计客户端在同一个包

    监听过程:

    执行sbt打包编译: 

     

    DataSourceSocket程序需要把一个文本文件作为输入参数,所以,在启动这个程序之前,需要首先创建一个文本文件word.txt并随便输入几行内容:/usr/local/spark/mycode/streaming/word.txt 

    启动DataSourceSocket程序:

    这个窗口会不断打印出一些随机读取到的文本信息,这些信息也是Socket数据源,会被监听程序捕捉到

    在另外一个窗口启动监听程序(客户端,向服务端发送请求):

    启动成功后,你就会看到,屏幕上不断打印出词频统计信息

    三、RDD队列流

    在调试Spark Streaming应用程序的时候,我们可以使用streamingContext.queueStream(queueOfRDD)创建基于RDD队列的DStream。

    新建一个TestRDDQueueStream.scala代码文件,功能是:每隔1秒创建一个RDD,Streaming每隔2秒就对数据进行处理。

     

    sbt打包成功后,执行下面命令运行程序

     执行上面命令以后,程序就开始运行,就可以看到类似下面的结果:

    参考文献:

    【1】https://www.icourse163.org/learn/XMU-1205811805?tid=1206617233&from=study#/learn/content?type=detail&id=1211383870&cid=1214032055&replay=true

  • 相关阅读:
    Jenkins+Tomcat+svn+maven自动化构建简单过程
    Eclipse常用的6个Debug技巧
    在linux服务器上发布web应用的完整过程
    【转】解决response.AddHeader("Content-Disposition", "attachment; fileName=" + fileName) 中文显示乱码
    springmvc缓存和mybatis缓存
    springmvc文件上传和下载
    博客园API
    整理一下CoreGraphic和Quartz2D的知识(二)
    整理一下CoreGraphic和Quartz2D的知识(一)
    CGPoint和CGSize以及CGRect的一些方法~
  • 原文地址:https://www.cnblogs.com/nxf-rabbit75/p/12025106.html
Copyright © 2011-2022 走看看