  • StructuredStreaming-Sink

    Output Modes

    package cn.itcast.structured
    
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
    
    /**
     * Author itcast
     * Desc Demonstrates Structured Streaming Sink_OutPutMode (output modes)
     */
    object Demo05_Sink_OutPutMode {
      def main(args: Array[String]): Unit = {
        //TODO 0. Create the environment
        //Structured Streaming is built on Spark SQL and its programming API/data abstractions are DataFrame/Dataset, so a SparkSession is all we need here
        val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]")
          .config("spark.sql.shuffle.partitions", "4")//use a small number of shuffle partitions for this test; in real development tune it to the cluster size (default is 200)
          .getOrCreate()
        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")
        import spark.implicits._
    
        //TODO 1. Load the data
        val df: DataFrame = spark.readStream
          .format("socket")
          .option("host", "master")
          .option("port", 9999)
          .load()
    
        df.printSchema()
    
        //TODO 2. Process the data
        val ds: Dataset[String] = df.as[String]
        val result1: Dataset[Row] = ds.flatMap(_.split(" "))
          .groupBy('value)
          .count()
          .orderBy('count.desc)
    
        val result2: Dataset[Row] = ds.flatMap(_.split(" "))
          .groupBy('value)
          .count()
    
        val result3: Dataset[String] = ds.flatMap(_.split(" "))
    
    
        //TODO 3. Output the results
        result1.writeStream
            .format("console")
            //.outputMode("append")//Append output mode not supported
            //.outputMode("update")//Sorting is not supported
          .outputMode("complete")
          .start()
          .awaitTermination()
    
    //    result2.writeStream
    //      .format("console")
    //      .outputMode("update")
    //      .start()
    //      .awaitTermination()
    
    //    result3.writeStream
    //      .format("console")
    //      .outputMode("append")
    //      //TODO 4. Start and wait for termination
    //      .start()
    //      .awaitTermination()
    
        //TODO 5. Release resources
        spark.stop()
      }
    }

     The results that get written depend on the chosen output mode: append emits only rows newly appended to the result table, update emits only rows that changed since the last trigger, and complete rewrites the whole result table on every trigger. As the comments on result1 note, sorting is only supported in complete mode, and an aggregation without a watermark cannot use append mode.
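    For reference, an aggregation can run in append mode once a watermark is defined on an event-time column, because Spark can then finalize a window and emit it exactly once. The socket stream above has no timestamp column, so the following is only a minimal sketch: the streaming DataFrame events and its eventTime column are assumed for illustration, not part of the demo.

    // Minimal sketch (hypothetical `events` streaming DataFrame with an `eventTime` column):
    // a watermarked, windowed aggregation is allowed to use append mode.
    import org.apache.spark.sql.functions.{col, window}

    val windowedCounts = events
      .withWatermark("eventTime", "10 minutes")                       // tolerate up to 10 minutes of late data
      .groupBy(window(col("eventTime"), "5 minutes"), col("value"))   // 5-minute tumbling windows per word
      .count()

    windowedCounts.writeStream
      .format("console")
      .outputMode("append")   // valid here: a window is emitted once the watermark passes its end
      .start()
      .awaitTermination()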

    Sink Location

     The memory sink writes the results into the driver's memory and registers them as an in-memory table that can be queried with Spark SQL; it is intended for debugging with small result sets.

    package cn.itcast.structured
    
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.streaming.StreamingQuery
    import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
    
    /**
     * Author itcast
     * Desc Demonstrates Structured Streaming Sink_Location (sink locations)
     */
    object Demo06_Sink_Location {
      def main(args: Array[String]): Unit = {
        //TODO 0. Create the environment
        //Structured Streaming is built on Spark SQL and its programming API/data abstractions are DataFrame/Dataset, so a SparkSession is all we need here
        val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]")
          .config("spark.sql.shuffle.partitions", "4")//use a small number of shuffle partitions for this test; in real development tune it to the cluster size (default is 200)
          .getOrCreate()
        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")
        import spark.implicits._
    
        //TODO 1. Load the data
        val df: DataFrame = spark.readStream
          .format("socket")
          .option("host", "master")
          .option("port", 9999)
          .load()
    
        df.printSchema()
    
        //TODO 2. Process the data
        val ds: Dataset[String] = df.as[String]
        val result: Dataset[Row] = ds.flatMap(_.split(" "))
          .groupBy('value)
          .count()
          .orderBy('count.desc)
    
        //TODO 3. Output the results
        /*result.writeStream
            .format("console")
            .outputMode("complete")
            .start()
            .awaitTermination()*/
    
        val query: StreamingQuery = result.writeStream
          .format("memory")
          .queryName("t_result")
          .outputMode("complete")
          //TODO 4. Start and wait for termination
          .start()
          //.awaitTermination()
        while(true){
          spark.sql("select * from t_result").show
          Thread.sleep(3000)
        }
    
        //query.awaitTermination()
    
        //TODO 5. Release resources
        spark.stop()
      }
    }
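    Note that the query handle returned by start() is never used above, and the while(true) loop means execution never reaches spark.stop(). A bounded variant is sketched below using only standard StreamingQuery methods; the number of polls is arbitrary.

    // Minimal sketch (same t_result memory table as above): poll a fixed number of
    // times using the StreamingQuery handle, then stop the query cleanly.
    var polls = 0
    while (query.isActive && polls < 10) {        // bounded polling instead of an endless loop
      println(query.status)                       // current status of the streaming query
      spark.sql("select * from t_result").show()
      Thread.sleep(3000)
      polls += 1
    }
    query.stop()                                  // stop the streaming query so spark.stop() can run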

    ForeachBatch Sink

    foreachBatch hands the output data of each micro-batch to a user-defined function, which can write it to the console, a database, or any other batch sink.

    package cn.itcast.structured
    
    import org.apache.spark.SparkContext
    import org.apache.spark.sql._
    
    /**
     * Author itcast
     * Desc Demonstrates Structured Streaming Sink_ForeachBatch
     */
    object Demo07_Sink_ForeachBatch {
      def main(args: Array[String]): Unit = {
        //TODO 0. Create the environment
        //Structured Streaming is built on Spark SQL and its programming API/data abstractions are DataFrame/Dataset, so a SparkSession is all we need here
        val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]")
          .config("spark.sql.shuffle.partitions", "4") //use a small number of shuffle partitions for this test; in real development tune it to the cluster size (default is 200)
          .getOrCreate()
        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")
        import spark.implicits._
    
        //TODO 1. Load the data
        val df: DataFrame = spark.readStream
          .format("socket")
          .option("host", "master")
          .option("port", 9999)
          .load()
    
        df.printSchema()
    
        //TODO 2. Process the data
        val ds: Dataset[String] = df.as[String]
        val result: Dataset[Row] = ds.flatMap(_.split(" "))
          .groupBy('value)
          .count()
          .orderBy('count.desc)
    
        //TODO 3. Output the results
        /*result.writeStream
            .format("console")
            .outputMode("complete")
            .start()
            .awaitTermination()*/
        result.writeStream
          .foreachBatch((ds: Dataset[Row], batchId:Long) => {
            //custom output to the console
            println("-------------")
            println(s"batchId:${batchId}")
            println("-------------")
            ds.show()
            //custom output to MySQL
            ds.coalesce(1)
              .write
              .mode(SaveMode.Overwrite)
              .format("jdbc")
              //.option("driver", "com.mysql.cj.jdbc.Driver")//MySQL-8
              //.option("url", "jdbc:mysql://localhost:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true")//MySQL-8
              .option("url", "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8")
              .option("user", "root")
              .option("password", "123456")
              .option("dbtable", "bigdata.t_struct_words")
              .save()
          })
          .outputMode("complete")
          //TODO 4. Start and wait for termination
          .start()
          .awaitTermination()
    
    
        //TODO 5. Release resources
        spark.stop()
      }
    }
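    To check what each batch wrote to MySQL, the table can be read back with the batch JDBC reader. The following is a minimal sketch that reuses the demo's own connection settings; the URL, credentials, and table name are the demo's assumptions, not verified values.

    // Minimal sketch: read the table written by foreachBatch back as a regular batch DataFrame.
    val saved: DataFrame = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "bigdata.t_struct_words")
      .load()
    saved.show()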

     
