  • Spark Study: ShutdownHookManager, the JVM shutdown hook manager

      Java processes often die unexpectedly, leaving some state unsaved. In those cases you want to run cleanup code as the JVM shuts down.

      Java's ShutdownHook mechanism provides a good solution for this.

      The JDK provides Runtime.addShutdownHook(Thread hook), which registers a hook to run when the JVM shuts down. The hook is invoked in the following scenarios:

    1. The program exits normally
    2. System.exit() is called
    3. The terminal interrupts the process with Ctrl+C
    4. The system shuts down
    5. The JVM dies with an OutOfMemoryError
    6. The process is killed with kill pid (note: hooks are NOT invoked on kill -9 pid)
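    The kill -9 case has an in-process counterpart: Runtime.halt(int) terminates the JVM immediately without running any registered hooks, whereas a normal return or System.exit() runs them. A minimal sketch (the class name HaltDemo is just for illustration):

    ```java
    public class HaltDemo {
        public static void main(String[] args) {
            Runtime.getRuntime().addShutdownHook(
                    new Thread(() -> System.out.println("Execute Hook.....")));

            if (args.length > 0 && args[0].equals("halt")) {
                // halt() skips shutdown hooks entirely: nothing is printed
                Runtime.getRuntime().halt(1);
            }
            // a normal return (or System.exit) runs the hook: prints "Execute Hook....."
        }
    }
    ```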

    Here is the JDK 1.7 documentation for the method:

       public void addShutdownHook(Thread hook)
    Parameters:
        hook - An initialized but unstarted Thread object 
    Throws: 
        IllegalArgumentException - If the specified hook has already been registered, or if it can be determined that the hook is already running or has already been run 
        IllegalStateException - If the virtual machine is already in the process of shutting down 
        SecurityException - If a security manager is present and it denies RuntimePermission("shutdownHooks")
    Since: 
        1.3
    See Also:
        removeShutdownHook(java.lang.Thread), halt(int), exit(int)
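    The IllegalArgumentException case above is easy to trigger: the same Thread object cannot be registered twice. A small sketch (DuplicateHookDemo is an illustrative name, not from the JDK):

    ```java
    public class DuplicateHookDemo {
        public static void main(String[] args) {
            Runtime rt = Runtime.getRuntime();
            Thread hook = new Thread(() -> System.out.println("Execute Hook....."));

            rt.addShutdownHook(hook);
            try {
                rt.addShutdownHook(hook);       // same Thread object again
            } catch (IllegalArgumentException e) {
                System.out.println("duplicate registration rejected");
            }
            rt.removeShutdownHook(hook);        // unregister so nothing runs at exit
        }
    }
    ```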

    First, test case 1, normal program exit:

    package com.hook;  
    
    import java.util.concurrent.TimeUnit;  
    
    public class HookTest  
    {  
        public void start()  
        {  
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {  
                @Override 
                public void run()  
                {  
                    System.out.println("Execute Hook.....");  
                }  
            }));  
        }  
    
        public static void main(String[] args)  
        {  
            new HookTest().start();  
            System.out.println("The Application is doing something");  
    
            try 
            {  
                TimeUnit.MILLISECONDS.sleep(5000);  
            }  
            catch (InterruptedException e)  
            {  
                e.printStackTrace();  
            }  
        }  
    }

    Output:

    The Application is doing something  
    Execute Hook.....

    As shown above, the shutdown hook is invoked once the main thread finishes.
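    One detail worth knowing before moving on: per the Runtime.addShutdownHook Javadoc, when several hooks are registered the JVM starts them all concurrently, in an unspecified order. A sketch (MultiHookDemo is an illustrative name):

    ```java
    public class MultiHookDemo {
        public static void main(String[] args) {
            Runtime rt = Runtime.getRuntime();
            // Both hooks are started concurrently at shutdown; A/B order is not guaranteed
            rt.addShutdownHook(new Thread(() -> System.out.println("hook A")));
            rt.addShutdownHook(new Thread(() -> System.out.println("hook B")));
            System.out.println("main done");
        }
    }
    ```

    This is exactly why Spark layers its own priority-ordered manager on top, as we will see below.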

    Next, test case 5 (taking the cases a bit out of order; never mind that):

    package com.hook;  
    
    import java.util.concurrent.TimeUnit;  
    
    public class HookTest2  
    {  
        public void start()  
        {  
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {  
                @Override 
                public void run()  
                {  
                    System.out.println("Execute Hook.....");  
                }  
            }));  
        }  
    
        public static void main(String[] args)  
        {  
            new HookTest2().start();  
            System.out.println("The Application is doing something");  
            byte[] b = new byte[500*1024*1024];  
            try 
            {  
                TimeUnit.MILLISECONDS.sleep(5000);  
            }  
            catch (InterruptedException e)  
            {  
                e.printStackTrace();  
            }  
        }  
    
    }

    Set the JVM options to -Xmx20M to guarantee that an OutOfMemoryError occurs.

    Output:

    The Application is doing something  
    Exception in thread "main" java.lang.OutOfMemoryError: Java heap space  
        at com.hook.HookTest2.main(HookTest2.java:22)  
    Execute Hook.....

    As you can see, the shutdown hook is invoked after the OutOfMemoryError is thrown, unlike case 1, where it ran only after the program finished its 5000 ms sleep and exited.

    Next, test case 3, Ctrl+C:

    package com.hook;  
    
    import java.util.concurrent.TimeUnit;  
    
    public class HookTest3  
    {  
        public void start()  
        {  
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {  
                @Override 
                public void run()  
                {  
                    System.out.println("Execute Hook.....");  
                }  
            }));  
        }  
    
        public static void main(String[] args)  
        {  
            new HookTest3().start();  
            Thread thread = new Thread(new Runnable(){  
    
                @Override 
                public void run()  
                {  
                    while(true)  
                    {  
                        System.out.println("thread is running....");  
                        try 
                        {  
                            TimeUnit.MILLISECONDS.sleep(100);  
                        }  
                        catch (InterruptedException e)  
                        {  
                            e.printStackTrace();  
                        }  
                    }  
                }  
    
            });  
            thread.start();  
        }  
    
    }

    Compile on the command line: javac com/hook/HookTest3.java

    Run on the command line: java com.hook.HookTest3 (then press Ctrl+C)

    Output (abridged; the loop keeps printing until Ctrl+C, at which point the hook runs):

    thread is running....
    thread is running....
    ...
    Execute Hook.....

    That covers the Java side; now let's look at Spark's ShutdownHookManager.

    The shutdown hook is registered in SparkContext, so that cleanup work runs when a Spark program dies:

    /** Register the shutdown hook in SparkContext, so cleanup runs when the Spark program dies */
        _shutdownHookRef = ShutdownHookManager.addShutdownHook(
          ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
          logInfo("Invoking stop() from shutdown hook")
          // this calls stop() to shut down the SparkContext
          stop()
        }

    Now the full source:

    package org.apache.spark.util
    
    import java.io.File
    import java.util.PriorityQueue
    
    import scala.util.Try
    
    import org.apache.hadoop.fs.FileSystem
    
    import org.apache.spark.internal.Logging
    
    /**
     * Various utility methods used by Spark.
     */
    private[spark] object ShutdownHookManager extends Logging {
      val DEFAULT_SHUTDOWN_PRIORITY = 100   // default shutdown hook priority
    
      /**
       * The shutdown priority of the SparkContext instance. This is lower than the default
       * priority, so that by default hooks are run before the context is shut down.
       */
      val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50
    
      /**
       * The shutdown priority of temp directory must be lower than the SparkContext shutdown
       * priority. Otherwise cleaning the temp directories while Spark jobs are running can
       * throw undesirable errors at the time of shutdown.
       */
      val TEMP_DIR_SHUTDOWN_PRIORITY = 25
    
      // lazily initialized
      private lazy val shutdownHooks = {
        val manager = new SparkShutdownHookManager()
        // register with Hadoop's ShutdownHookManager the task that runs all Spark hooks
        manager.install()
        manager
      }
    
      private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
    
      // Add a shutdown hook to delete the temp dirs when the JVM exits
      logDebug("Adding shutdown hook") // force eager creation of logger
    
    
      addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
        logInfo("Shutdown hook called")
        // we need to materialize the paths to delete because deleteRecursively removes items from
        // shutdownDeletePaths as we are traversing through it.
        shutdownDeletePaths.toArray.foreach { dirPath =>
          try {
            logInfo("Deleting directory " + dirPath)
            // recursively deletes the file or directory and its contents; throws if deletion fails
            Utils.deleteRecursively(new File(dirPath))
          } catch {
            case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
          }
        }
      }
    
      // Register the path to be deleted via shutdown hook
      def registerShutdownDeleteDir(file: File) {
        // get the file's absolute path
        val absolutePath = file.getAbsolutePath()
    
        // add it to the set of paths to delete at shutdown
        shutdownDeletePaths.synchronized {
          shutdownDeletePaths += absolutePath
        }
      }
    
      // Remove the path to be deleted via shutdown hook
      def removeShutdownDeleteDir(file: File) {
        val absolutePath = file.getAbsolutePath()
        // remove the path from the deletion set
        shutdownDeletePaths.synchronized {
          shutdownDeletePaths.remove(absolutePath)
        }
      }
    
      // Is the path already registered to be deleted via a shutdown hook ?
      def hasShutdownDeleteDir(file: File): Boolean = {
        val absolutePath = file.getAbsolutePath()
        shutdownDeletePaths.synchronized {
          shutdownDeletePaths.contains(absolutePath)
        }
      }
    
      // Note: if file is child of some registered path, while not equal to it, then return true;
      // else false. This is to ensure that two shutdown hooks do not try to delete each others
      // paths - resulting in IOException and incomplete cleanup.
      def hasRootAsShutdownDeleteDir(file: File): Boolean = {
        val absolutePath = file.getAbsolutePath()
        val retval = shutdownDeletePaths.synchronized {
          shutdownDeletePaths.exists { path =>
            !absolutePath.equals(path) && absolutePath.startsWith(path)
          }
        }
        if (retval) {
          logInfo("path = " + file + ", already present as root for deletion.")
        }
        retval
      }
    
      /**
       * Detect whether this thread might be executing a shutdown hook. Will always return true if
       * the current thread is running a shutdown hook but may spuriously return true otherwise (e.g.
       * if System.exit was just called by a concurrent thread).
       *
       * Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing
       * an IllegalStateException.
       */
      def inShutdown(): Boolean = {
        try {
          val hook = new Thread {
            override def run() {}
          }
    
          // If the JVM is already shutting down, addShutdownHook throws IllegalStateException;
          // adding and then immediately removing a dummy hook probes for that state.
          // scalastyle:off runtimeaddshutdownhook
          Runtime.getRuntime.addShutdownHook(hook)
          // scalastyle:on runtimeaddshutdownhook
          Runtime.getRuntime.removeShutdownHook(hook)
        } catch {
          case ise: IllegalStateException => return true
        }
        false
      }
    
      /**
       * Adds a shutdown hook with default priority.
       *
       * @param hook The code to run during shutdown.
       * @return A handle that can be used to unregister the shutdown hook.
       */
      def addShutdownHook(hook: () => Unit): AnyRef = {
        addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY)(hook)
      }
    
      /**
       * Adds a shutdown hook with the given priority. Hooks with higher priority values run
       * first.
       *
       * @param hook The code to run during shutdown.
       * @return A handle that can be used to unregister the shutdown hook.
       */
      def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
        shutdownHooks.add(priority, hook)
      }
    
      /**
       * Remove a previously installed shutdown hook.
       *
       * @param ref A handle returned by `addShutdownHook`.
       * @return Whether the hook was removed.
       */
      def removeShutdownHook(ref: AnyRef): Boolean = {
        shutdownHooks.remove(ref)
      }
    
    }
    
    private [util] class SparkShutdownHookManager {
    
      // priority queue of registered hooks, ordered by priority
      private val hooks = new PriorityQueue[SparkShutdownHook]()
      @volatile private var shuttingDown = false
    
      /**
       * Install a hook to run at shutdown and run all registered hooks in order.
       */
      def install(): Unit = {
        val hookTask = new Runnable() {
          override def run(): Unit = runAll()
        }
        org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
          hookTask, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30)
      }
    
      def runAll(): Unit = {
        shuttingDown = true
        var nextHook: SparkShutdownHook = null
        while ({ nextHook = hooks.synchronized { hooks.poll() }; nextHook != null }) {
          Try(Utils.logUncaughtExceptions(nextHook.run()))
        }
      }
    
      def add(priority: Int, hook: () => Unit): AnyRef = {
        hooks.synchronized {
          if (shuttingDown) {
            throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
          }
          val hookRef = new SparkShutdownHook(priority, hook)
          hooks.add(hookRef)
          hookRef
        }
      }
    
      def remove(ref: AnyRef): Boolean = {
        hooks.synchronized { hooks.remove(ref) }
      }
    
    }
    
    private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
      extends Comparable[SparkShutdownHook] {
    
      override def compareTo(other: SparkShutdownHook): Int = {
        other.priority - priority
      }
    
      def run(): Unit = hook()
    
    }
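    The core idea of SparkShutdownHookManager (a priority queue of hooks, drained at shutdown with the highest priority first, and locked against modification once shutdown begins) can be sketched in plain Java. The class and method names below are illustrative, not Spark's:

    ```java
    import java.util.PriorityQueue;

    public class TinyHookManager {
        private static final class Hook implements Comparable<Hook> {
            final int priority;
            final Runnable body;
            Hook(int priority, Runnable body) { this.priority = priority; this.body = body; }
            // Reverse ordering, like SparkShutdownHook.compareTo: higher priority polls first
            @Override
            public int compareTo(Hook other) { return Integer.compare(other.priority, priority); }
        }

        private final PriorityQueue<Hook> hooks = new PriorityQueue<>();
        private volatile boolean shuttingDown = false;

        public synchronized Object add(int priority, Runnable body) {
            if (shuttingDown) {
                throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.");
            }
            Hook h = new Hook(priority, body);
            hooks.add(h);
            return h;                              // handle usable with remove()
        }

        public synchronized boolean remove(Object ref) { return hooks.remove(ref); }

        public void runAll() {                     // drain the queue, highest priority first
            shuttingDown = true;
            Hook next;
            while ((next = poll()) != null) {
                try { next.body.run(); } catch (Throwable t) { t.printStackTrace(); }
            }
        }

        private synchronized Hook poll() { return hooks.poll(); }
    }
    ```

    Note the design choice this mirrors: Spark installs a single Hadoop shutdown hook that calls runAll(), so the ordering among Spark's own hooks is governed by these priorities rather than by the JVM's unspecified concurrent hook order.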
  • Original post: https://www.cnblogs.com/itboys/p/10311512.html