zoukankan      html  css  js  c++  java
  • SparkListener监听使用方式及自定义的事件处理动作

    本文针对spark 2.0+版本

    概述

    Spark 提供了一套事件监听机制,覆盖整个任务生命周期中各个阶段的变化,通过这一机制可以在任务的各个阶段执行自定义的动作。SparkListener 便是这些事件的监听接口类,通过重写这个类中的相应方法即可实现自定义的事件处理动作。

    自定义示例代码

    import org.apache.spark.internal.Logging
    import org.apache.spark.scheduler.{SparkListenerApplicationStart, SparkListenerApplicationEnd, SparkListener}
    
    /**
     * Listener that logs the application id when the application starts and
     * the end timestamp when it finishes.
     *
     * Created by silent on 2019/1/11.
     */
    class MySparkAppListener extends SparkListener with Logging {
    
      /**
       * Logs the application id carried by the start event.
       *
       * `appId` is an `Option`, so a missing id is logged as "unknown"
       * instead of throwing from `Option.get` inside the listener.
       *
       * @param applicationStart the application-start event
       */
      override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
        val appId = applicationStart.appId.getOrElse("unknown")
        logInfo(s"***************************************************$appId")
      }
    
      /**
       * Logs the `time` field of the application-end event.
       *
       * @param applicationEnd the application-end event
       */
      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
        logInfo(s"************************ app end time ************************ ${applicationEnd.time}")
      }
    }

    主函数运行示例

    /**
     * Minimal driver: starts a local SparkSession with MySparkAppListener
     * registered through the `spark.extraListeners` setting, then stops it.
     *
     * An explicit `main` method is used instead of `extends App`; the Spark
     * Quick Start guide notes that subclasses of `scala.App` may not work
     * correctly.
     */
    object Main {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("main")
          .master("local[2]")
          .config("spark.extraListeners", "com.moxiu.silent.SparkListenerDemo.MySparkAppListener")
          .getOrCreate()
    
        // Alternative: register the listener on the running context instead of via config.
        // spark.sparkContext.addSparkListener(new MySparkAppListener)
        spark.stop()
      }
    }

    说明:

    自定义 SparkListener 之后,注册方式有两种:

    方法1:conf配置中指定

    // Before Spark 2.0: set the listener class name on the SparkConf.
    // The value is the same fully-qualified class name used in the 2.0+ variant below.
    val sparkConf = new SparkConf()
    sparkConf.set("spark.extraListeners", "com.moxiu.silent.SparkListenerDemo.MySparkAppListener")
    
    // Spark 2.0+: pass the same setting through the SparkSession builder.
    val spark = SparkSession.builder()
      .appName("main")
      .master("local[2]")
      .config("spark.extraListeners", "com.moxiu.silent.SparkListenerDemo.MySparkAppListener")
      .getOrCreate()

    方法2:sparkContext 类中指定

    // Before Spark 2.0: build the SparkContext from the conf, then attach a listener instance.
    val sc = new SparkContext(sparkConf)
    sc.addSparkListener(new MySparkAppListener())
    
    // Spark 2.0+: attach a listener instance to the session's underlying SparkContext.
    spark.sparkContext.addSparkListener(new MySparkAppListener())

    SparkListener 代码记录

    //SparkListener 下各个事件对应的函数名非常直白,即如字面所表达意思。
    //想对哪个阶段的事件做一些自定义的动作,便继承SparkListener实现对应的函数即可
    
    // Abstract listener in which every callback overridden below has an empty body,
    // so a subclass only needs to override the events it wants to handle.
    abstract class SparkListener extends SparkListenerInterface {
      // Event fired when a stage completes
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { }
    
      // Event fired when a stage is submitted
      override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { }
    
      // Event fired when a task starts
      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }
    
      // Event for fetching a task's result
      override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = { }
    
      // Event fired when a task ends
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { }
    
      // Event fired when a job starts
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = { }
    
      // Event fired when a job ends
      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { }
    
      // Event fired when the environment is updated
      override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = { }
    
      // Event fired when a block manager is added
      override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { }
    
      override def onBlockManagerRemoved(
          blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = { }
    
      // Event fired when an RDD is unpersisted
      override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = { }
    
      // Event fired when the application starts
      override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { }
    
      // Event fired when the application ends. [The remaining callbacks likewise fire for the
      // event their method name describes and are not annotated one by one.]
      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { } 
    
      override def onExecutorMetricsUpdate(
          executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = { }
    
      override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { }
    
      override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }
    
      override def onExecutorBlacklisted(
          executorBlacklisted: SparkListenerExecutorBlacklisted): Unit = { }
    
      override def onExecutorUnblacklisted(
          executorUnblacklisted: SparkListenerExecutorUnblacklisted): Unit = { }
    
      override def onNodeBlacklisted(
          nodeBlacklisted: SparkListenerNodeBlacklisted): Unit = { }
    
      override def onNodeUnblacklisted(
          nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { }
    
      override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }
    
      override def onOtherEvent(event: SparkListenerEvent): Unit = { }
    }
  • 相关阅读:
    Openstack API 开发 快速入门
    virtualBox虚拟机到vmware虚拟机转换
    使用Blogilo 发布博客到cnblogs
    Openstack Troubleshooting
    hdoj 1051 Wooden Sticks(上升子序列个数问题)
    sdut 2430 pillars (dp)
    hdoj 1058 Humble Numbers(dp)
    uva 10815 Andy's First Dictionary(快排、字符串)
    sdut 2317 Homogeneous squares
    hdoj 1025 Constructing Roads In JGShining's Kingdom(最长上升子序列+二分)
  • 原文地址:https://www.cnblogs.com/yyy-blog/p/10253830.html
Copyright © 2011-2022 走看看