zoukankan      html  css  js  c++  java
  • Spark Streaming之dataset实例

      Spark Streaming是核心Spark API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理。

      bin/spark-submit --class Streaming /home/wx/Stream.jar
      hadoop fs -put /home/wx/123.txt /user/wx/

    文本123.txt

    NOTICE:07-26 logId[0072]
    NOTICE:07-26 logId[0073]
    NOTICE:07-26 logId[0074]
    NOTICE:07-26 logId[0075]
    NOTICE:07-26 logId[0076]
    
    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.sql.SparkSession
    
    /**
      * Spark Streaming example: reads text files from an HDFS directory as a stream and, for
      * every micro-batch, runs a Spark SQL query that uses `regexp_extract` to pull the
      * `NOTICE:dd` prefix and the `logId[...]` token out of each line. Both the raw lines and
      * the extracted columns are printed to stdout.
      */
    object Streaming {

      /**
        * Entry point. Builds a [[StreamingContext]] with a 1-second batch interval, wires up
        * the file stream and the per-batch SQL extraction, then blocks until the streaming
        * computation terminates.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {

        // NOTE(review): a master set in code takes precedence over `--master` given to
        // spark-submit; drop setMaster (or use setIfMissing) before running on a cluster.
        val conf = new SparkConf().setMaster("local[2]").setAppName("RegexpExtract")
        val ssc = new StreamingContext(conf, Seconds(1))

        println("hello world")

        // Stream of text lines read from files in the HDFS directory.
        val lines = ssc.textFileStream("hdfs://name-ha/user/wx/")

        // Split every record on the newline character. The separator must be written as the
        // escape sequence "\n": a physical line break inside a single-quoted Scala string
        // literal does not compile.
        val ds = lines.flatMap(_.split("\n"))

        // Print the first elements of each batch, under a "Time: ..." header.
        ds.print()

        ds.foreachRDD { rdd =>

          // Obtain the SparkSession (getOrCreate returns the existing one if already built).
          val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
          import spark.implicits._

          // Convert RDD[String] to a single-column DataFrame.
          val linesDataFrame = rdd.toDF("str_col")

          // Register the batch as temporary view "df" so it can be queried with SQL.
          linesDataFrame.createOrReplaceTempView("df")

          // Extract the "NOTICE:dd" prefix and the "logId[...]" token from every line.
          // Group index 0 returns the whole match, not just the capture group.
          // NOTE(review): depending on the Spark version and the
          // `spark.sql.parser.escapedStringLiterals` setting, the SQL parser may unescape
          // backslashes inside string literals, in which case the patterns would need `\\d`
          // and `\\[` -- confirm against the Spark version in use.
          val extractedDataFrame =
            spark.sql(raw"""
              select str_col,
              regexp_extract(str_col,"NOTICE:\d{2}",0) notice,
              regexp_extract(str_col,"logId\[(.*?)\]",0) logId 
              from df""")

          // truncate = false: show full column contents.
          extractedDataFrame.show(false)
        }

        ssc.start() // Start the computation
        ssc.awaitTermination() // Wait for the computation to terminate
      }
    }
    

    执行结果

    hello world
    -------------------------------------------
    Time: 1501501752000 ms
    -------------------------------------------
    
    NOTICE:07-26 logId[0072]
    NOTICE:07-26 logId[0073]
    NOTICE:07-26 logId[0074]
    NOTICE:07-26 logId[0075]
    NOTICE:07-26 logId[0076]
    
    +------------------------+---------+-----------+
    |str_col                 |notice   |logId      |
    +------------------------+---------+-----------+
    |NOTICE:07-26 logId[0072]|NOTICE:07|logId[0072]|
    |NOTICE:07-26 logId[0073]|NOTICE:07|logId[0073]|
    |NOTICE:07-26 logId[0074]|NOTICE:07|logId[0074]|
    |NOTICE:07-26 logId[0075]|NOTICE:07|logId[0075]|
    |NOTICE:07-26 logId[0076]|NOTICE:07|logId[0076]|
    +------------------------+---------+-----------+
    
    -------------------------------------------
    Time: 1501501770000 ms
    -------------------------------------------
    
  • 相关阅读:
    kvm添加磁盘
    python学习1
    ubuntu使sudo不需要密码
    磁盘挂载
    github/gitlab添加多个ssh key
    生成SSH key
    git 删除追踪状态
    angular2+ 初理解
    本地项目上传到GitHub
    new Date()之参数传递
  • 原文地址:https://www.cnblogs.com/wwxbi/p/7265210.html
Copyright © 2011-2022 走看看