  • Structured Streaming: Computation and Output Operations

     

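    1.1. Computation operations

    Once data has been obtained from a Source, it is processed with the same basic operations as the DataFrame/Dataset APIs covered earlier, so those are not repeated here.

    ● Example code from the official documentation: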

    import java.sql.Timestamp
    import org.apache.spark.sql.{DataFrame, Dataset}
    import spark.implicits._   // encoders for .as[DeviceData] and the typed lambdas; assumes a SparkSession named spark

    // java.sql.Timestamp rather than DateTime: Spark has a built-in encoder for Timestamp
    case class DeviceData(device: String, deviceType: String, signal: Double, time: Timestamp)

    val df: DataFrame = ... // streaming DataFrame with IoT device data with schema { device: string, deviceType: string, signal: double, time: timestamp }
    val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IoT device data

    // Select the devices which have signal more than 10
    df.where("signal > 10").select("device")      // using untyped APIs
    ds.filter(_.signal > 10).map(_.device)        // using typed APIs

    // Running count of the number of updates for each device type
    df.groupBy("deviceType").count()              // using untyped API

    // Running average signal for each device type
    import org.apache.spark.sql.expressions.scalalang.typed   // deprecated since Spark 3.0
    ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))      // using typed API
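    The snippet above is a fragment (`df` is elided). As a minimal runnable sketch, assuming Spark 3.x (`spark-sql`) on the classpath, the following feeds a few hypothetical `DeviceData` rows through `MemoryStream`, an internal Spark source intended for tests, and computes the running average signal per device type with the built-in `avg` function instead of the deprecated `typed.avg`. The names `ComputeOpsSketch` and `avg_by_type` are made up for illustration.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{Dataset, SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.avg

// Same shape as DeviceData above, with java.sql.Timestamp for the time column
case class DeviceData(device: String, deviceType: String, signal: Double, time: Timestamp)

object ComputeOpsSketch {
  // Returns deviceType -> running average signal after all fed rows are processed
  def run(spark: SparkSession): Map[String, Double] = {
    import spark.implicits._
    implicit val sqlCtx: SQLContext = spark.sqlContext

    // MemoryStream: Spark-internal, test-oriented source, used here only for deterministic input
    val input = MemoryStream[DeviceData]
    val ds: Dataset[DeviceData] = input.toDS()

    // Untyped built-in avg instead of the deprecated typed.avg
    val avgByType = ds.groupBy("deviceType").agg(avg("signal").as("avgSignal"))

    val query = avgByType.writeStream
      .outputMode("complete")   // aggregation without a watermark: complete (or update)
      .format("memory")
      .queryName("avg_by_type") // hypothetical in-memory table name
      .start()

    val t0 = new Timestamp(0L)
    input.addData(
      DeviceData("d1", "thermo", 12.0, t0),  // hypothetical sample rows
      DeviceData("d2", "thermo", 4.0, t0),
      DeviceData("d3", "camera", 20.0, t0))
    query.processAllAvailable()              // block until the fed rows are processed

    // Tuple encoder maps columns by position: (deviceType, avgSignal)
    val result = spark.table("avg_by_type").as[(String, Double)].collect().toMap
    query.stop()
    result
  }
}
```

    Calling `ComputeOpsSketch.run(spark)` on a local session should yield 8.0 for `thermo` and 20.0 for `camera`.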

     

    1.2. Output

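    The computed results can be written to a variety of sinks, configured with the following settings:

    1. output mode: how the rows of the result table are written to the sink.

    2. format/output sink details: data format, location, and so on.

    3. query name: an identifier for the query, similar to the name of a temp view.

    4. trigger interval: how often a trigger fires. If it is not specified, data is processed as fast as possible by default.

    5. checkpoint location: usually a directory on HDFS. Note: the socket source does not support recovery; if a checkpoint location is set, starting the query a second time fails with an error.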
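    The five settings above map onto `DataStreamWriter` calls roughly as follows. This is a sketch, assuming Spark 3.x: the built-in `rate` source stands in for real input, a local temp directory stands in for the HDFS checkpoint path, and `SinkSettingsSketch` and the query name `parity_counts` are hypothetical.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

object SinkSettingsSketch {
  def start(spark: SparkSession): StreamingQuery = {
    // Built-in "rate" source: generates (timestamp, value) rows, handy for experiments
    val parityCounts = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .selectExpr("value % 2 AS parity")
      .groupBy("parity")
      .count()

    // Local temp dir as a stand-in; in production this is usually an HDFS path
    val checkpointDir = Files.createTempDirectory("parity-ckpt").toString

    parityCounts.writeStream
      .outputMode("complete")                        // 1. output mode
      .format("memory")                              // 2. sink format (memory: debugging only)
      .queryName("parity_counts")                    // 3. query name, also the in-memory table name
      .trigger(Trigger.ProcessingTime("2 seconds"))  // 4. trigger interval
      .option("checkpointLocation", checkpointDir)   // 5. checkpoint location
      .start()
  }
}
```

    Dropping the `.trigger(...)` line gives the default behavior described in item 4, where a new micro-batch starts as soon as the previous one finishes.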

     

    1.2.1. output mode


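    Every time the result table is updated, we want to write the changed result rows to an external sink.

    There are three output modes:

    1. Append mode: the default. Only new rows are written: each time the result table is updated, only the rows newly added to it are written to the sink. It is supported only for queries where rows added to the result table never change, so each row is output only once. For example, queries that use only select, where, map, flatMap, filter, join, etc. support append mode. Aggregations are not supported (the exception, since Spark 2.1, is an event-time aggregation with a watermark).

    2. Complete mode: everything is written: after every trigger, the entire result table is written to the sink. It applies only to queries that contain an aggregation.

    3. Update mode: only updated rows are written: each time the result table is updated, only the rows that changed are written to the sink (available since Spark 2.1.1). Sorting is not supported.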
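    To make the difference between the three modes concrete, here is a plain-Scala model (no Spark involved, and not how Spark implements it) of what a sink would receive for one trigger of a running count, given the result table before and after that trigger. `OutputModeModel` and the keys are hypothetical. Note that real Spark rejects Append mode for an aggregation without a watermark when the query is started, rather than per trigger as modeled here.

```scala
object OutputModeModel {
  // Result table of a running count: key -> count
  type Counts = Map[String, Long]

  // Fold one micro-batch of keys into the running counts
  def applyBatch(table: Counts, batch: Seq[String]): Counts =
    batch.foldLeft(table)((t, k) => t.updated(k, t.getOrElse(k, 0L) + 1L))

  // Complete: the whole result table after the trigger
  def completeOutput(before: Counts, after: Counts): Counts = after

  // Update: only rows that are new or whose value changed in this trigger
  def updateOutput(before: Counts, after: Counts): Counts =
    after.filter { case (k, v) => !before.get(k).contains(v) }

  // Append: only new rows, and only if no previously emitted row has changed
  def appendOutput(before: Counts, after: Counts): Either[String, Counts] =
    if (before.exists { case (k, v) => !after.get(k).contains(v) })
      Left("a previously emitted row changed; append mode cannot express that")
    else
      Right(after -- before.keys)
}
```

    For example, if the table goes from `a -> 2, b -> 1` to `a -> 2, b -> 2, c -> 1`, Complete emits all three rows, Update emits only `b -> 2` and `c -> 1`, and Append is rejected because `b` changed.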

      

    1.2.2. output sink

    ● Usage

    File sink - Stores the output to a directory. Supports file formats such as Parquet, and only the Append output mode.

    writeStream
        .format("parquet")        // can be "orc", "json", "csv", etc.
        .option("path", "path/to/destination/dir")
        .start()

    Kafka sink - Stores the output to one or more topics in Kafka.

    writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
        .option("topic", "updates")
        .start()

    Foreach sink - Runs arbitrary computation on the records in the output.

    writeStream
        .foreach(...)
        .start()

    Console sink (for debugging) - Prints the output to the console/stdout every time there is a trigger. Append, Update and Complete output modes are supported. This should be used for debugging purposes on low data volumes, as the entire output is collected and stored in the driver's memory after every trigger.

    writeStream
        .format("console")
        .start()

    Memory sink (for debugging) - The output is stored in memory as an in-memory table. Both Append and Complete output modes are supported. This should be used for debugging purposes on low data volumes, as the entire output is collected and stored in the driver's memory. Hence, use it with caution.

    writeStream
        .format("memory")
        .queryName("tableName")
        .start()
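    The foreach sink above leaves the writer elided. Below is a minimal sketch of a `ForeachWriter`, assuming Spark 2.4+ (where `open` receives the epoch ID) and Spark's internal test source `MemoryStream` for input. The hypothetical `QueueWriter` pushes each row into a JVM-local queue, which only works as a demonstration in local mode, where executors share the driver JVM; a real writer would open a connection in `open`, write in `process`, and release it in `close`.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import org.apache.spark.sql.{ForeachWriter, SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

// Hypothetical destination: a JVM-local queue (visible to the driver only in local mode)
object CollectedRows {
  val rows = new ConcurrentLinkedQueue[String]()
}

// Hypothetical writer that pushes every output row into CollectedRows.rows
class QueueWriter extends ForeachWriter[String] {
  // Called once per partition per epoch; returning false skips this partition
  override def open(partitionId: Long, epochId: Long): Boolean = true

  // Called for every row of the partition
  override def process(value: String): Unit = {
    CollectedRows.rows.add(value)
    ()
  }

  // Called at the end; errorOrNull is non-null if processing failed
  override def close(errorOrNull: Throwable): Unit = ()
}

object ForeachSketch {
  def run(spark: SparkSession): Unit = {
    import spark.implicits._
    implicit val sqlCtx: SQLContext = spark.sqlContext

    val input = MemoryStream[String]   // internal, test-oriented source
    val query = input.toDS()
      .writeStream
      .foreach(new QueueWriter)
      .start()

    input.addData("device-1", "device-2")   // hypothetical device names
    query.processAllAvailable()
    query.stop()
  }
}
```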

     

    ● Example code from the official documentation

    // ========== DF with no aggregations ==========
    val noAggDF = df.where("signal > 10").select("device")

    // Print new data to console
    noAggDF
      .writeStream
      .format("console")
      .start()

    // Write new data to Parquet files
    noAggDF
      .writeStream
      .format("parquet")
      .option("checkpointLocation", "path/to/checkpoint/dir")
      .option("path", "path/to/destination/dir")
      .start()

    // ========== DF with aggregation ==========
    val aggDF = df.groupBy("device").count()

    // Print updated aggregations to console
    aggDF
      .writeStream
      .outputMode("complete")
      .format("console")
      .start()

    // Have all the aggregates in an in-memory table
    aggDF
      .writeStream
      .queryName("aggregates")    // this query name will be the table name
      .outputMode("complete")
      .format("memory")
      .start()

    spark.sql("select * from aggregates").show()   // interactively query in-memory table
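    To see how the memory sink behaves under the two output modes used above, the sketch below runs the no-aggregation query in the default Append mode and the aggregation in Complete mode over two micro-batches. It assumes Spark 3.x and the internal test source `MemoryStream`; `AppendVsCompleteSketch`, the table `high_signal` and the readings are hypothetical. The append table accumulates every newly emitted row, while the complete table is replaced by the full result after each trigger.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

object AppendVsCompleteSketch {
  // Returns (row count of the append-mode table, per-device counts in the complete-mode table)
  def run(spark: SparkSession): (Long, Map[String, Long]) = {
    import spark.implicits._
    implicit val sqlCtx: SQLContext = spark.sqlContext

    // One MemoryStream per query, to be safe: the source tracks committed offsets
    // and is not meant to be shared between queries
    val inNoAgg = MemoryStream[(String, Double)]
    val inAgg = MemoryStream[(String, Double)]
    def asDeviceDf(s: MemoryStream[(String, Double)]): DataFrame =
      s.toDF().toDF("device", "signal")

    val noAgg = asDeviceDf(inNoAgg).where("signal > 10").select("device")
      .writeStream.format("memory").queryName("high_signal").start()   // default: append
    val agg = asDeviceDf(inAgg).groupBy("device").count()
      .writeStream.outputMode("complete").format("memory").queryName("aggregates").start()

    // Two micro-batches of hypothetical (device, signal) readings, fed to both streams
    for (batch <- Seq(Seq(("d1", 12.0), ("d2", 5.0)), Seq(("d1", 15.0)))) {
      inNoAgg.addData(batch: _*)
      inAgg.addData(batch: _*)
      noAgg.processAllAvailable()
      agg.processAllAvailable()
    }

    val appendedRows = spark.table("high_signal").count()
    val counts = spark.table("aggregates").as[(String, Long)].collect().toMap
    noAgg.stop()
    agg.stop()
    (appendedRows, counts)
  }
}
```

    After both batches, `high_signal` should hold two rows (one `d1` per batch) and `aggregates` should hold `d1 -> 2` and `d2 -> 1`.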



  • Original article: https://www.cnblogs.com/TiePiHeTao/p/7a0c11a24f3cdf3f4c1868a1eed6dbe3.html