Spark error: Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Lscala.collection.immutable.Map;

Background

I wrote a UDF that checks whether two columns are equal. The two columns share the same schema, but the structure is fairly complex:

    |-- list1: array (nullable = true)
     |    |-- element: map (containsNull = true)
     |    |    |-- key: string
     |    |    |-- value: array (valueContainsNull = true)
     |    |    |    |-- element: struct (containsNull = true)
     |    |    |    |    |-- Date: integer (nullable = true)
     |    |    |    |    |-- Name: string (nullable = true)
     |-- list2: array (nullable = true)
     |    |-- element: map (containsNull = true)
     |    |    |-- key: string
     |    |    |-- value: array (valueContainsNull = true)
     |    |    |    |-- element: struct (containsNull = true)
     |    |    |    |    |-- Date: integer (nullable = true)
     |    |    |    |    |-- Name: string (nullable = true)
    

    In other words, Maps are nested inside an Array, and another Array is nested inside each Map, so the comparison has to walk the structure level by level. The UDF I wrote:

    import org.apache.spark.sql.functions.udf

    // matches the struct in the schema above: Date:int, Name:string
    case class AppList(Date: Int, Name: String)

    // (these native Array parameter types turn out to be the problem; see below)
    def isMapEqual(map1: Map[String, Array[AppList]], map2: Map[String, Array[AppList]]): Boolean = {
      try {
        if (map1.size != map2.size) {
          return false
        } else {
          for (x <- map1.keys) {
            // a key missing from map2 throws here and lands in the catch
            if (map1(x) != map2(x)) {
              return false
            }
          }
          return true
        }
      } catch {
        case e: Exception => false
      }
    }

    def isListEqual(list1: Array[Map[String, Array[AppList]]], list2: Array[Map[String, Array[AppList]]]): Boolean = {
      try {
        if (list1.length != list2.length) {
          return false
        } else if (list1.isEmpty || list2.isEmpty) {
          // two empty lists are treated as not equal
          return false
        } else {
          // only the first map of each list is compared
          return isMapEqual(list1(0), list2(0))
        }
      } catch {
        case e: Exception => false
      }
    }

    val isColumnEqual = udf((list1: Array[Map[String, Array[AppList]]], list2: Array[Map[String, Array[AppList]]]) => {
      isListEqual(list1, list2)
    })
    

    I then pasted this into spark-shell and ran:

    val dat = df.withColumn("equal", isColumnEqual($"list1", $"list2"))
    dat.show()
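
    (The df above holds real data. For anyone following along without it, here is a hypothetical stand-in built from made-up values; it has the same schema and triggers the same crash:)

    import spark.implicits._

    // made-up sample rows matching the schema at the top of this post
    val row = Seq(Map("pkg" -> Seq(AppList(20171127, "AppName"))))
    val df  = Seq((row, row)).toDF("list1", "list2")

    df.printSchema()  // both columns: array<map<string,array<struct<Date:int,Name:string>>>>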
    

    And that's when the following error appeared:

    Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<map<string,array<struct<Date:int,Name:string>>>>, array<map<string,array<struct<Date:int,Name:string>>>>) => boolean)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
      at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:99)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Lscala.collection.immutable.Map;
      at $anonfun$1.apply(<console>:42)
      ... 16 more
    
    Solution

    The so-called solution was, naturally, to Google it…

    An answer I found said to simply change Array to Seq. Facepalm. I tried it, and sure enough, that fixed it.
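
    Concretely, every Array in the UDF's parameter types becomes a Seq. A sketch of the corrected version (the logic is untouched; as a bonus, Seq equality is element-wise, while Array's == compares references, so the map comparison also becomes meaningful):

    import org.apache.spark.sql.functions.udf

    case class AppList(Date: Int, Name: String)

    def isMapEqual(map1: Map[String, Seq[AppList]], map2: Map[String, Seq[AppList]]): Boolean = {
      try {
        if (map1.size != map2.size) {
          false
        } else {
          // Seq == compares elements, so this now checks contents, not references
          map1.keys.forall(x => map1(x) == map2(x))
        }
      } catch {
        case e: Exception => false
      }
    }

    def isListEqual(list1: Seq[Map[String, Seq[AppList]]], list2: Seq[Map[String, Seq[AppList]]]): Boolean = {
      try {
        if (list1.length != list2.length || list1.isEmpty) {
          false
        } else {
          isMapEqual(list1(0), list2(0))  // still only compares the first map
        }
      } catch {
        case e: Exception => false
      }
    }

    val isColumnEqual = udf((list1: Seq[Map[String, Seq[AppList]]], list2: Seq[Map[String, Seq[AppList]]]) => {
      isListEqual(list1, list2)
    })

    One subtlety: Spark actually passes the struct elements to a UDF as Row objects, not AppList instances. This code gets away with declaring Seq[AppList] because it only ever compares whole sequences (Row has structural equality) and never reads a field such as .Date, so the erased element type is never checked at runtime.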

    Reason

    The answer explains it like this:

    So it looks like the ArrayType on Dataframe "idDF" is really a WrappedArray and not an Array - So the function call to "filterMapKeysWithSet" failed as it expected an Array but got a WrappedArray/ Seq instead (which doesn't implicitly convert to Array in Scala 2.8 and above).

    In other words, this Array is not Scala's native Array but a wrapped one: Spark deserializes an ArrayType column into a WrappedArray, which is a Seq but not an Array, and since Scala 2.8 there is no implicit conversion from Seq back to Array. (If I've got anything wrong, please point it out; I've barely written any Scala.)
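
    The type relationship is easy to see outside Spark (plain Scala 2.11/2.12, where WrappedArray lives; Scala 2.13 replaced it with ArraySeq):

    import scala.collection.mutable.WrappedArray

    val arr = Array(1, 2, 3)
    val wrapped: WrappedArray[Int] = arr  // implicit wrapping: Array => WrappedArray
    val asSeq: Seq[Int] = wrapped         // fine: a WrappedArray is a Seq

    val any: AnyRef = wrapped
    // any.asInstanceOf[Array[Int]]       // would throw ClassCastException,
                                          // exactly what the UDF did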
