zoukankan      html  css  js  c++  java
  • SparkListener监听使用方式及自定义的事件处理动作

    本文针对spark 2.0+版本

    概述

    spark 提供了一系列整个任务生命周期中各个阶段变化的事件监听机制,通过这一机制可以在任务的各个阶段做一些自定义的各种动作。SparkListener便是这些阶段的事件监听接口类 通过实现这个类中的各种方法便可实现自定义的事件处理动作。

    自定义示例代码

    import org.apache.spark.internal.Logging
    import org.apache.spark.scheduler.{SparkListenerApplicationStart, SparkListenerApplicationEnd, SparkListener}
    
    /**
     * Created by silent on 2019/1/11.
     */
    class MySparkAppListener extends SparkListener with Logging {
    
      override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
        val appId = applicationStart.appId
        logInfo("***************************************************" + appId.get)
      }
    
      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
        logInfo("************************ app end time ************************ " + applicationEnd.time)
      }
    }

    主函数运行示例

    object Main extends App {
         val spark = SparkSession.builder()
                     .appName("main")
                     .master("local[2]")
                     .config("spark.extraListeners","com.moxiu.silent.SparkListenerDemo.MySparkAppListener") 
                     .getOrCreate()
    
         //spark.sparkContext.addSparkListener(new MySparkAppListener) 
         spark.stop()
    }

    说明:

    自定义监听sparListener后的注册方式有两种:

    方法1:conf配置中指定

    //spark2.0以下
    val sparkConf=new SparkConf()
    sparkConf.set("spark.extraListeners","org.apache.spark.MySparkAppListener")
    
    // spark2.0+
    val spark = SparkSession.builder()
                     .appName("main")
                     .master("local[2]")
                     .config("spark.extraListeners","com.moxiu.silent.SparkListenerDemo.MySparkAppListener")
                     .getOrCreate()

    方法2:sparkContext 类中指定

    //spark2.0前
    val sc = new SparkContext(sparkConf)
    sc.addSparkListener(new MySparkAppListener)
    
    //spark2.0+
    spark.sparkContext.addSparkListener(new MySparkAppListener)

    sparkListerner 代码记录

    //SparkListener 下各个事件对应的函数名非常直白,即如字面所表达意思。
    //想对哪个阶段的事件做一些自定义的动作,变继承SparkListener实现对应的函数即可
    
    abstract class SparkListener extends SparkListenerInterface {
      //阶段完成时触发的事件
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { }
    
      //阶段提交时触发的事件
      override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { }
    
      //任务启动时触发的事件
      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }
    
      //下载任务结果的事件
      override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = { }
    
      //任务结束的事件
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { }
    
      //job启动的事件
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = { }
    
      //job结束的事件
      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { }
    
      //环境变量被更新的事件
      override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = { }
    
      //块管理被添加的事件
      override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { }
    
      override def onBlockManagerRemoved(
          blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = { }
    
      //取消rdd缓存的事件
      override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = { }
    
      //app启动的事件
      override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { }
    
      //app结束的事件 [以下各事件也如同函数名所表达各个阶段被触发的事件不在一一标注]
      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { } 
    
      override def onExecutorMetricsUpdate(
          executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = { }
    
      override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { }
    
      override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }
    
      override def onExecutorBlacklisted(
          executorBlacklisted: SparkListenerExecutorBlacklisted): Unit = { }
    
      override def onExecutorUnblacklisted(
          executorUnblacklisted: SparkListenerExecutorUnblacklisted): Unit = { }
    
      override def onNodeBlacklisted(
          nodeBlacklisted: SparkListenerNodeBlacklisted): Unit = { }
    
      override def onNodeUnblacklisted(
          nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { }
    
      override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }
    
      override def onOtherEvent(event: SparkListenerEvent): Unit = { }
    }
  • 相关阅读:
    [日常工作]WorkStation 使用端口转发的方式使用宿主机IP地址提供服务
    [日常工作]虚拟机或者实体机转换成HyperV虚拟机的方法
    [linux学习]sysctl 以及 net.ipv4.ip_forward
    [自学]Docker system 命令 查看docker镜像磁盘占用情况 Docker volume 相关
    Docker 修改默认存储路径的一个方法
    [学习笔记]Ubuntu下安装配置SQLSERVER2017
    VSCODE安装以及使用Python运行调试代码的简单记录
    Win2012r2 以及win2016 安装.NET3.5
    Win2016以及win10 IIS10 下安装IEwebcontrol的方法
    [日常工作]协助同事从不能开机的机器上面获取资料信息
  • 原文地址:https://www.cnblogs.com/yyy-blog/p/10253830.html
Copyright © 2011-2022 走看看