若DataFrame为简单的二维表,则可以借助RDD的zipWithIndex实现索引列添加。
scala> val df = spark.createDataFrame(Seq(("ming",20,1234),("hong",19,1235),("zhi",21,1236))) scala> df.show +----+---+----+ | _1| _2| _3| +----+---+----+ |ming| 20|1234| |hong| 19|1235| | zhi| 21|1236| +----+---+----+ scala> val rdd = df.rdd.zipWithIndex() rdd: org.apache.spark.rdd.RDD[(org.apache.spark.sql.Row, Long)] = ZippedWithIndexRDD[5] at zipWithIndex at <console>:25 scala> rdd.collect.foreach(println) ([ming,20,1234],0) ([hong,19,1235],1) ([zhi,21,1236],2)
若DataFrame来源于JSON格式数据,直接通过rdd.zipWithIndex实现索引列添加,会报如下错误:
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row - field (class: "org.apache.spark.sql.Row", name: "_1") - root class: "scala.Tuple2" at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:650) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:452) at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56) at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906) at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46) at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:452) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:644) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:632) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.immutable.List.foreach(List.scala:392) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.immutable.List.flatMap(List.scala:355) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:632) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:452) at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56) at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906) at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46) at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:452) at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:441) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71) at org.apache.spark.sql.Encoders$.product(Encoders.scala:275) at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248) at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34) at com.ipinyou.mip.ma.worker.ABTestProcessor$.getAudienceByNumber(ABTestProcessor.scala:69) at com.ipinyou.mip.ma.worker.ABTestProcessor$.execute(ABTestProcessor.scala:47) at com.ipinyou.mip.ma.worker.ABTestProcessor$.main(ABTestProcessor.scala:92) at com.ipinyou.mip.ma.worker.ABTestProcessor.main(ABTestProcessor.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
解决办法是,选择DataFrame中数据维度单一的列,转成rdd后使用zipWithIndex,最后将此DataFrame与原始DataFrame做join,这个操作的局限是所选择的列必须不能存在重复值。
audience数据示例:
val indexColName = "index" val cdpId = audience.select(cdpIdColName).rdd.map(_.getString(0)).zipWithIndex().toDF(cdpIdColName, indexColName) val audienceWithIndex = audience.join(cdpId, Seq(cdpIdColName), "left")