zoukankan      html  css  js  c++  java
  • StructuredStreaming(New)

    SparkStreaming API using DataSets and DataFrames  (New)

    使用流式DataSets和流式DataFrames的API

      ◆ 1.创建流式DataFrames和流式Datasets(重点)
      ◆ 2.流式DataFrames/Datasets的操作(重点)
      ◆ 3.启动流查询(重点)
      ◆ 4.管理流查询(了解)
      ◆ 5.监控流查询(了解)
      ◆ 6.使用检查点从故障中恢复(重点)

    1.创建流式DataFrames和流式Datasets(重点)

      ◆ 输入源(Input Source)   

        File Source
        Kafka Source
        Socket Source (测试)
        Rate Source (测试,实验性)

      ◆ 流式DataFrames/Datasets的结构类型推断与划分

    FileSource:

    ◆ 须知:从目录中读取文件来作为输入数据流。
    支持文件的格式有: text, csv, json, orc, parquet。
    ◆ 注意:支持glob路径,但不支持多个逗号分隔路径golbs。
    ◆ 属性:有五个option可以设置:
    ➢ path:输入目录的路径,对所有文件格式都是通用的
    ➢ maxFilesPerTrigger:在每个触发器中要考虑的新文件的最大数目(默认值:没有最大值)
    ➢ latestFirst:首先是否处理最新的新文件,当有大量的文件积压时是有用的(默认值:false)
    ➢ maxFileAge:默认值是7d 一周:如果latestFirst=true和maxFilesPerTrigger被设置,此配置不生效
    ➢ fileNameOnly:是否只基于文件名检查新文件而不是完整路径(默认值:false)
    将这个值设置为“true”时,下面的文件将被视为同一个文件,
    因为它们的文件名“dataset .txt”是相同的: “file:///dataset”
    “s3://a/dataset”
    “s3n://a/b/dataset”
    “s3a://a/b/c/dataset””
    ◆ 其他配置可以参照以下这个类:
    ➢ org.apache.spark.sql.execution.streaming.FileStreamOptions

    Kafka Source

    ◆ 须知:Kafka broker的版本需要是0.10.0或者更高版本。
    ◆ 要使用Kafka,项目的pom.xml需要引入Kafka的依赖
    ➢ <!-- spark-sql-kafka-0-10 -->
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.3.0</version>
    </dependency>

    ◆ Options 必须设置:
    ➢ kafka.bootstrap.servers (指定kafka的访问地址host1:port1,host2:port2)
    ➢ subscribe/subscribepattern/assign(指定kafka中的主题)
    ➢ failondataloss(数据丢失报错)
    ➢ startingoffsets 读取数据的起始偏移量
    ➢ endingoffsets 读取数据的截止偏移量(在流式操作中此配置不生效)
    ◆ 其他配置可以参照以下这两个类:
    ➢ org.apache.kafka.clients.CommonClientConfigs
    ➢ org.apache.kafka.clients.consumer.ConsumerConfig

    SocketSource 

    ◆ 须知:从Socket连接中读取UTF8文本数据。在驱动器程序中监听服务网络端口。
    ◆ 注意:Socket Source只适用于测试,因为它不支持端到端的容错保证。
    ◆ 有三个option可以设置:
    ➢ host(必须)
    ➢ port(必须)
    ➢ includeTimestamp 默认值false 不生成时间戳日期
    ◆ 其他配置可以参照以下这个类:
    ➢ org.apache.spark.sql.execution.streaming.TextSocketSource

    RateSource

    ◆ 须知:只支持测试
    ◆ 注意: 只有在连续模式中支持的选项才是Nuffice分区和RayScript第二个。

     流式DataFrames/Datasets的结构类型推断与划分

      ◆ 默认情况下,基于文件源的结构化流要求必须指定schema,这种限制确保即
      使在失败的情况下也会使用一致的模式来进行流查询。
      ◆ 对于特殊用例,可以通过设置spark.sql.streaming.schemaInference = true。
      此时将会开启Spark自动类型推断功能。
      ◆ 注意:默认Spark sql中自动类型推断为启动状态。
      ◆ 当读取数据的目录中出现/key=value/ 的子目录时,Spark将自动递归这些子目
      录,产生分区发现。
      ◆ 如果用户提供的 schema 中出现了这些列, Spark将会根据正在读取的文件路
      径进行填充。
      ◆ 构成分区结构的目录必须在查询开始时是存在的,并且必须保持static 。
      ➢ 例如,当 /data/year=2015/ 存在时,可以添加 /data/year=2016/,但是更改
      分区列将无效的(即通过创建目录 /data/date=2016-04-17/ )。
      ◆ 注意:如果希望得到的数据可以按照/key=value/这种目录生成时,可以在输出
      数据时借助于partitionBy(“columnName”)

  • 相关阅读:
    cinder支持nfs快照
    浏览器输入URL到返回页面的全过程
    按需制作最小的本地yum源
    创建可执行bin安装文件
    RPCVersionCapError: Requested message version, 4.17 is incompatible. It needs to be equal in major version and less than or equal in minor version as the specified version cap 4.11.
    惠普IPMI登陆不上
    Linux进程状态——top,ps中看到进程状态D,S,Z的含义
    openstack-neutron基本的网络类型以及分析
    openstack octavia的实现与分析(二)原理,架构与基本流程
    flask上下文流程图
  • 原文地址:https://www.cnblogs.com/Diyo/p/11394265.html
Copyright © 2011-2022 走看看