zoukankan      html  css  js  c++  java
  • spark2.1注册内部函数spark.udf.register("xx", xxx _),运行时抛出异常:Task not serializable

    函数代码:

    /**
      * Minimal reproduction of the "Task not serializable" failure.
      *
      * The class deliberately does NOT extend Serializable: the UDF registered
      * in `entry` is built from a method nested inside an instance method, so the
      * resulting function object holds a reference to the enclosing `MySparkJob`
      * instance, which Spark then fails to serialize.
      */
    class MySparkJob {
      /**
        * Registers the `getInnerRsrp` UDF and runs a query that calls it.
        *
        * @param spark the active session used for UDF registration and SQL execution
        */
      def entry(spark: SparkSession): Unit = {
        // `XX` is a placeholder from the original snippet (the real coefficient was elided).
        def getInnerRsrp(outer_rsrp: Double, wear_loss: Double, path_loss: Double): Double = {
          val innerRsrp: Double = outer_rsrp - wear_loss - (XX) * path_loss

          innerRsrp
        }

        // The registered name must match the name used in the SQL text below, and the
        // function passed must be the one defined above.
        spark.udf.register("getInnerRsrp", getInnerRsrp _)

        import spark.sql
        sql(s"""|select getInnerRsrp(t10.outer_rsrp,t10.wear_loss,t10.path_loss) as rsrp, xx from yy""".stripMargin)
      }
    }

    使用spark-submit提交作业时,抛出异常:

    User class threw exception: org.apache.spark.SparkException: Task not serializable 
    
    org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2125)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:842)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:841)
        at com.dx.fpd_withscenetype.MySparkJob.entry(MySparkJob.scala:207)
        at com.dx.App$.main(App.scala:58)
        at com.dx.App.main(App.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:650)
    Caused by: java.io.NotSerializableException: com.dx.fpd_withscenetype.MySparkJob
    Serialization stack:
        - object not serializable (class: com.dx.fpd_withscenetype.MySparkJob, value: com.dx.fpd_withscenetype.MySparkJob@e4d4393)
        - field (class: com.dx.fpd_withscenetype.MySparkJob$$anonfun$entry$1, name: $outer, type: class com.dx.fpd_withscenetype.MySparkJob)
        - object (class com.dx.fpd_withscenetype.MySparkJob$$anonfun$entry$1, <function2>)
        - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$3, name: func$3, type: interface scala.Function2)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$3, <function1>)
        - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF:getInnerRsrp(cast(input[1, double, true] as int), cast(input[12, double, true] as int), cast(input[13, double, true] as int)))
        - element of array (index: 1)
        - array (class [Ljava.lang.Object;, size 3)
        - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, name: references$1, type: class [Ljava.lang.Object;)
        - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, <function2>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
        ... 147 more

    解决方案:

    让当前MySparkJob继承Serializable(从上面的序列化堆栈可以看到,注册的UDF闭包通过$outer字段引用了外层的MySparkJob实例,因此该类必须可序列化):

    // Fix: extending Serializable lets the MySparkJob instance that the UDF closure
    // references (its $outer field) be serialized along with the task.
    // `xxx` below is a placeholder; presumably it stands for the unchanged class body.
    class MySparkJob extends Serializable {
        xxx
    }
  • 相关阅读:
    生成微博授权URL及回调地址
    微博三方登录
    celery异步发送短信
    celery配置
    celery原理与组件
    Django----短信验证接口
    Django----图片验证码接口
    编写注册接口
    jwt安装配置
    day19-Exception
  • 原文地址:https://www.cnblogs.com/yy3b2007com/p/8570796.html
Copyright © 2011-2022 走看看