zoukankan      html  css  js  c++  java
  • Structured Streaming

    内容来自于林子雨老师的《Spark编程基础》和一些自己的学习笔记。

    概述

    Structured Streaming是一种基于Spark SQL引擎构建的、可扩展且容错性高的流处理引擎。这里我把它理解为,因为Spark不能处理毫秒级流计算而诞生的流处理引擎。因此Structured Streaming采用的数据抽象是DataFrame而不是DStream。

    设计了输入源、执行引擎、和接收器。通过偏移量来跟踪流的读取位置,同时引擎可以使用检查点和预写日志来记录数据的偏移范围,来达到一致性。

    关键思想

    Structed Streaming的关键思想是将试试数据流视为一张正在不断添加数据的表。流计算等同于在一张静态表上批处理查询,并进行增量查询。

    两种处理模型

    包括两个处理模型,微批处理模型和持续处理模型,默认是微批处理模型。

    微批处理模型,流计算引擎在处理上一批次数据结束后,再对新数据进行批量查询。在下一个微批处理之前,要将数据的偏移范围保存在日志中。所以,当前到达的数据需要在上一批次处理完,同时偏移范围记录到日志后,才能下一个批次数据继续处理,因此会有一定的延迟。书中表示延迟超过100毫秒。

    持续处理模型,可以实现毫秒级延迟。启动一系列的连续读取、处理和写入结果任务。对于偏移范围的记录异步写入日志,以达到连续处理,避免高延迟。但这是建立在牺牲一致性为代价上的,低延迟下,会丢失数据。

    编写Structured Streaming程序

    • 导入pyspark模块
    • 创建Spark Session对象
    • 创建输入数据源
    • 定义流计算过程
    • 启动流计算并输出结果

    输出操作

    DataFrame/Dataset的writeStrem()方法返回DataStreamWriter接口,接口启动流计算,并写入外部的输出接收器。不同的接收器存在不同的选项。

    示例代码

    quert = wordcount
            .writeStream
            .outputMod("update") //输入模式
            .format("console")
            .option() //配置
            .trigger() //触发间隔
            .start()
    

    输出模式

    输出模式有Append模式,Complete模式,Update模式。

    • Append模式,结果表中自上次触发间隔后增加的新行,才会被写入外部存储器
    • Complete模式,已更新的完整的结果表可被写入外部存储器
    • Update模式,只有自上次触发间隔后结果表中发生更新的行,才会被写入存储器

    输入源

    内置的输入源有File源、Kafka源、Socket源、Rate源。

    File源

    支持容错,参数有:

    • path,路径的输入目录
    • maxFilesPerTrigger,每个触发器中最大文件数量
    • large bcaklog of files,是否处理最新文件,默认否
    • fileNameOnly,是否根据文件名而不是完整路径来判断文件

    Socket源

    不支持容错,参数有:

    • host,主机
    • port,端口

    Rate源

    • rowsPerSecond,每秒生成多少行
    • numPartitions,生成行的分区号

    Kafka源

    • kafka.bootstrap.servers,服务器IP和地端口
    • topic,主题名称

    输出接收器

    内置的输出接收器有File接收器、Kafka接收器、Foreach接收器、Console接收器、Memory接收器。

    触发器

    定义了流数据处理的计时。共有四种选项:

    • 没有指定触发器设置,默认为查询以微批处理模式执行,前一个微批处理结束后,进行下一个微批处理
    • Trigger.ProcessingTime("n seconds),固定间隔时间开启微批处理,若上一个微批处理在间隔时间内结束,则等到时间间隔后再开启下一个,若上一个在时间内还没结束,则下一个在上一个完成后立即开始
    • Trigger.Once(),一次微批处理所有数据,用来节省成本,集群开启,微批处理一次,然后结束,关闭集群
    • Trigger.Continuous("1 second"),连续处理模式

    窗口操作

    可以使用groupBy()和window()操作来表示窗口聚合。

    val windowedCounts = words.groupBy(
      window($"timestamp", "10 minutes", "5 minutes"),
      $"word"
    ).count()
    

    水印

    水印的作用,不接收延迟过长的数据。引擎将维护后期数据更新状态直到阈值大于最大事件时间。超过阈值的数据将被删除。

    容错处理

    Spark通过将程序划分为输入源、执行引擎和接收器等多个层次来保障容错。

    输入源通过偏移量标记,执行引擎通过检查点保存中间结果,接收器使用幂等来保障输出的稳定性。

  • 相关阅读:
    2.变量
    1.注释
    MyEclipse使用教程:使用DevStyle增强型启动
    DevExpress WPF v19.1新版亮点:Data Editors等控件新功能
    测试工具Telerik Test Studio发布R2 2019|支持VS 2019
    MyEclipse使用教程:使用主题自定义工作台外观
    DevExpress WPF v19.1:Data Grid/Tree List等控件功能增强
    知名界面类控件Kendo UI for jQuery R2 2019 SP1发布|附下载
    MyEclipse使用教程:unattended安装
    跨平台开发框架DevExtreme v19.1.4正式发布|附下载
  • 原文地址:https://www.cnblogs.com/chenshaowei/p/12821884.html
Copyright © 2011-2022 走看看