zoukankan      html  css  js  c++  java
  • Structured Streaming 简单的例子 (New API)

    Structured Streaming 简单的例子 (New API): wordCount

    package com.briup.streaming.structed
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.OutputMode
    
    object SocketSourceMyTest {

      /** Structured Streaming word count over a TCP text socket.
        *
        * Reads text lines from a socket source, splits each line on whitespace,
        * and prints the running count per word to the console. The query uses
        * Complete output mode, so the whole result table is re-emitted on every
        * trigger.
        *
        * To try it locally, start a text server first, e.g. `nc -lk 9999`.
        *
        * @param args optional `[host] [port]`; defaults to `localhost` and `9999`.
        *             A non-numeric port fails fast with a NumberFormatException.
        */
      def main(args: Array[String]): Unit = {
        // Keep org.* (Spark/Hadoop) logging at WARN so the console sink output stays readable.
        Logger.getLogger("org").setLevel(Level.WARN)

        val host = args.headOption.getOrElse("localhost")
        val port = args.drop(1).headOption.map(_.toInt).getOrElse(9999)

        // 1. As with Spark SQL, everything starts from a SparkSession.
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("SocketSourceMyTest")
          .getOrCreate()
        import spark.implicits._

        try {
          // 2. Source: each received line arrives in the string column "value".
          val lines = spark.readStream
            .format("socket")
            .option("host", host)
            .option("port", port)
            .load()
            .select("value")
            .as[String]

          // 3. Transformation: split on runs of whitespace and drop empty tokens, so
          //    blank lines and repeated spaces are not counted as the word "".
          val wordCounts = lines
            .flatMap(_.split("\\s+"))
            .filter(_.nonEmpty)
            .map(word => (word, 1))
            .toDF("word", "number")
            .groupBy("word")
            .sum("number")

          // 4. Sink: a streaming DataFrame only executes once writeStream...start() is called.
          val query = wordCounts.writeStream
            .outputMode(OutputMode.Complete())
            .format("console")
            .start()

          // Blocks until the query stops; a query failure is rethrown from here.
          query.awaitTermination()
        } finally {
          // Release the session even when the query fails, not only on clean termination.
          spark.stop()
        }
      }
    }
  • 相关阅读:
    Django-ORM
    深入理解vue 修饰符sync
    PS切图
    用Chrome 浏览器调试移动端网页 chrome://inspect/#devices
    float浮动导致父元素高度坍塌的原因及清除浮动方法
    vue keep-alive
    ES6 箭头函数
    ES6 Module(模块)
    MVC模式 和 MVVM模式
    移动端适配代码
  • 原文地址:https://www.cnblogs.com/Diyo/p/11395051.html
Copyright © 2011-2022 走看看