zoukankan      html  css  js  c++  java
  • [Spark]-结构化流之监控&故障恢复篇(待重修)

    6 流的监控以及故障恢复

      6.1.流的运行时数据

         结构化流启动后返回的 StreamingQuery 对象.    

        val query = df.writeStream.format("console").start()   // get the query object
    
        query.id          // get the unique identifier of the running query that persists across restarts from checkpoint data
    
        query.runId       // get the unique id of this run of the query, which will be generated at every start/restart
    
        query.name        // get the name of the auto-generated or user-specified name
    
        query.explain()   // print detailed explanations of the query
    
        query.stop()      // stop the query
    
        query.awaitTermination()   // block until query is terminated, with stop() or with error
    
        query.exception       // the exception if the query has been terminated with error
    
        query.recentProgress  // an array of the most recent progress updates for this query
    
        query.lastProgress    // the most recent progress update of this streaming query

      6.2 交互式(同步)监控

        可以直接获取活动查询的当前状态和指标使用 streamingQuery.lastProgress() 和 streamingQuery.status()

        lastProgress() 返回一个 StreamingQueryProgress 对象  它有流的最后一个触发器中取得的progress的全部信息 - 处理了哪些数据,处理率是多少,延迟等等

        streamingQuery.recentProgress 返回最后几个进度的 array

        streamingQuery.status() 返回一个 StreamingQueryStatus 对象 提供有关的信息立即执行的查询 - 触发器是否 active ,数据是否正在处理等

      6.3 异步监控

        在sparkSession上附加一个 StreamingQueryListener.

        一旦你使用 sparkSession.streams.attachListener() 附加你的自定义 StreamingQueryListener 对象,当您启动查询和当有活动查询有进度时停止时,您将收到 callbacks (回调)

      6.4 故障恢复

        如果发生 failure or intentional shutdown (故障或故意关机),您可以恢复之前的查询的进度和状态,并继续停止的位置.(通过 checkpointing and write ahead logs (检查点和预写入日志)完成)

        通过配置 checkpoint location (检查点位置)查询,将保存所有进度信息(即,每个触发器中处理的偏移范围)和正在运行的 aggregates (聚合) 

        aggDF
          .writeStream
          .outputMode("complete")
          .option("checkpointLocation", "path/to/HDFS/dir")
          .format("memory")
          .start()

      

  • 相关阅读:
    在使用EF开发时候,遇到 using 语句中使用的类型必须可隐式转换为“System.IDisposable“ 这个问题。
    The OAuth 2.0 Authorization Framework-摘自https://tools.ietf.org/html/rfc6749
    Principles of good RESTful API Design 好的 RESTful API 设计
    JQuery发送Put、Delete请求
    一起学习 微服务(MicroServices)-笔记
    [WinForm] 使用 WebBrowser 操作 HTML 頁面的 Element-摘自网络
    关闭HTML5只能提示(form上新增novalidate)
    MVC5
    理解OAuth 2.0 -摘自网络
    Java三大主流开源工作流引擎技术分析
  • 原文地址:https://www.cnblogs.com/NightPxy/p/9278892.html
Copyright © 2011-2022 走看看