  • Spark 2 Structured Streaming

    Start a netcat listener as the text source (Windows): >nc -L -p 9999

    import java.sql.Timestamp
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    /**
     * Windowed word count over a text socket source, using Spark 2 Structured Streaming.
     *
     * Reads lines from `host:port` (e.g. a local `nc -L -p 9999` listener). Each line is split
     * into words, and each word is tagged with the line's arrival timestamp. Running counts per
     * (sliding window, word) are printed to the console.
     *
     * Defines `main` instead of extending `App`: Spark's documentation warns that subclasses of
     * `scala.App` may not work correctly.
     */
    object Test {
      def main(args: Array[String]): Unit = {
        val host = "localhost"
        val port = 9999
        val windowSize = 10 // window length, in seconds
        val slideSize = 5   // slide interval, in seconds

        if (slideSize > windowSize) {
          System.err.println("<slide duration> must be less than or equal to <window duration>")
          // Abort instead of starting a query with an invalid window specification.
          sys.exit(1)
        }

        val windowDuration = s"$windowSize seconds"
        val slideDuration = s"$slideSize seconds"

        // The master (e.g. local[*]) is expected to come from spark-submit or the run configuration.
        val spark = SparkSession
          .builder
          .appName("StructuredNetworkWordCountWindowed")
          // Small local job: use 3 shuffle partitions instead of Spark's default of 200.
          .config("spark.sql.shuffle.partitions", 3)
          .getOrCreate()

        import spark.implicits._

        // Streaming DataFrame of lines read from host:port.
        // With includeTimestamp, each row is (value: String, timestamp: Timestamp).
        val lines = spark.readStream
          .format("socket")
          .option("host", host)
          .option("port", port)
          .option("includeTimestamp", true)
          .load()

        // Split the lines into words, keeping each line's timestamp on every word.
        val words = lines.as[(String, Timestamp)]
          .flatMap { case (line, ts) => line.split(" ").map(word => (word, ts)) }
          .toDF("word", "timestamp")

        // Group by sliding event-time window and word, then count each group.
        val windowedCounts = words
          .groupBy(window($"timestamp", windowDuration, slideDuration), $"word")
          .count()

        // Complete mode re-emits the entire aggregated result table on every trigger.
        val query = windowedCounts.writeStream
          .outputMode("complete")
          .format("console")
          .option("truncate", "false")
          .start()

        query.awaitTermination()
      }
    }


    Batch: 1
    |window                                       |word|count|
    |[2017-10-24 16:09:30.0,2017-10-24 16:09:40.0]|b   |3    |
    |[2017-10-24 16:09:30.0,2017-10-24 16:09:40.0]|a   |3    |
    |[2017-10-24 16:09:30.0,2017-10-24 16:09:40.0]|c   |1    |
    |[2017-10-24 16:09:30.0,2017-10-24 16:09:40.0]|d   |1    |
    |[2017-10-24 16:06:40.0,2017-10-24 16:06:50.0]|a   |4    |
    |[2017-10-24 16:06:35.0,2017-10-24 16:06:45.0]|a   |8    |
    |[2017-10-24 16:06:30.0,2017-10-24 16:06:40.0]|a   |4    |

    聚合维度 (aggregation keys): window, {word}


