  • Spark DataFrame: adding an index column

    If the DataFrame is a plain two-dimensional table, an index column can be added by calling zipWithIndex on its underlying RDD.

    scala> val df = spark.createDataFrame(Seq(("ming",20,1234),("hong",19,1235),("zhi",21,1236)))
    
    scala> df.show
    +----+---+----+
    |  _1| _2|  _3|
    +----+---+----+
    |ming| 20|1234|
    |hong| 19|1235|
    | zhi| 21|1236|
    +----+---+----+
    
    
    scala> val rdd = df.rdd.zipWithIndex()
    rdd: org.apache.spark.rdd.RDD[(org.apache.spark.sql.Row, Long)] = ZippedWithIndexRDD[5] at zipWithIndex at <console>:25
    
    scala> rdd.collect.foreach(println)
    ([ming,20,1234],0)
    ([hong,19,1235],1)
    ([zhi,21,1236],2)
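
    The zipped RDD holds (Row, Long) tuples rather than a DataFrame. A minimal sketch (not part of the original post) of turning it back into a DataFrame by appending a LongType index field to the original schema:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{LongType, StructField, StructType}

    // Append an "index" field to df's schema and rebuild each Row with its index value.
    val newSchema = StructType(df.schema.fields :+ StructField("index", LongType, nullable = false))
    val rowsWithIndex = rdd.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
    val dfWithIndex = spark.createDataFrame(rowsWithIndex, newSchema)
    dfWithIndex.show()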

    If the DataFrame is loaded from JSON data and the index column is added via rdd.zipWithIndex, converting the zipped RDD back to a DataFrame raises the following error, because Spark has no Encoder for org.apache.spark.sql.Row:

    Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row
    - field (class: "org.apache.spark.sql.Row", name: "_1")
    - root class: "scala.Tuple2"
            at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:650)
            at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:452)
            at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
            at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
            at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
            at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:452)
            at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:644)
            at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:632)
            at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
            at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
            at scala.collection.immutable.List.foreach(List.scala:392)
            at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
            at scala.collection.immutable.List.flatMap(List.scala:355)
            at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:632)
            at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:452)
            at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
            at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
            at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
            at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:452)
            at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:441)
            at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
            at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
            at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
            at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
            at com.ipinyou.mip.ma.worker.ABTestProcessor$.getAudienceByNumber(ABTestProcessor.scala:69)
            at com.ipinyou.mip.ma.worker.ABTestProcessor$.execute(ABTestProcessor.scala:47)
            at com.ipinyou.mip.ma.worker.ABTestProcessor$.main(ABTestProcessor.scala:92)
            at com.ipinyou.mip.ma.worker.ABTestProcessor.main(ABTestProcessor.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
            at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
            at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
            at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
            at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
            at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
            at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
            at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
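
    For reference, a minimal sketch (paths and names are illustrative, not from the original post) of the call pattern behind this stack trace: toDF needs an Encoder for each tuple field, and none exists for org.apache.spark.sql.Row, so the conversion fails at runtime.

    import spark.implicits._

    val jsonDf = spark.read.json("people.json")   // hypothetical input file
    val zipped = jsonDf.rdd.zipWithIndex()        // RDD[(org.apache.spark.sql.Row, Long)]
    val failing = zipped.toDF("row", "index")     // throws UnsupportedOperationException: No Encoder found for Row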

    The workaround is to pick a single column from the DataFrame, convert it to an RDD, apply zipWithIndex there, turn the result back into a small DataFrame, and finally join that DataFrame with the original one. The limitation is that the chosen column must not contain duplicate values.

    Example with the audience data (audience and cdpIdColName are defined elsewhere in the original job):

    import spark.implicits._   // needed for toDF on the (String, Long) RDD

    val indexColName = "index"
    // zipWithIndex on the single String column avoids the missing Row encoder.
    val cdpId = audience.select(cdpIdColName).rdd.map(_.getString(0)).zipWithIndex().toDF(cdpIdColName, indexColName)
    val audienceWithIndex = audience.join(cdpId, Seq(cdpIdColName), "left")
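
    A self-contained sketch of the same workaround with made-up data (the cdpId column and sample rows are illustrative, not from the original post; the key column is assumed to be duplicate-free):

    import spark.implicits._

    val audience = Seq(("u001", "ming"), ("u002", "hong"), ("u003", "zhi")).toDF("cdpId", "name")
    val cdpIdColName = "cdpId"   // key column, must not contain duplicates
    val indexColName = "index"

    val cdpId = audience.select(cdpIdColName).rdd
      .map(_.getString(0))
      .zipWithIndex()
      .toDF(cdpIdColName, indexColName)

    val audienceWithIndex = audience.join(cdpId, Seq(cdpIdColName), "left")
    audienceWithIndex.show()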
  • Original article: https://www.cnblogs.com/144823836yj/p/14120082.html