方案一:使用functions里面的monotonically_increasing_id(),生成单调递增,不保证连续,最大64bit,的一列.分区数不变。
import org.apache.spark.sql.functions._ val df1 = spark.range(0,1000).toDF("col1") val df2 = df1.withColumn("id", monotonically_increasing_id())
注意:有多个分区的时候,每个分区里面是单调递增,step为1,分区之间不保证连续,如一共两个分区,0分区id是0-499,1分区id可能99000-99499,甚至更大,最大64bit的integer。
如果想要整体连续,可以先repartition(1),操作完后在repartition(n)
方案二:使用row_number().over(Windo.orderBy(ColName)),生成按某列排序后,新增单调递增,连续的一列。操作完后分区数变为1,id列从1开始。
import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions.row_number val df1 = spark.range(0,1000).toDF("col1") println(df.rdd.getNumPartitions) val w = Window.orderBy("col1") val df2 = df.withColumn("id", row_number().over(w)) println(df2.rdd.getNumPartitions
方案三:将DataFrame转成RDD,使用RDD的方法zipWithIndex()/zipWithUniqueId(),分区数不变。
val df1: DataFrame = spark.range(0,1000000).toDF("col1") //转成rdd并使用zipWithIndex() var tempRDD: RDD[(Row, Long)] = df1.rdd.zipWithIndex() //使用map val record: RDD[Row] = tempRDD.map(x => { Row(x._1.get(0), x._2) }) val schema= new StructType().add("col1","long") .add("id","long") spark.createDataFrame(record,schema).show()
zipWithIndex():首先基于分区索引排序,然后是每个分区中的项的排序。所以第一个分区中的第一项得到索引0,最后一个分区中的最后一项得到最大的索引。从0开始
zipWithUniqueId(): 每个分区是一个等差数列,等差为分区数n,每个分区的第一个值为分区id(id从0开始)。第k个分区:num*n+k,num在每个分区都是从0开始,step为1
3个分区,abc 0分区 def 1分区 ghi 2分区 col1 id a 0*3+0=0 b 1*3+0=3 c 2*3+0=6 d 0*3+1=1 e 1*3+1=4 f 2*3+1=7 g 0*3+2=2 h 1*3+2=5 i 2*3+2=8
原文链接:https://blog.csdn.net/liaodaoluyun/article/details/86232639