  • StreamingQueryListener: the streaming-query listener in Structured Streaming

    Structured Streaming provides a class for listening to the start, termination, and status updates of a streaming query:

    StreamingQueryListener


    To use it, extend StreamingQueryListener and implement its three callback methods:

    abstract class StreamingQueryListener {

      import StreamingQueryListener._

      /**
       * Called when a query is started.
       * @note This is called synchronously with
       * [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]],
       * that is, `onQueryStart` will be called on all listeners before
       * `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]]. Please
       * don't block this method as it will block your query.
       * @since 2.0.0
       */
      def onQueryStarted(event: QueryStartedEvent): Unit

      /**
       * Called when there is some status update (ingestion rate updated, etc.)
       *
       * @note This method is asynchronous. The status in [[StreamingQuery]] will always be
       * latest no matter when this method is called. Therefore, the status of [[StreamingQuery]]
       * may be changed before/when you process the event. E.g., you may find [[StreamingQuery]]
       * is terminated when you are processing `QueryProgressEvent`.
       * @since 2.0.0
       */
      def onQueryProgress(event: QueryProgressEvent): Unit

      /**
       * Called when a query is stopped, with or without error.
       * @since 2.0.0
       */
      def onQueryTerminated(event: QueryTerminatedEvent): Unit
    }
    onQueryStarted: called synchronously when a streaming query starts, before DataStreamWriter.start() returns, so it must not block
    onQueryProgress: called asynchronously whenever the query's status is updated (ingestion rate, etc.) while it runs
    onQueryTerminated: called asynchronously when the query stops, with or without an error
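
    To make onQueryProgress concrete, here is a minimal sketch of a listener that logs a few metrics from each QueryProgressEvent (batchId, numInputRows and inputRowsPerSecond are standard fields of StreamingQueryProgress); it only prints, so adapt it to whatever monitoring sink you actually use:

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    // Minimal sketch: log a few throughput metrics on every status update.
    class MetricsListener extends StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit =
        println(s"Query started: id=${event.id}")

      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        val p = event.progress
        // batchId, numInputRows and inputRowsPerSecond come from StreamingQueryProgress
        println(s"Query ${p.name}: batch=${p.batchId} rows=${p.numInputRows} rate=${p.inputRowsPerSecond} rows/s")
      }

      override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
        println(s"Query terminated: id=${event.id}")
    }

    // Register it before starting the query:
    // spark.streams.addListener(new MetricsListener)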

    What are these callbacks good for? They are most often used to add job alerting to a streaming application. For example, in onQueryTerminated you can check whether an alert condition is met and, if so, send a notification (an e-mail, a DingTalk message, and so on). The QueryTerminatedEvent carries the exception as an Option[String], so the alert can include the full error details; a minimal sketch follows the class definition below.

    @InterfaceStability.Evolving
    class QueryTerminatedEvent private[sql](
        val id: UUID,
        val runId: UUID,
        val exception: Option[String]) extends Event
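
    A minimal sketch of such an alerting listener is shown below. sendAlert is a hypothetical callback standing in for your own notification channel (mail, DingTalk, ...); it is not a Spark API:

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    // Minimal alerting sketch. `sendAlert` is a placeholder for your own
    // notification channel (mail, DingTalk, ...), not part of Spark.
    class AlertingListener(sendAlert: String => Unit) extends StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = ()
      override def onQueryProgress(event: QueryProgressEvent): Unit = ()

      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
        // exception is Some(errorDetails) when the query failed, None on a clean stop
        event.exception.foreach { error =>
          sendAlert(s"Streaming query ${event.id} (run ${event.runId}) failed: $error")
        }
      }
    }

    // Register it before starting the query, e.g.:
    // spark.streams.addListener(new AlertingListener(msg => println(msg)))  // send mail here instead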

    Finally, a small end-to-end example:

    import org.apache.spark.sql.SparkSession

    /**
      * Created by angel
      */
    object Test {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder
          .appName("IQL")
          .master("local[4]")
          .enableHiveSupport()
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
    
    
    
        // Save the code as demo-StreamingQueryManager.scala
        // Start it using spark-shell
        // $ ./bin/spark-shell -i demo-StreamingQueryManager.scala
    
        // Register a StreamingQueryListener to receive notifications about state changes of streaming queries
        import org.apache.spark.sql.streaming.StreamingQueryListener
        val myQueryListener = new StreamingQueryListener {
          import org.apache.spark.sql.streaming.StreamingQueryListener._
          def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
            println(s"Query ${event.id} terminated")
          }
    
          def onQueryStarted(event: QueryStartedEvent): Unit = {
            println(s"Query ${event.id} started")
          }
          def onQueryProgress(event: QueryProgressEvent): Unit = {
            println(s"Query ${event.progress.name} process")
          }
        }
        spark.streams.addListener(myQueryListener)
    
        import org.apache.spark.sql.streaming._
        import scala.concurrent.duration._
    
        // Start streaming queries
    
        // Start the first query
        val q4s = spark.readStream.
          format("rate").
          load.
          writeStream.
          format("console").
          trigger(Trigger.ProcessingTime(4.seconds)).
          option("truncate", false).
          start
    
        // Start another query that is slightly slower
        val q10s = spark.readStream.
          format("rate").
          load.
          writeStream.
          format("console").
          trigger(Trigger.ProcessingTime(10.seconds)).
          option("truncate", false).
          start
    
        // Both queries run concurrently
        // You should see different outputs in the console
    // q4s prints out about 4 rows per batch and runs more often (every 4s vs every 10s) than q10s
        // q10s prints out 10 rows every batch
    
        /*
        -------------------------------------------
        Batch: 7
        -------------------------------------------
        +-----------------------+-----+
        |timestamp              |value|
        +-----------------------+-----+
        |2017-10-27 13:44:07.462|21   |
        |2017-10-27 13:44:08.462|22   |
        |2017-10-27 13:44:09.462|23   |
        |2017-10-27 13:44:10.462|24   |
        +-----------------------+-----+
    
        -------------------------------------------
        Batch: 8
        -------------------------------------------
        +-----------------------+-----+
        |timestamp              |value|
        +-----------------------+-----+
        |2017-10-27 13:44:11.462|25   |
        |2017-10-27 13:44:12.462|26   |
        |2017-10-27 13:44:13.462|27   |
        |2017-10-27 13:44:14.462|28   |
        +-----------------------+-----+
    
        -------------------------------------------
        Batch: 2
        -------------------------------------------
        +-----------------------+-----+
        |timestamp              |value|
        +-----------------------+-----+
        |2017-10-27 13:44:09.847|6    |
        |2017-10-27 13:44:10.847|7    |
        |2017-10-27 13:44:11.847|8    |
        |2017-10-27 13:44:12.847|9    |
        |2017-10-27 13:44:13.847|10   |
        |2017-10-27 13:44:14.847|11   |
        |2017-10-27 13:44:15.847|12   |
        |2017-10-27 13:44:16.847|13   |
        |2017-10-27 13:44:17.847|14   |
        |2017-10-27 13:44:18.847|15   |
        +-----------------------+-----+
        */
    
        // Stop q4s on a separate thread
        // as we're about to block the current thread awaiting query termination
        import java.util.concurrent.Executors
        import java.util.concurrent.TimeUnit.SECONDS
        def queryTerminator(query: StreamingQuery) = new Runnable {
          def run = {
            println(s"Stopping streaming query: ${query.id}")
            query.stop
          }
        }
        // Stop the first query after 10 seconds
        Executors.newSingleThreadScheduledExecutor.
          scheduleWithFixedDelay(queryTerminator(q4s), 10, 60 * 5, SECONDS)
        // Stop the other query after 20 seconds
        Executors.newSingleThreadScheduledExecutor.
          scheduleWithFixedDelay(queryTerminator(q10s), 20, 60 * 5, SECONDS)
    
    // Use StreamingQueryManager to wait for any query termination (either q4s or q10s)
        // the current thread will block indefinitely until either streaming query has finished
        spark.streams.awaitAnyTermination
    
        // You are here only after either streaming query has finished
        // Executing spark.streams.awaitAnyTermination again would return immediately
    
        // You should have received the QueryTerminatedEvent for the query termination
    
        // reset the last terminated streaming query
        spark.streams.resetTerminated
    
        // You know at least one query has terminated
    
        // Wait for the other query to terminate
        spark.streams.awaitAnyTermination
    
        assert(spark.streams.active.isEmpty)
    
        println("The demo went all fine. Exiting...")
    
        // leave spark-shell
        System.exit(0)
      }
    }


