  • Spark Streaming Job Delay Monitoring and Alerting

    Overview

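    StreamingListener is Spark Streaming's event-listening mechanism. It delivers callbacks for each stage of a streaming job: startup, receiver lifecycle, batch submission, batch start, and batch completion.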

    The StreamingListener interface

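    // To listen for events at a given stage of Spark Streaming, override the corresponding
    // event method of this trait; every method has an empty default body.
    // The trait's own doc comments describe each event.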
    trait StreamingListener {
    
      /** Called when the streaming has been started */
      def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted) { }
    
      /** Called when a receiver has been started */
      def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }
    
      /** Called when a receiver has reported an error */
      def onReceiverError(receiverError: StreamingListenerReceiverError) { }
    
      /** Called when a receiver has been stopped */
      def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }
    
      /** Called when a batch of jobs has been submitted for processing. */
      def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }
    
      /** Called when processing of a batch of jobs has started.  */
      def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }
    
      /** Called when processing of a batch of jobs has completed. */
      def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
    
      /** Called when processing of a job of a batch has started. */
      def onOutputOperationStarted(
          outputOperationStarted: StreamingListenerOutputOperationStarted) { }
    
      /** Called when processing of a job of a batch has completed. */
      def onOutputOperationCompleted(
          outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }
    }
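
    As a minimal sketch of the point made in the comments above, a listener only has to override the callbacks it cares about and can leave the rest at their empty defaults. The names `BatchDelayLogger` and `formatMs` are hypothetical and used only for illustration. `schedulingDelay`, `processingDelay`, and `totalDelay` are `Option[Long]` values in milliseconds on Spark's `BatchInfo`. The sketch assumes the Spark Streaming library is on the classpath.

```scala
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

object BatchDelayLogger {
  // Formats an optional millisecond value; a delay is None when the
  // timestamps needed to compute it have not been recorded.
  def formatMs(value: Option[Long]): String =
    value.map(v => s"$v ms").getOrElse("n/a")
}

// Hypothetical example listener: overrides a single callback and leaves
// every other event handler at its empty default.
class BatchDelayLogger extends StreamingListener {
  import BatchDelayLogger.formatMs

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    println(
      s"batch=${info.batchTime} records=${info.numRecords} " +
        s"scheduling=${formatMs(info.schedulingDelay)} " +
        s"processing=${formatMs(info.processingDelay)} " +
        s"total=${formatMs(info.totalDelay)}")
  }
}
```

    Registering it is a single call on the streaming context: `ssc.addStreamingListener(new BatchDelayLogger)`.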
    

    A custom StreamingListener

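    Purpose: monitor how long each batch takes and send an alert when the delay exceeds a threshold, with at least 2 minutes between consecutive alerts.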

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
    import org.slf4j.LoggerFactory

    // RedisClusterClient and HttpClient are the author's own project helpers (not shown here).
    // duration: batch interval in seconds; times: multiple of the batch interval that triggers an alert.
    class SparkStreamingDelayListener(private val appName: String, private val duration: Int, private val times: Int) extends StreamingListener {

      private val logger = LoggerFactory.getLogger("SparkStreamingDelayListener")

      // Runs each time a batch completes
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
        val batchInfo = batchCompleted.batchInfo
        val numRecords = batchInfo.numRecords
        // Both delays are Option[Long] in milliseconds; fall back to 0 rather than calling .get on None
        val processingDelay = batchInfo.processingDelay.getOrElse(0L)
        val totalDelay = batchInfo.totalDelay.getOrElse(0L)

        // Alert only if the total delay is at least `times` batch intervals
        if (totalDelay >= times.toLong * duration * 1000) {
          // The time of the last alert is kept in Redis so that alerts are more than 2 minutes apart
          val jedis = RedisClusterClient.getJedisClusterClient()
          val currentTime = System.currentTimeMillis / 1000
          val lastAlertTime = jedis.get(appName)
          if (lastAlertTime == null || currentTime - lastAlertTime.toLong > 120) {
            // Record the time only when an alert is actually sent, so that an
            // on-time batch cannot suppress the alert for a later delayed one
            jedis.set(appName, currentTime.toString)
            val monitorContent = s"$appName: numRecords -> $numRecords, processingDelay -> ${processingDelay / 1000} s, totalDelay -> ${totalDelay / 1000} s"
            logger.warn(monitorContent)
            val msg = s"Streaming_${appName}_DelayTime:${totalDelay / 1000}S"
            val getURL = "http://node1:8002/message/weixin?msg=" + msg
            HttpClient.doGet(getURL)
          }
        }
      }
    }
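
    The alerting rule described above (delay over a threshold, and more than 2 minutes since the last alert) can be tried out without Spark or Redis. The following is a rough sketch of one way to combine the two conditions. `AlertThrottle`, `shouldAlert`, and `AlertThrottleDemo` are hypothetical names, and the in-memory variable stands in for the value the listener stores in Redis.

```scala
// Hypothetical, dependency-free model of the alert decision.
// thresholdMs: total delay that counts as too slow.
// cooldownSec: gap that must be exceeded between two alerts.
final class AlertThrottle(thresholdMs: Long, cooldownSec: Long) {

  // Time of the last alert in epoch seconds; None until the first alert fires.
  // The listener above keeps this value in Redis instead of in memory.
  private var lastAlertSec: Option[Long] = None

  /** Returns true if an alert should be sent now, and records the alert time if so. */
  def shouldAlert(totalDelayMs: Long, nowSec: Long): Boolean = {
    val delayed = totalDelayMs >= thresholdMs
    val cooledDown = lastAlertSec.forall(last => nowSec - last > cooldownSec)
    if (delayed && cooledDown) {
      lastAlertSec = Some(nowSec)
      true
    } else {
      false
    }
  }
}

object AlertThrottleDemo {
  def main(args: Array[String]): Unit = {
    // 20-second batches, alert at 3x the batch interval: 3 * 20 * 1000 = 60000 ms
    val throttle = new AlertThrottle(thresholdMs = 3L * 20 * 1000, cooldownSec = 120)
    println(throttle.shouldAlert(totalDelayMs = 30000, nowSec = 1000)) // false: below threshold
    println(throttle.shouldAlert(totalDelayMs = 65000, nowSec = 1020)) // true: first alert
    println(throttle.shouldAlert(totalDelayMs = 70000, nowSec = 1040)) // false: within cooldown
    println(throttle.shouldAlert(totalDelayMs = 70000, nowSec = 1141)) // true: 121 s since last alert
  }
}
```

    Keeping the timestamp in Redis rather than in memory, as the listener does, means the 2-minute interval still holds if the driver restarts.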
    

    Usage

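    // A StreamingListener does not need to be set in the configuration; it can be added directly to the StreamingContext
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object My{
        def main(args : Array[String]) : Unit = {
            val duration = 20 // batch interval in seconds
            val times = 3     // alert multiple; example value assumed here, not given in the original
            val sparkConf = new SparkConf()
            val ssc = new StreamingContext(sparkConf,Seconds(duration))
            ssc.addStreamingListener(new SparkStreamingDelayListener("Userid2Redis", duration,times))
    
            ....
        }
    }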
    


  • Original article: https://www.cnblogs.com/xiaodf/p/11776915.html