zoukankan      html  css  js  c++  java
  • StructuredStreaming(New)

    SparkStreaming API using DataSets and DataFrames  (New)

    使用流式DataSets和流式DataFrames的API

      ◆ 1.创建流式DataFrames和流式Datasets(重点)
      ◆ 2.流式DataFrames/Datasets的操作(重点)
      ◆ 3.启动流查询(重点)
      ◆ 4.管理流查询(了解)
      ◆ 5.监控流查询(了解)
      ◆ 6.使用检查点从故障中恢复(重点)

    1.创建流式DataFrames和流式Datasets(重点)

      ◆ 输入源(Input Source)   

        File Source
        Kafka Source
        Socket Source (测试)
        Rate Source (测试,实验性)

      ◆ 流式DataFrames/Datasets的结构类型推断与划分

    FileSource:

    ◆ 须知:从目录中读取文件来作为输入数据流。
    支持文件的格式有: text, csv, json, orc, parquet。
    ◆ 注意:支持glob路径,但不支持多个逗号分隔路径golbs。
    ◆ 属性:有五个option可以设置:
    ➢ path:输入目录的路径,对所有文件格式都是通用的
    ➢ maxFilesPerTrigger:在每个触发器中要考虑的新文件的最大数目(默认值:没有最大值)
    ➢ latestFirst:首先是否处理最新的新文件,当有大量的文件积压时是有用的(默认值:false)
    ➢ maxFileAge:默认值是7d 一周:如果latestFirst=true和maxFilesPerTrigger被设置,此配置不生效
    ➢ fileNameOnly:是否只基于文件名检查新文件而不是完整路径(默认值:false)
    将这个值设置为“true”时,下面的文件将被视为同一个文件,
    因为它们的文件名“dataset .txt”是相同的: “file:///dataset”
    “s3://a/dataset”
    “s3n://a/b/dataset”
    “s3a://a/b/c/dataset””
    ◆ 其他配置可以参照以下这个类:
    ➢ org.apache.spark.sql.execution.streaming.FileStreamOptions

    Kafka Source

    ◆ 须知:Kafka broker的版本需要是0.10.0或者更高版本。
    ◆ 要使用Kafka,项目的pom.xml需要引入Kafka的依赖
    ➢ <!-- spark-sql-kafka-0-10 -->
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.3.0</version>
    </dependency>

    ◆ Options 必须设置:
    ➢ kafka.bootstrap.servers (指定kafka的访问地址host1:port1,host2:port2)
    ➢ subscribe/subscribepattern/assign(指定kafka中的主题)
    ➢ failondataloss(数据丢失报错)
    ➢ startingoffsets 读取数据的起始偏移量
    ➢ endingoffsets 读取数据的截止偏移量(在流式操作中此配置不生效)
    ◆ 其他配置可以参照以下这两个类:
    ➢ org.apache.kafka.clients.CommonClientConfigs
    ➢ org.apache.kafka.clients.consumer.ConsumerConfig

    SocketSource 

    ◆ 须知:从Socket连接中读取UTF8文本数据。在驱动器程序中监听服务网络端口。
    ◆ 注意:Socket Source只适用于测试,因为它不支持端到端的容错保证。
    ◆ 有三个option可以设置:
    ➢ host(必须)
    ➢ port(必须)
    ➢ includeTimestamp 默认值false 不生成时间戳日期
    ◆ 其他配置可以参照以下这个类:
    ➢ org.apache.spark.sql.execution.streaming.TextSocketSource

    RateSource

    ◆ 须知:只支持测试
    ◆ 注意: 只有在连续模式中支持的选项才是Nuffice分区和RayScript第二个。

     流式DataFrames/Datasets的结构类型推断与划分

      ◆ 默认情况下,基于文件源的结构化流要求必须指定schema,这种限制确保即
      使在失败的情况下也会使用一致的模式来进行流查询。
      ◆ 对于特殊用例,可以通过设置spark.sql.streaming.schemaInference = true。
      此时将会开启Spark自动类型推断功能。
      ◆ 注意:默认Spark sql中自动类型推断为启动状态。
      ◆ 当读取数据的目录中出现/key=value/ 的子目录时,Spark将自动递归这些子目
      录,产生分区发现。
      ◆ 如果用户提供的 schema 中出现了这些列, Spark将会根据正在读取的文件路
      径进行填充。
      ◆ 构成分区结构的目录必须在查询开始时是存在的,并且必须保持static 。
      ➢ 例如,当 /data/year=2015/ 存在时,可以添加 /data/year=2016/,但是更改
      分区列将无效的(即通过创建目录 /data/date=2016-04-17/ )。
      ◆ 注意:如果希望得到的数据可以按照/key=value/这种目录生成时,可以在输出
      数据时借助于partitionBy(“columnName”)

  • 相关阅读:
    Ehcache缓存配置和基本使用
    微信公众号接入图灵机器人实现自动回复消息
    数据库索引的使用
    oracle中=>是什么意思
    [转载]AppSettings和ConnectionStrings的区别
    获得字符串长度
    HTML------8.调用接口 实现动态折现图
    HTML------7.编辑折现图
    JAVA------14.今日和昨日时间转换
    JAVA------13.字符串去掉非数字 ,从大到小排序
  • 原文地址:https://www.cnblogs.com/Diyo/p/11394265.html
Copyright © 2011-2022 走看看