zoukankan      html  css  js  c++  java
  • StructuredStreaming简单的例子(NewAPI)

    StructuredStreaming简单的例子(NewAPI)(wordCount)

    package com.briup.streaming.structed
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.OutputMode
    
    object SocketSourceMyTest {
      def main(args: Array[String]): Unit = {
        //设置Logger日志级别
        Logger.getLogger("org").setLevel(Level.WARN)
    
        //1 类似SparkSql构建过程,需要SparkSession对象
        val spark = SparkSession.builder().master("local[*]").appName("SocketSourceMyTest").getOrCreate()
        import spark.implicits._
    
        //2 从某个数据源获取数据
        val df = spark.readStream.format("socket")
          .option("host", "localhost")
          .option("port", 9999)
          .option("includeTimestamp", true)
          .load()
    
        //3 数据处理
        //    必须  df  ----> df.writeStream.start()
        //
        val w_c = df.flatMap(row =>
          row.getAs[String]("value").split(" ")
            .map(word => (word,1))
        )
        val res1 = w_c.toDF("word","number").groupBy("word").sum("number")
    
        //4 声明开始执行任务(开启任务)
        val query1 = res1.writeStream
            .outputMode(OutputMode.Complete())
          .format("console")
          .start()
        query1.awaitTermination()
        spark.close()
    
      }
    }
  • 相关阅读:
    文本框模糊匹配(纯html+jquery简单实现)
    ajax 基础2
    ajax 基础
    Js 实战3(实现全选)
    JS 实战2(邮箱选人功能)
    JS 实战1(添加、删除)
    Js 图片轮播渐隐效果
    Js 手风琴效果
    树上莫队
    洛谷 3676
  • 原文地址:https://www.cnblogs.com/Diyo/p/11395051.html
Copyright © 2011-2022 走看看