Spark Streaming --- Listeners

As you get to know Spark better, it can feel like a treasure chest that keeps surfacing unbelievable new features. Each feature you dig up makes development a little more convenient, and can even turn operations that seemed impossible, or at least complicated, into simple ones. Some features are deliberately exposed to users by the framework; others are used internally but expose a public interface, so users can call them too.

Today I want to share two listeners, SparkListener and StreamingListener. Thanks to them, many features become much easier to build, and many technical implementations become much lighter.

From my own experience, these two listeners have two main uses:

1. You can obtain any metric shown on the Spark UI. If you want those metrics for monitoring and alerting, or you plan to rebuild the Spark UI, you no longer need to scrape and parse complicated web pages to get them.

2. You can react to the various events of a Spark job by embedding callback code.

For example: in SparkListener's onApplicationStart you can initialize a third-party connection pool on the driver (the connections are only usable on the driver side) along with other variables, and stash them in a shared object for the driver to use directly. Then in onApplicationEnd you can release the connections and collect and persist those variables. This hides the initialization logic and can be packaged as a shared jar for others to use.
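A minimal sketch of this pattern; ConnectionPool here is a hypothetical stand-in for a real driver-side pool (e.g. HikariCP), not a Spark API:

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}

/* hypothetical driver-side singleton standing in for a real connection pool */
object ConnectionPool {
  def init(maxConnections: Int): Unit = println(s"pool up, size=$maxConnections")
  def close(): Unit = println("pool closed")
}

class LifecycleListener extends SparkListener {
  /* initialize driver-side resources once, before any job runs */
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit =
    ConnectionPool.init(maxConnections = 10)
  /* release the pool (and persist any collected state) as the app shuts down */
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit =
    ConnectionPool.close()
}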

Another example: in StreamingListener's onBatchStarted you can read the Kafka offsets and record counts for the batch, and in onBatchCompleted save those offsets to mysql/zk, elegantly hiding the fault-tolerance code. This too can be packaged as a shared jar for other projects.
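A minimal sketch of the offset half of that idea, assuming the Kafka direct stream (which puts a List[OffsetRange] under the "offsets" metadata key) and a hypothetical saveOffsets helper; section 3 below walks through the same callback in full:

import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class OffsetCommitListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    batchCompleted.batchInfo.streamIdToInputInfo.values.foreach { inputInfo =>
      inputInfo.metadata.get("offsets").foreach { raw =>
        raw.asInstanceOf[List[OffsetRange]].foreach { range =>
          saveOffsets(range.topic, range.partition, range.untilOffset)
        }
      }
    }
  }
  /* hypothetical helper: a real job would write to mysql/zk here */
  private def saveOffsets(topic: String, partition: Int, untilOffset: Long): Unit =
    println(s"commit $topic-$partition -> $untilOffset")
}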

These are all pretty cool tricks, and there are surely other cool use cases still waiting for you to dig up.

Performance analysis

In practice you may be more concerned about another question: how much does metric collection affect streaming performance?

The answer: the impact of metric collection on streaming or on Spark core is small, because these metrics are generated anyway; even if you do not collect them, the Spark UI does. The only cost is slightly higher overhead on the driver, so in a streaming scenario you may need to give the driver more CPU and memory.
1. Defining and registering custom listeners (sketch below):

import org.apache.spark.scheduler.SparkListener
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.scheduler.StreamingListener

val spark: SparkSession = SparkSession.builder().appName("listener-demo").getOrCreate()
val ssc: StreamingContext = new StreamingContext(spark.sparkContext, Seconds(5))
/* register the StreamingListener */
ssc.addStreamingListener(new MyStreamingListener)
/* register the SparkListener */
spark.sparkContext.addSparkListener(new MySparkListener)
/* custom StreamingListener */
class MyStreamingListener extends StreamingListener { /* TODO override the built-in callbacks */ }
/* custom SparkListener */
class MySparkListener extends SparkListener { /* TODO override the built-in callbacks */ }
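Note: a SparkListener can also be wired in without code changes via the spark.extraListeners configuration (a comma-separated list of listener class names). Either way, it is safest to register listeners before starting the contexts so that no early events are missed.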

2. SparkListener built-in callbacks explained

As the overridden methods below show, a SparkListener lets you collect, in real time, the metrics produced at the App, job, stage, task, executor, and blockManager levels, and embed code that runs when these events fire, so that custom rules trigger when a metric crosses an alert threshold (a concrete sketch of such a rule follows the listing). The Spark-level metrics and event callbacks are listed below; the next section lists the streaming monitoring points and collected metrics.

    class MySparkListener extends SparkListener {
  /* called back when the whole application starts */
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    /* the appId, matching the appId shown under spark-on-yarn mode */
    applicationStart.appId
    /* the appName */
    applicationStart.appName
    /* driver-side logs; if log truncation is configured, this is not the complete log */
    applicationStart.driverLogs
    /* the submitting user */
    applicationStart.sparkUser
    /* start time */
    applicationStart.time
      }
  /* called back when the whole application ends */
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    /* end time */
        applicationEnd.time
      }
  /* called back when a job starts executing */
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    /* the jobId, matching the one shown on the Spark UI */
    jobStart.jobId
    /* configuration properties */
    jobStart.properties
    /* all stage IDs this job generated from its wide/narrow dependencies */
    jobStart.stageIds
    /* job start time */
    jobStart.time
    /* the abstraction of each stage in this job */
    jobStart.stageInfos.foreach(stageInfo => {
      stageInfo.stageId
      /* stage submission time: not when its tasks start running, but when the stage starts being turned into task descriptions */
      stageInfo.submissionTime
      /* when this stage completed */
      stageInfo.completionTime
      /* a failed stage is retried; a retry shows up as the stageID suffixed with "_<attempt>" */
      stageInfo.attemptId
      /* details of this stage */
      stageInfo.details
      /* intermediate accumulator values of this stage */
      stageInfo.accumulables
      /* failure reason if this stage failed; for log alerting, check non-empty here and embed code to collect the error log */
      stageInfo.failureReason
      stageInfo.name
      /* the number of tasks in the task set derived from this stage */
      stageInfo.numTasks
      /* ids of the parent stages */
      stageInfo.parentIds
      stageInfo.rddInfos
      /* task metrics */
      stageInfo.taskMetrics
      stageInfo.taskLocalityPreferences
      // note: stageFailed(reason) is a scheduler-side setter, not a metric; do not call it in a listener
    })
  }
  /* called back when a job ends */
      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
        jobEnd.jobId
        jobEnd.time
        jobEnd.jobResult
      }
  /* called back when a stage is submitted */
      override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
        stageSubmitted.properties
        stageSubmitted.stageInfo.taskLocalityPreferences
    // note: stageFailed(reason) is a scheduler-side setter, not a metric; do not call it in a listener
        stageSubmitted.stageInfo.attemptId 
        stageSubmitted.stageInfo.taskMetrics.executorDeserializeTime
        stageSubmitted.stageInfo.taskMetrics.executorDeserializeCpuTime
        stageSubmitted.stageInfo.taskMetrics.executorCpuTime
        stageSubmitted.stageInfo.taskMetrics.diskBytesSpilled
        stageSubmitted.stageInfo.taskMetrics.inputMetrics.recordsRead
        stageSubmitted.stageInfo.taskMetrics.inputMetrics.bytesRead
        stageSubmitted.stageInfo.taskMetrics.outputMetrics.recordsWritten
        stageSubmitted.stageInfo.taskMetrics.outputMetrics.bytesWritten
        stageSubmitted.stageInfo.taskMetrics.shuffleReadMetrics.totalBytesRead
        stageSubmitted.stageInfo.taskMetrics.shuffleReadMetrics.recordsRead
        stageSubmitted.stageInfo.taskMetrics.shuffleReadMetrics.fetchWaitTime
        stageSubmitted.stageInfo.taskMetrics.shuffleReadMetrics.localBlocksFetched
        stageSubmitted.stageInfo.taskMetrics.shuffleReadMetrics.localBytesRead
        stageSubmitted.stageInfo.taskMetrics.shuffleReadMetrics.remoteBlocksFetched
        stageSubmitted.stageInfo.taskMetrics.shuffleWriteMetrics.bytesWritten
        stageSubmitted.stageInfo.taskMetrics.shuffleWriteMetrics.recordsWritten
        stageSubmitted.stageInfo.taskMetrics.shuffleWriteMetrics.writeTime
        stageSubmitted.stageInfo.taskMetrics.executorRunTime
        stageSubmitted.stageInfo.taskMetrics.jvmGCTime
        stageSubmitted.stageInfo.taskMetrics.memoryBytesSpilled
        stageSubmitted.stageInfo.taskMetrics.peakExecutionMemory
        stageSubmitted.stageInfo.taskMetrics.resultSerializationTime
        stageSubmitted.stageInfo.taskMetrics.resultSize
        stageSubmitted.stageInfo.taskMetrics.updatedBlockStatuses
        stageSubmitted.stageInfo.rddInfos
        stageSubmitted.stageInfo.parentIds
        stageSubmitted.stageInfo.details
        stageSubmitted.stageInfo.numTasks
        stageSubmitted.stageInfo.name
        stageSubmitted.stageInfo.accumulables
        stageSubmitted.stageInfo.completionTime
        stageSubmitted.stageInfo.submissionTime
        stageSubmitted.stageInfo.stageId
        stageSubmitted.stageInfo.failureReason
     
      }
  /* called back when a stage completes */
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
        stageCompleted.stageInfo.attemptId
        stageCompleted.stageInfo.failureReason
        stageCompleted.stageInfo.stageId
        stageCompleted.stageInfo.submissionTime
        stageCompleted.stageInfo.completionTime
        stageCompleted.stageInfo.accumulables
        stageCompleted.stageInfo.details
        stageCompleted.stageInfo.name
        stageCompleted.stageInfo.numTasks
        stageCompleted.stageInfo.parentIds
        stageCompleted.stageInfo.rddInfos
        stageCompleted.stageInfo.taskMetrics
    // note: stageFailed(reason) is a scheduler-side setter, not a metric; do not call it in a listener
        stageCompleted.stageInfo.taskLocalityPreferences
      }
  /* called back when a task starts */
      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
        taskStart.stageAttemptId
        taskStart.stageId
        taskStart.taskInfo.executorId
        taskStart.taskInfo.taskId
        taskStart.taskInfo.finishTime
        taskStart.taskInfo.launchTime
        taskStart.taskInfo.accumulables
        taskStart.taskInfo.attemptNumber
        taskStart.taskInfo.failed
        taskStart.taskInfo.gettingResultTime
        taskStart.taskInfo.gettingResult
        taskStart.taskInfo.executorId
        taskStart.taskInfo.host
        taskStart.taskInfo.index
        taskStart.taskInfo.killed
        taskStart.taskInfo.speculative
        taskStart.taskInfo.taskLocality
        taskStart.taskInfo.duration
        taskStart.taskInfo.finished
        taskStart.taskInfo.id
        taskStart.taskInfo.running
        taskStart.taskInfo.successful
        taskStart.taskInfo.status
      }
  /* called back when the driver starts fetching a task's result */
      override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = {
        taskGettingResult.taskInfo
      }
  /* called back when a task finishes */
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        taskEnd.taskMetrics.resultSize
        taskEnd.taskMetrics.updatedBlockStatuses
        taskEnd.taskMetrics.resultSerializationTime
        taskEnd.taskMetrics.peakExecutionMemory
        taskEnd.taskMetrics.memoryBytesSpilled
        taskEnd.taskMetrics.jvmGCTime
        taskEnd.taskMetrics.executorRunTime
        taskEnd.taskMetrics.shuffleWriteMetrics
        taskEnd.taskMetrics.shuffleReadMetrics
        taskEnd.taskMetrics.outputMetrics
        taskEnd.taskMetrics.inputMetrics
        taskEnd.taskMetrics.diskBytesSpilled
        taskEnd.taskMetrics.executorCpuTime
        taskEnd.taskMetrics.executorDeserializeCpuTime
        taskEnd.taskMetrics.executorDeserializeTime
        taskEnd.taskInfo.executorId
        taskEnd.taskInfo.host
        taskEnd.taskInfo.index
        taskEnd.taskInfo.killed
        taskEnd.taskInfo.speculative
        taskEnd.taskInfo.taskLocality
        taskEnd.taskInfo.duration
        taskEnd.taskInfo.finished
        taskEnd.taskInfo.taskId
        taskEnd.taskInfo.id
        taskEnd.taskInfo.running
        taskEnd.stageId
        taskEnd.reason
        taskEnd.stageAttemptId
        taskEnd.taskType
      }
  /* called back when a new blockManager is added */
      override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
        blockManagerAdded.blockManagerId.executorId
        blockManagerAdded.blockManagerId.host
        blockManagerAdded.blockManagerId.port
        blockManagerAdded.blockManagerId.topologyInfo
        blockManagerAdded.blockManagerId.hostPort
        blockManagerAdded.blockManagerId.isDriver
        blockManagerAdded.maxMem
        blockManagerAdded.time
      }
  /* called back when the memory or disk managed by a blockManager changes */
      override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
        blockUpdated.blockUpdatedInfo.blockId
        blockUpdated.blockUpdatedInfo.blockManagerId
        blockUpdated.blockUpdatedInfo.diskSize
        blockUpdated.blockUpdatedInfo.memSize
        blockUpdated.blockUpdatedInfo.storageLevel
      }
  /* called back when a blockManager is removed */
      override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {
        blockManagerRemoved.blockManagerId
        blockManagerRemoved.time
      }
  /* called back when the environment is updated */
      override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = {
        environmentUpdate.environmentDetails
      }
  /* called back when an RDD is unpersisted */
      override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = {
        unpersistRDD.rddId
      }
     
  /* catch-all for events that have no dedicated callback (e.g. SQL execution events) */
  override def onOtherEvent(event: SparkListenerEvent): Unit = {
  }
  /* called back when a new executor is added */
      override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
        executorAdded.executorId
        executorAdded.executorInfo.executorHost
        executorAdded.executorInfo.logUrlMap
        executorAdded.executorInfo.totalCores
        executorAdded.time
      }
  /* called back on executor metrics updates (driven by executor heartbeats) */
      override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
        executorMetricsUpdate.accumUpdates
        executorMetricsUpdate.execId
      }
  /* called back when an executor is removed */
      override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
        executorRemoved.executorId
        executorRemoved.reason
        executorRemoved.time
      }
    }
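Picking up the alert-threshold idea from the start of this section, here is a minimal sketch (my own illustration, not part of the listing above) that flags stages whose GC time dominates run time; the 10% cutoff is arbitrary and should be tuned to your workload:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

class GcAlertListener extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    /* taskMetrics can be null (e.g. when replayed from event logs), so guard it */
    Option(stageCompleted.stageInfo.taskMetrics).foreach { metrics =>
      val runTime = metrics.executorRunTime
      if (runTime > 0 && metrics.jvmGCTime.toDouble / runTime > 0.1) {
        /* stand-in for a real alert channel (mail, IM webhook, ...) */
        println(s"stage ${stageCompleted.stageInfo.stageId}: GC ${metrics.jvmGCTime}ms " +
          s"exceeds 10% of run time ${runTime}ms")
      }
    }
  }
}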

3. StreamingListener built-in callbacks explained

    class MyStreamingListener extends StreamingListener {
  /* called back when the streaming computation starts */
      override def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = {
        streamingStarted.time
      }
  /* called back when a batch is submitted */
      override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
        batchSubmitted.batchInfo.streamIdToInputInfo.foreach(tuple=>{
          val streamInputInfo = tuple._2
          streamInputInfo.metadata
          streamInputInfo.numRecords
          streamInputInfo.inputStreamId
          streamInputInfo.metadataDescription
        })
        batchSubmitted.batchInfo.numRecords
        batchSubmitted.batchInfo.outputOperationInfos
        batchSubmitted.batchInfo.submissionTime
        batchSubmitted.batchInfo.batchTime
        batchSubmitted.batchInfo.processingEndTime
        batchSubmitted.batchInfo.processingStartTime
        batchSubmitted.batchInfo.processingDelay
        batchSubmitted.batchInfo.schedulingDelay
        batchSubmitted.batchInfo.totalDelay
      }
  /* called back when a batch starts executing */
      override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
        batchStarted.batchInfo.totalDelay
        batchStarted.batchInfo.schedulingDelay
        batchStarted.batchInfo.processingDelay
        batchStarted.batchInfo.processingStartTime
        batchStarted.batchInfo.processingEndTime
        batchStarted.batchInfo.batchTime
        batchStarted.batchInfo.submissionTime
        batchStarted.batchInfo.outputOperationInfos
        batchStarted.batchInfo.numRecords
        batchStarted.batchInfo.streamIdToInputInfo
      }
  /* called back when a batch completes */
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
        batchCompleted.batchInfo.streamIdToInputInfo
        batchCompleted.batchInfo.numRecords
        batchCompleted.batchInfo.outputOperationInfos
        batchCompleted.batchInfo.submissionTime
        batchCompleted.batchInfo.batchTime
        batchCompleted.batchInfo.processingEndTime
        batchCompleted.batchInfo.processingStartTime
        batchCompleted.batchInfo.processingDelay
        batchCompleted.batchInfo.schedulingDelay
        batchCompleted.batchInfo.totalDelay
    /* grab the offsets and persist them externally; OffsetRange comes from org.apache.spark.streaming.kafka010 */
    batchCompleted.batchInfo.streamIdToInputInfo.foreach { case (_, inputInfo) =>
      inputInfo.metadata.get("offsets").foreach { raw =>
        raw.asInstanceOf[List[OffsetRange]].foreach { offsetRange =>
          val partition = offsetRange.partition
          val fromOffset = offsetRange.fromOffset
          val untilOffset = offsetRange.untilOffset
          val topicName = offsetRange.topic
          // TODO write this Kafka offset info to mysql/redis/zk etc. for data fault tolerance
        }
      }
    }
  }
  /* called back when a receiver starts (receiver-based streams only, not direct streaming) */
      override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
        receiverStarted.receiverInfo.executorId
        receiverStarted.receiverInfo.active
        receiverStarted.receiverInfo.lastError
        receiverStarted.receiverInfo.lastErrorMessage
        receiverStarted.receiverInfo.location
        receiverStarted.receiverInfo.name
        receiverStarted.receiverInfo.streamId
        receiverStarted.receiverInfo.lastErrorTime
      }
  /* called back when a receiver stops (receiver-based streams only, not direct streaming) */
      override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
        receiverStopped.receiverInfo.lastErrorTime
        receiverStopped.receiverInfo.lastError
        receiverStopped.receiverInfo.streamId
        receiverStopped.receiverInfo.name
        receiverStopped.receiverInfo.location
        receiverStopped.receiverInfo.lastErrorMessage
        receiverStopped.receiverInfo.active
        receiverStopped.receiverInfo.executorId
      }
  /* called back when a receiver errors (receiver-based streams only, not direct streaming) */
      override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
        receiverError.receiverInfo.executorId
        receiverError.receiverInfo.active
        receiverError.receiverInfo.lastErrorMessage
        receiverError.receiverInfo.lastError
        receiverError.receiverInfo.location
        receiverError.receiverInfo.name
        receiverError.receiverInfo.streamId
        receiverError.receiverInfo.lastErrorTime
      }
  /* called back when an output operation starts */
      override def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {
        outputOperationStarted.outputOperationInfo.description
        outputOperationStarted.outputOperationInfo.batchTime
        outputOperationStarted.outputOperationInfo.endTime
        outputOperationStarted.outputOperationInfo.failureReason
        outputOperationStarted.outputOperationInfo.id
        outputOperationStarted.outputOperationInfo.name
        outputOperationStarted.outputOperationInfo.startTime
        outputOperationStarted.outputOperationInfo.duration
      }
  /* called back when an output operation completes */
      override def onOutputOperationCompleted(outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
        outputOperationCompleted.outputOperationInfo.duration
        outputOperationCompleted.outputOperationInfo.startTime
        outputOperationCompleted.outputOperationInfo.name
        outputOperationCompleted.outputOperationInfo.id
        outputOperationCompleted.outputOperationInfo.failureReason
        outputOperationCompleted.outputOperationInfo.endTime
        outputOperationCompleted.outputOperationInfo.batchTime
        outputOperationCompleted.outputOperationInfo.description
      }
    }
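In the same spirit, a minimal sketch of a streaming alert: warn when a batch's scheduling delay crosses a threshold. The 10-second default is only an illustration; tune it to your batch interval.

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class DelayAlertListener(maxDelayMs: Long = 10000L) extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    /* schedulingDelay is an Option; it is empty until the batch actually started */
    batchCompleted.batchInfo.schedulingDelay.foreach { delayMs =>
      if (delayMs > maxDelayMs) {
        /* stand-in for a real alert channel */
        println(s"batch ${batchCompleted.batchInfo.batchTime}: scheduling delay ${delayMs}ms > ${maxDelayMs}ms")
      }
    }
  }
}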