  • Spark 2 Structured Streaming

    netcat (Windows): nc -L -p 9999    (on Linux/macOS: nc -lk 9999)

    import java.sql.Timestamp
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    
    /**
      * Sliding-window word count over a socket text stream
      * (Spark Structured Streaming, StructuredNetworkWordCountWindowed).
      */
    object Test extends App {
      val host = "localhost"
      val port = 9999
      val windowSize = 10
      val slideSize = 5
      if (slideSize > windowSize) {
        System.err.println("<slide duration> must be less than or equal to <window duration>")
        System.exit(1)
      }
      val windowDuration = s"$windowSize seconds"
      val slideDuration = s"$slideSize seconds"
    
      val spark = SparkSession
        .builder
        .appName("StructuredNetworkWordCountWindowed")
          .master("local[3]")
          .config("spark.sql.shuffle.partitions", 3)
        .getOrCreate()
      spark.sparkContext.setLogLevel("ERROR")
      import spark.implicits._
    
      // Create DataFrame representing the stream of input lines from connection to host:port
      val lines = spark.readStream
        .format("socket")
        .option("host", host)
        .option("port", port)
        .option("includeTimestamp", true)
        .load()
    
      // Split the lines into words, retaining timestamps
      val words = lines.as[(String, Timestamp)].flatMap(line =>
        line._1.split(" ").map(word => (word, line._2))
      ).toDF("word", "timestamp")
    
      // Group the data by window and word and compute the count of each group
      val windowedCounts = words.groupBy(
        window($"timestamp", windowDuration, slideDuration), $"word"
      ).count().orderBy($"window".desc)
    
      // Start running the query that prints the windowed word counts to the console
      val query = windowedCounts.writeStream
        .outputMode("complete")
        .format("console")
        .option("truncate", "false")
        .start()
    
      query.awaitTermination()
    
    }
    
    

    Result:

    -------------------------------------------
    Batch: 1
    -------------------------------------------
    +---------------------------------------------+----+-----+
    |window                                       |word|count|
    +---------------------------------------------+----+-----+
    |[2017-10-24 16:09:30.0,2017-10-24 16:09:40.0]|b   |3    |
    |[2017-10-24 16:09:30.0,2017-10-24 16:09:40.0]|a   |3    |
    |[2017-10-24 16:09:30.0,2017-10-24 16:09:40.0]|c   |1    |
    |[2017-10-24 16:09:30.0,2017-10-24 16:09:40.0]|d   |1    |
    |[2017-10-24 16:06:40.0,2017-10-24 16:06:50.0]|a   |4    |
    |[2017-10-24 16:06:35.0,2017-10-24 16:06:45.0]|a   |8    |
    |[2017-10-24 16:06:30.0,2017-10-24 16:06:40.0]|a   |4    |
    +---------------------------------------------+----+-----+
    

    The window slides every 5 seconds and is 10 seconds wide.
    Aggregation dimensions: window, word
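
    Because the slide (5 s) is shorter than the width (10 s), windows overlap and a single event is counted in two of them. The sketch below is not from the original post; the object name WindowDemo and the sample timestamp are made up for illustration. It applies the same window(...) function to a one-row static DataFrame so the overlap is visible in a plain batch query:

    import java.sql.Timestamp

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    object WindowDemo extends App {
      val spark = SparkSession.builder
        .appName("WindowDemo")
        .master("local[1]")
        .getOrCreate()
      spark.sparkContext.setLogLevel("ERROR")
      import spark.implicits._

      // One event at 16:09:32 falls into two overlapping windows when the
      // window is 10 seconds wide and slides every 5 seconds:
      // [16:09:25, 16:09:35) and [16:09:30, 16:09:40).
      val events = Seq(("a", Timestamp.valueOf("2017-10-24 16:09:32"))).toDF("word", "timestamp")

      events
        .groupBy(window($"timestamp", "10 seconds", "5 seconds"), $"word")
        .count()
        .orderBy($"window")
        .show(truncate = false)

      spark.stop()
    }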

    http://asyncified.io/2017/07/30/exploring-stateful-streaming-with-spark-structured-streaming/
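
    The article linked above explores how Structured Streaming keeps state. In the query above, outputMode("complete") means Spark retains the count for every window it has ever seen. A common way to bound that state (not shown in the original post) is a watermark plus append output. The sketch below reuses the words, windowDuration and slideDuration definitions from the listing above; the 30-second watermark is an arbitrary value chosen for illustration:

      // Hedged variant: let Spark drop state for windows that fall behind
      // the watermark (max event time seen minus 30 seconds).
      val watermarkedCounts = words
        .withWatermark("timestamp", "30 seconds")
        .groupBy(window($"timestamp", windowDuration, slideDuration), $"word")
        .count()

      // In append mode each window is emitted once, after the watermark has
      // passed its end; sorting is only allowed in complete mode, so there is
      // no orderBy here.
      val watermarkedQuery = watermarkedCounts.writeStream
        .outputMode("append")
        .format("console")
        .option("truncate", "false")
        .start()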

  • Original post: https://www.cnblogs.com/luweiseu/p/7724034.html