zoukankan      html  css  js  c++  java
  • Spark Streaming之dataset实例

      Spark Streaming 是核心 Spark API 的扩展,可以对实时数据流进行可扩展、高吞吐量、具备容错能力的流式处理。

      bin/spark-submit --class Streaming /home/wx/Stream.jar
      hadoop fs -put /home/wx/123.txt /user/wx/

    文本123.txt

    NOTICE:07-26 logId[0072]
    NOTICE:07-26 logId[0073]
    NOTICE:07-26 logId[0074]
    NOTICE:07-26 logId[0075]
    NOTICE:07-26 logId[0076]
    
    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.sql.SparkSession
    
    /**
      * Spark Streaming example that reads text files from an HDFS directory and,
      * for every micro-batch, extracts fields from each log line with the Spark SQL
      * function `regexp_extract`.
      *
      * For a line such as `NOTICE:07-26 logId[0072]` the query yields the columns
      * `str_col` (the raw line), `notice` (`NOTICE:07`) and `logId` (`logId[0072]`).
      */
    object Streaming {

      /**
        * Entry point: builds the streaming context, wires up the per-batch
        * processing and blocks until the streaming job terminates.
        *
        * @param args command-line arguments (not used)
        */
      def main(args: Array[String]): Unit = {

        // Local master with two threads, one-second batch interval.
        val conf = new SparkConf().setMaster("local[2]").setAppName("RegexpExtract")
        val ssc = new StreamingContext(conf, Seconds(1))

        println("hello world")

        // Stream of text lines read from files in the given HDFS directory.
        val lines = ssc.textFileStream("hdfs://name-ha/user/wx/")

        // Split every record on the newline character. The literal must be the
        // escape sequence "\n": a regular (single-quoted-pair) Scala string
        // literal cannot contain a raw line break.
        val ds = lines.flatMap(_.split("\n"))

        // Print the first elements of every batch to stdout.
        ds.print()

        ds.foreachRDD { rdd =>

          // Obtain (or lazily create) the SparkSession bound to this RDD's configuration.
          val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
          import spark.implicits._

          // Convert RDD[String] to a single-column DataFrame of raw log lines.
          val linesDataFrame = rdd.toDF("str_col")

          // Expose the batch to SQL under the view name "df".
          linesDataFrame.createOrReplaceTempView("df")

          // Extract the "NOTICE:<2 digits>" prefix and the "logId[...]" token.
          // Group index 0 selects the whole regex match, not just a capture group.
          // NOTE(review): the `raw` interpolator passes the backslashes to Spark SQL
          // unchanged; whether the SQL parser then unescapes them depends on the
          // Spark version/configuration -- confirm against the target cluster.
          val extractedDataFrame =
            spark.sql(raw"""
              select str_col,
              regexp_extract(str_col,"NOTICE:\d{2}",0) notice,
              regexp_extract(str_col,"logId\[(.*?)\]",0) logId 
              from df""")

          // Show full column contents without truncation.
          extractedDataFrame.show(false)
        }

        ssc.start() // Start the computation
        ssc.awaitTermination() // Wait for the computation to terminate
      }
    }
    

    执行结果

    hello world
    -------------------------------------------
    Time: 1501501752000 ms
    -------------------------------------------
    
    NOTICE:07-26 logId[0072]
    NOTICE:07-26 logId[0073]
    NOTICE:07-26 logId[0074]
    NOTICE:07-26 logId[0075]
    NOTICE:07-26 logId[0076]
    
    +------------------------+---------+-----------+
    |str_col                 |notice   |logId      |
    +------------------------+---------+-----------+
    |NOTICE:07-26 logId[0072]|NOTICE:07|logId[0072]|
    |NOTICE:07-26 logId[0073]|NOTICE:07|logId[0073]|
    |NOTICE:07-26 logId[0074]|NOTICE:07|logId[0074]|
    |NOTICE:07-26 logId[0075]|NOTICE:07|logId[0075]|
    |NOTICE:07-26 logId[0076]|NOTICE:07|logId[0076]|
    +------------------------+---------+-----------+
    
    -------------------------------------------
    Time: 1501501770000 ms
    -------------------------------------------
    
  • 相关阅读:
    JavaScript全局属性和全局函数
    bilibili源码泄漏后,程序员们从代码里扒出来的彩蛋
    视觉有难,八方点赞。
    北上广等一线城市IT岗位已接近饱和?
    做大数据分析的怎么可以不会这个?
    当用户管理系统遇上python和mongodb后……
    一篇文章看懂大数据分析就业前景及职能定位
    是程序员,就用python导出pdf
    上次被人说TK不好咯,这次给你整个高大上的
    别再说Python没有枚举类型了,好好看看
  • 原文地址:https://www.cnblogs.com/wwxbi/p/7265210.html
Copyright © 2011-2022 走看看