// Start netcat first so the socket source has something to connect to (Windows): nc -L -p 9999
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
/**
 * Windowed word count over a text stream read from a TCP socket.
 *
 * Connects to `host:port`, splits each received line into words, and counts
 * the occurrences of every word per sliding event-time window. The full
 * result table is printed to the console after every micro-batch.
 *
 * A listener must already be running on `host:port` before this starts.
 */
object Test extends App {
  val host = "localhost"
  val port = 9999

  // Window length and slide interval, both in seconds.
  val windowSize = 10
  val slideSize = 5
  if (slideSize > windowSize) {
    // Only reports the problem; execution continues past this point.
    System.err.println("<slide duration> must be less than or equal to <window duration>")
  }
  val windowDuration = s"$windowSize seconds"
  val slideDuration = s"$slideSize seconds"

  val spark = SparkSession
    .builder()
    // NOTE(review): assumes a local run (source is localhost); remove when the
    // master is supplied externally, e.g. via spark-submit --master.
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", 3)
    .getOrCreate()

  import spark.implicits._

  // Create DataFrame representing the stream of input lines from connection to host:port
  val lines = spark.readStream
    .format("socket")
    .option("host", host)
    .option("port", port)
    .option("includeTimestamp", true) // adds the timestamp column used for windowing below
    .load()

  // Split the lines into words, retaining timestamps
  val words = lines
    .as[(String, Timestamp)]
    .flatMap(line => line._1.split(" ").map(word => (word, line._2)))
    .toDF("word", "timestamp")

  // Group the data by window and word and compute the count of each group
  val windowedCounts = words
    .groupBy(window($"timestamp", windowDuration, slideDuration), $"word")
    .count()

  // Start running the query that prints the windowed word counts to the console
  val query = windowedCounts.writeStream
    .outputMode("complete") // re-emit the whole aggregated table on every trigger
    .format("console")
    .option("truncate", "false")
    .start()

  // Keep the application alive until the streaming query stops or fails.
  query.awaitTermination()
}
Batch: 1
|window |word|count|
|[2017-10-24 16:09:30.0,2017-10-24 16:09:40.0]|b |3 |
|[2017-10-24 16:09:30.0,2017-10-24 16:09:40.0]|a |3 |
|[2017-10-24 16:09:30.0,2017-10-24 16:09:40.0]|c |1 |
|[2017-10-24 16:09:30.0,2017-10-24 16:09:40.0]|d |1 |
|[2017-10-24 16:06:40.0,2017-10-24 16:06:50.0]|a |4 |
|[2017-10-24 16:06:35.0,2017-10-24 16:06:45.0]|a |8 |
|[2017-10-24 16:06:30.0,2017-10-24 16:06:40.0]|a |4 |
聚合维度 (aggregation dimensions): window, {word}