  • Structured Streaming: Computation and Output Operations

     

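    1.1.      Computation operations

    Once a Source has been obtained, basic data processing works the same way as the DataFrame/Dataset operations covered earlier, so it is not repeated here.

    ● Sample code from the official documentation: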

    case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)

    val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
    val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data

    // Select the devices which have signal more than 10
    df.select("device").where("signal > 10")      // using untyped APIs
    ds.filter(_.signal > 10).map(_.device)         // using typed APIs

    // Running count of the number of updates for each device type
    df.groupBy("deviceType").count()                 // using untyped API

    // Running average signal for each device type
    import org.apache.spark.sql.expressions.scalalang.typed
    ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API

     

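    1.2.        Output

    Computed results can be written to several kinds of sinks, with the following settings:

    1. output mode: how the data in the result table is written to the sink.

    2. format/output sink details: data format, location, and so on.

    3. query name: an identifier for the query, similar to the name of a temp view.

    4. trigger interval: how often the query is triggered. If it is not specified, data is processed as fast as possible by default.

    5. checkpoint location: usually a directory on HDFS. Note: the Socket source does not support data recovery; if a checkpoint location is set for it, the second start fails with an error.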
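As a rough illustration of where each of the five settings goes on a streaming write, here is a minimal sketch. It assumes Spark 2.2 or later on the classpath, a local master, and the built-in "rate" test source as a stand-in for a real source; the query name `bucket_counts` and the checkpoint path are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object OutputSettingsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("OutputSettingsSketch")
      .master("local[2]") // assumption: local run, for illustration only
      .getOrCreate()

    // Assumption: the "rate" test source replaces a real source.
    // It produces rows with the columns `timestamp` and `value`.
    val input = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    // A small aggregation so that "complete" output mode is allowed.
    val counts = input
      .selectExpr("value % 3 AS bucket")
      .groupBy("bucket")
      .count()

    val query = counts.writeStream
      .outputMode("complete")                                  // 1. output mode
      .format("console")                                       // 2. format / output sink
      .queryName("bucket_counts")                              // 3. query name (hypothetical)
      .trigger(Trigger.ProcessingTime("5 seconds"))            // 4. trigger interval
      .option("checkpointLocation", "/tmp/bucket_counts_ckpt") // 5. checkpoint location (hypothetical path)
      .start()

    query.awaitTermination()
  }
}
```

Leaving out the `.trigger(...)` line falls back to the default behaviour described in item 4.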

     

    1.2.1. output mode


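    Whenever the result table is updated, we want to write the changed result rows to an external sink.

    There are three output modes:

    1. Append mode: the default mode. Only newly added rows are output: each time the result table is updated, only the rows added since the last trigger are written to the sink. This is supported only for queries in which rows added to the result table never change, so the mode guarantees that each row is output exactly once. For example, queries that use only select, where, map, flatMap, filter, join, and similar operations support Append mode. Aggregations are not supported unless a watermark is defined on the event-time column.

    2. Complete mode: everything is output. After every trigger, the entire result table is written to the sink. This applies only to queries that contain an aggregation.

    3. Update mode: only updated rows are output. Each time the result table is updated, only the rows that changed are written to the sink (available since Spark 2.1.1). Sorting is not supported.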
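To make the difference between the modes concrete, the following is a plain-Scala model of what each mode would emit for a running word count across two triggers. It is only a sketch of the semantics described above and does not use the Spark API; the object and function names are made up for illustration.

```scala
object OutputModeModel {
  // The "result table" of a running word count: word -> count.
  type Table = Map[String, Long]

  // Fold one micro-batch of words into the running count table.
  def update(table: Table, batch: Seq[String]): Table =
    batch.foldLeft(table)((t, w) => t.updated(w, t.getOrElse(w, 0L) + 1L))

  // Complete mode: the whole result table is emitted after every trigger.
  def completeMode(before: Table, after: Table): Table = after

  // Update mode: only rows that are new or whose count changed are emitted.
  def updateMode(before: Table, after: Table): Table =
    after.filter { case (word, count) => !before.get(word).contains(count) }

  // Append mode applies to non-aggregating queries: the result of the new
  // batch (here a simple filter) is emitted once and never revised.
  def appendMode(batch: Seq[String]): Seq[String] =
    batch.filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    val t1 = update(Map.empty, Seq("a", "b", "a")) // after trigger 1: a=2, b=1
    val t2 = update(t1, Seq("b", "c"))             // after trigger 2: a=2, b=2, c=1

    println(completeMode(t1, t2)) // all three rows
    println(updateMode(t1, t2))   // only the rows for b and c
    println(appendMode(Seq("b", "", "c")))
  }
}
```

In this model the row for `a` is unchanged by the second trigger, so Update mode skips it while Complete mode writes it again.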

      

    1.2.2. output sink

    ● Usage

    File sink - Stores the output to a directory. Supports Parquet files and the Append output mode.

    writeStream
        .format("parquet")        // can be "orc", "json", "csv", etc.
        .option("path", "path/to/destination/dir")
        .start()

    Kafka sink - Stores the output to one or more topics in Kafka.

    writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
        .option("topic", "updates")
        .start()

    Foreach sink - Runs arbitrary computation on the records in the output. See later in the section for more details.

    writeStream
        .foreach(...)
        .start()

    Console sink (for debugging) - Prints the output to the console/stdout every time there is a trigger. Both Append and Complete output modes are supported. This should be used only for debugging on low data volumes, because the entire output is collected and stored in the driver's memory after every trigger.

    writeStream
        .format("console")
        .start()

    Memory sink (for debugging) - The output is stored in memory as an in-memory table. Both Append and Complete output modes are supported. This should be used only for debugging on low data volumes, because the entire output is collected and stored in the driver's memory. Use it with caution.

    writeStream
        .format("memory")
        .queryName("tableName")
        .start()
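The Foreach sink entry above leaves the argument to `.foreach(...)` elided. One possible shape for it is a `ForeachWriter` subclass, sketched below. The writer here is hypothetical and only prints each row; the "rate" test source, the local master, and all names are assumptions made for illustration, and Spark must be on the classpath.

```scala
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

// Hypothetical writer that prints every row. A real writer would typically
// open a connection in open() and release it in close().
class PrintlnWriter extends ForeachWriter[Row] {
  // Called once per partition and trigger; returning false skips the partition.
  override def open(partitionId: Long, epochId: Long): Boolean = true

  // Called for every row of the partition.
  override def process(value: Row): Unit = println(s"row: $value")

  // Called when the partition is done; errorOrNull is non-null on failure.
  override def close(errorOrNull: Throwable): Unit = ()
}

object ForeachSinkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ForeachSinkSketch")
      .master("local[2]") // assumption: local run, for illustration only
      .getOrCreate()

    // Assumption: the "rate" test source replaces a real source.
    val input = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()

    val query = input.writeStream
      .foreach(new PrintlnWriter)
      .start()

    query.awaitTermination()
  }
}
```

Because `process` runs on the executors, anything the writer holds has to be serializable or created inside `open`.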

     

    ● Sample code from the official documentation

    // ========== DF with no aggregations ==========
    val noAggDF = df.select("device").where("signal > 10")

    // Print new data to console
    noAggDF
      .writeStream
      .format("console")
      .start()

    // Write new data to Parquet files
    noAggDF
      .writeStream
      .format("parquet")
      .option("checkpointLocation", "path/to/checkpoint/dir")
      .option("path", "path/to/destination/dir")
      .start()

    // ========== DF with aggregation ==========
    val aggDF = df.groupBy("device").count()

    // Print updated aggregations to console
    aggDF
      .writeStream
      .outputMode("complete")
      .format("console")
      .start()

    // Have all the aggregates in an in-memory table
    aggDF
      .writeStream
      .queryName("aggregates")    // this query name will be the table name
      .outputMode("complete")
      .format("memory")
      .start()

    spark.sql("select * from aggregates").show()   // interactively query in-memory table



  • Original article: https://www.cnblogs.com/TiePiHeTao/p/7a0c11a24f3cdf3f4c1868a1eed6dbe3.html