zoukankan      html  css  js  c++  java
  • spark2 structured streaming

    netcat (windows) >nc -L -p 9999

    import java.sql.Timestamp
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    
    /**
      * Structured Streaming windowed word count over a socket source.
      *
      * Reads text lines from host:port (start a server first, e.g.
      * `nc -L -p 9999` on Windows), splits each line into words, and counts
      * words per sliding event-time window (width 10s, slide 5s), printing
      * every batch of windowed counts to the console in complete mode.
      */
    object Test {

      def main(args: Array[String]): Unit = {
        val host = "localhost"
        val port = 9999
        val windowSize = 10 // window width in seconds
        val slideSize = 5   // slide interval in seconds
        // A sliding window is only valid when the slide does not exceed the
        // window width; exit instead of continuing with a bad configuration.
        if (slideSize > windowSize) {
          System.err.println("<slide duration> must be less than or equal to <window duration>")
          System.exit(1)
        }
        val windowDuration = s"$windowSize seconds"
        val slideDuration = s"$slideSize seconds"

        val spark = SparkSession
          .builder
          .appName("StructuredNetworkWordCountWindowed")
          .master("local[3]")
          .config("spark.sql.shuffle.partitions", 3)
          .getOrCreate()
        spark.sparkContext.setLogLevel("ERROR")
        import spark.implicits._

        // Stream of (line, arrival-timestamp) rows from the socket source;
        // includeTimestamp attaches the ingestion time to each line.
        val lines = spark.readStream
          .format("socket")
          .option("host", host)
          .option("port", port)
          .option("includeTimestamp", true)
          .load()

        // Split each line into words, tagging every word with the line's timestamp
        // so the words can be grouped into event-time windows below.
        val words = lines.as[(String, Timestamp)].flatMap(line =>
          line._1.split(" ").map(word => (word, line._2))
        ).toDF("word", "timestamp")

        // Count occurrences of each word per sliding event-time window,
        // newest windows first.
        val windowedCounts = words.groupBy(
          window($"timestamp", windowDuration, slideDuration), $"word"
        ).count().orderBy($"window".desc)

        // Complete mode re-emits the full aggregation result every batch.
        val query = windowedCounts.writeStream
          .outputMode("complete")
          .format("console")
          .option("truncate", "false")
          .start()

        query.awaitTermination()
      }
    }
    
    

    Result:

    -------------------------------------------
    Batch: 1
    -------------------------------------------
    +---------------------------------------------+----+-----+
    |window                                       |word|count|
    +---------------------------------------------+----+-----+
    |[2017-10-24 16:09:30.0,2017-10-24 16:09:40.0]|b   |3    |
    |[2017-10-24 16:09:30.0,2017-10-24 16:09:40.0]|a   |3    |
    |[2017-10-24 16:09:30.0,2017-10-24 16:09:40.0]|c   |1    |
    |[2017-10-24 16:09:30.0,2017-10-24 16:09:40.0]|d   |1    |
    |[2017-10-24 16:06:40.0,2017-10-24 16:06:50.0]|a   |4    |
    |[2017-10-24 16:06:35.0,2017-10-24 16:06:45.0]|a   |8    |
    |[2017-10-24 16:06:30.0,2017-10-24 16:06:40.0]|a   |4    |
    +---------------------------------------------+----+-----+
    

    窗口移动5秒,窗口宽度10秒。
    聚合维度: window, {word}

    http://asyncified.io/2017/07/30/exploring-stateful-streaming-with-spark-structured-streaming/

  • 相关阅读:
    Logstash中如何处理到ElasticSearch的数据映射
    Filebeat+Logstash+ElasticSearch+Kibana搭建Apache访问日志解析平台
    Log stash学习笔记(一)
    Plupload设置自定义参数
    优先队列的基本算法(使用二叉堆构建优先队列)
    用OC基于数组实现循环队列
    循环队列的基本算法
    用OC基于链表实现链队列
    链队列的基本算法
    用OC实现一个栈:结合单链表创建动态栈
  • 原文地址:https://www.cnblogs.com/luweiseu/p/7724034.html
Copyright © 2011-2022 走看看