[Original] Troubleshooting Notes (10): Submitting a Spark job occasionally fails with org.apache.spark.SparkException: A master URL must be set in your configuration

    Environment: Spark 2.1.1

    1. Reproducing the Problem

    Example code that reproduces the problem:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SparkSession

    object MethodPositionTest {

      val sparkConf = new SparkConf().setAppName("MethodPositionTest")
      val sc = new SparkContext(sparkConf)
      val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

      def main(args: Array[String]): Unit = {
        val cnt = spark.sql("select * from test_table").rdd
          .map(item => mapFun(item.getString(0)))
          .count
        println(cnt)
      }

      def mapFun(str: String): String = "p:" + str
    }

    When the following three lines sit outside main, in the object body as above:

        val sparkConf = new SparkConf().setAppName(getName)
        val sc = new SparkContext(sparkConf)
        val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

    the job fails intermittently with:

    Caused by: java.lang.ExceptionInInitializerError
        at app.package.AppClass$$anonfun$1.apply(AppClass.scala:208)
        at org.apache.spark.sql.execution.MapElementsExec$$anonfun$8$$anonfun$apply$1.apply(objects.scala:237)
        at org.apache.spark.sql.execution.MapElementsExec$$anonfun$8$$anonfun$apply$1.apply(objects.scala:237)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at scala.collection.AbstractIterator.to(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:379)
        at app.package.AppClass$.<clinit>(AppClass.scala)

    2. Analysis

    MethodPositionTest defines an anonymous function that is passed to RDD.map, so it executes on the Executors. That anonymous function calls the method mapFun on the enclosing object, which forces the class to initialize: MethodPositionTest$.<clinit>. The static initializer runs all the Spark setup code placed outside main, so a SparkConf and a SparkContext get constructed on the Executor. This should never happen: a Spark application has exactly one SparkContext, and it belongs on the Driver, not the Executors. When it does happen, the SparkContext constructor throws, as its source shows:

    From org.apache.spark.SparkContext:

      try {
        _conf = config.clone()
        _conf.validateSettings()

        if (!_conf.contains("spark.master")) {
          throw new SparkException("A master URL must be set in your configuration")
        }
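
    The trigger is ordinary JVM class-initialization semantics: the first reference to a Scala object runs its body (its static initializer), wherever that reference happens. A standalone sketch of the same behavior, independent of Spark (the demo names here are illustrative, not from the original post):

        object LazyInitDemo {
          // The object body compiles to a static initializer: it runs on first use.
          println("initializing LazyInitDemo")
          def greet(s: String): String = "hello " + s
        }

        object LazyInitMain {
          def main(args: Array[String]): Unit = {
            println("before first use")
            // The first reference to LazyInitDemo triggers its initializer here,
            // just as calling mapFun on an Executor triggers MethodPositionTest$.<clinit>.
            println(LazyInitDemo.greet("spark"))
          }
        }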

    Question 1: Why is the missing master an error on the Executor but not on the Driver?

    The application configures Spark like this:

    val sparkConf = new SparkConf().setAppName(getName)

    Only the app name is set here, never the master. To see why the Driver still finds one, look at how Spark Submit runs the application (detailed in https://www.cnblogs.com/barneywill/p/9820684.html).

    On the Driver, SparkSubmit gathers parameters from the command line, configuration files, environment variables, and so on, and writes them all into JVM system properties:

        for ((key, value) <- sysProps) {
          System.setProperty(key, value)
        }

    SparkConf then loads, by default, every system property whose key starts with "spark.":

        for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
          set(key, value, silent)
        }

    So the Driver-side SparkConf picks up every submitted setting, including spark.master, while an Executor JVM never has those system properties set.
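
    A minimal sketch of that mechanism (the master value "yarn" is just an example): setting the system property by hand makes a default-constructed SparkConf see a master, which is effectively what spark-submit does on the Driver.

        import org.apache.spark.SparkConf

        object ConfLoadDemo {
          def main(args: Array[String]): Unit = {
            // spark-submit --master yarn amounts to this on the Driver JVM:
            System.setProperty("spark.master", "yarn")
            // The no-arg constructor loads all "spark.*" system properties.
            val conf = new SparkConf().setAppName("ConfLoadDemo")
            println(conf.get("spark.master")) // prints "yarn"
            // On an Executor JVM the property is absent, so a SparkContext
            // built from such a conf fails the spark.master check shown above.
          }
        }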

    Question 2: When the Spark initialization code sits outside main, why does the job fail only sometimes?

    Concretely, the behavior depends on what the transformation's closure references:
    1) if the transformation inside main (the map in the example) calls no other function, the job runs fine;
    2) if it calls a function defined inside main, the job fails;
    3) if it calls a function defined outside main (like mapFun), the job fails;
    a sketch of the safe first case follows this list.
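
    The inlined closure below references nothing on the enclosing object, so deserializing it on an Executor never touches MethodPositionTest$.MODULE$ and never triggers <clinit>. A hedged sketch of the safe variant of the map line inside main:

        // Case 1: the closure is self-contained, so the Executor never has a
        // reason to load (and initialize) MethodPositionTest$.
        val cnt = spark.sql("select * from test_table").rdd
          .map(item => "p:" + item.getString(0))
          .count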

    Decompiling the bytecode shows why the other cases fail. The decompiled MethodPositionTest$ looks like this:

    public final class MethodPositionTest$
    {
        public static final MethodPositionTest$ MODULE$ = this;
        private final SparkConf sparkConf = (new SparkConf()).setAppName("MethodPositionTest");
        private final SparkContext sc = new SparkContext(sparkConf());
        private final SparkSession spark;

        public SparkConf sparkConf()
        {
            return sparkConf;
        }

        public SparkContext sc()
        {
            return sc;
        }

        public SparkSession spark()
        {
            return spark;
        }

        public String mapFun(String str)
        {
            return (new StringBuilder()).append("p:").append(str).toString();
        }

        public void main(String args[])
        {
            long cnt = spark().sql("select * from test_table").rdd().map(new Serializable() {

                public static final long serialVersionUID = 0L;

                public final String apply(Row item)
                {
                    return MethodPositionTest$.MODULE$.mapFun(item.getString(0));
                }

                public final volatile Object apply(Object v1)
                {
                    return apply((Row)v1);
                }

            }, ClassTag$.MODULE$.apply(java/lang/String)).count();
            Predef$.MODULE$.println(BoxesRunTime.boxToLong(cnt));
        }

        private MethodPositionTest$()
        {
            spark = SparkSession$.MODULE$.builder().enableHiveSupport().getOrCreate();
        }

        static
        {
            new MethodPositionTest$();
        }
    }

    The anonymous inner class calls MethodPositionTest$.MODULE$.mapFun, so running it on an Executor forces class MethodPositionTest$ to load and its static initializer to run, and that initializer is exactly where new SparkContext(...) throws.

    In short, do not put Spark initialization code outside main; it is an easy way to get hurt. A corrected version of the example follows.
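
    A minimal corrected sketch, keeping the original names and moving all Spark initialization inside main so it only ever runs on the Driver:

        import org.apache.spark.SparkConf
        import org.apache.spark.sql.SparkSession

        object MethodPositionTest {

          def main(args: Array[String]): Unit = {
            // Spark setup happens inside main, i.e., only on the Driver.
            val sparkConf = new SparkConf().setAppName("MethodPositionTest")
            val spark = SparkSession.builder()
              .config(sparkConf)
              .enableHiveSupport()
              .getOrCreate()

            val cnt = spark.sql("select * from test_table").rdd
              .map(item => mapFun(item.getString(0)))
              .count
            println(cnt)
          }

          // Executors still load MethodPositionTest$ to call this, but its static
          // initializer no longer does any Spark setup, so loading it is harmless.
          def mapFun(str: String): String = "p:" + str
        }

    If the RDD-era SparkContext is still needed, spark.sparkContext exposes the one owned by the session, so there is no reason to construct one by hand.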

    Original post: https://www.cnblogs.com/barneywill/p/10109122.html