RDD Transformation Operators
map(function)
Transforms each element of the RDD by applying the function f, producing a new RDD.
def map[U](f: T => U): org.apache.spark.rdd.RDD[U]
scala> sc.parallelize(List(1,2,3,4,5),3).map(item => item*2+" ")
res1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at map at <console>:25
scala> sc.parallelize(List(1,2,3,4,5),3).map(item => item*2+" ").collect
res2: Array[String] = Array("2 ", "4 ", "6 ", "8 ", "10 ")
filter(func)
Returns a new RDD containing only the elements that satisfy the predicate.
def filter(f: T => Boolean): org.apache.spark.rdd.RDD[T]
scala> sc.parallelize(List(1,2,3,4,5),3).filter(item=>item==3).collect
res3: Array[Int] = Array(3)
flatMap(func)
Maps each element to zero or more elements (a sequence) and then flattens the results.
def flatMap[U](f: T => TraversableOnce[U]): org.apache.spark.rdd.RDD[U]
scala> sc.parallelize(List("ni hao","hello spark"),3).flatMap(line=>line.split("\\s+")).collect
res4: Array[String] = Array(ni, hao, hello, spark)
mapPartitions(func)
Similar to map, but runs separately on each partition of the RDD, so when running on an RDD of type T, func must be of type Iterator[T] => Iterator[U].
def mapPartitions[U](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean): org.apache.spark.rdd.RDD[U]
scala> sc.parallelize(List(1,2,3,4,5),3).mapPartitions(items=> for(i<-items;if(i%2==0)) yield i*2 ).collect()
res7: Array[Int] = Array(4, 8)
mapPartitionsWithIndex(func)
Similar to mapPartitions, but also provides func with an integer value representing the partition index, so when running on an RDD of type T, func must be of type (Int, Iterator[T]) => Iterator[U].
def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean): org.apache.spark.rdd.RDD[U]
scala> sc.parallelize(List(1,2,3,4,5),3).mapPartitionsWithIndex((p,items)=> for(i<-items) yield (p,i)).collect
res11: Array[(Int, Int)] = Array((0,1), (1,2), (1,3), (2,4), (2,5))
sample(withReplacement, fraction, seed)
Samples a fraction of the data; the withReplacement parameter controls whether the same element may be drawn more than once (sampling with replacement).
scala> sc.parallelize(List(1,2,3,4,5,6,7),3).sample(false,0.7,1L).collect
res13: Array[Int] = Array(1, 4, 6, 7)
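For comparison, a minimal sketch with withReplacement set to true, which samples with replacement so the same element may appear more than once in the result (the exact output depends on the seed and Spark version, so none is shown here):
scala> sc.parallelize(List(1,2,3,4,5,6,7),3).sample(true,0.7,1L).collect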
union(otherDataset)
Returns a new dataset containing the union of the elements in the source dataset and the argument.
scala> var rdd1=sc.parallelize(Array(("张三",1000),("李四",100),("赵六",300)))
scala> var rdd2=sc.parallelize(Array(("张三",1000),("王五",100),("温七",300)))
scala> rdd1.union(rdd2).collect
res16: Array[(String, Int)] = Array((张三,1000), (李四,100), (赵六,300), (张三,1000), (王五,100), (温七,300))
intersection(otherDataset)
Returns a new RDD containing the intersection of the elements in the source dataset and the argument.
scala> var rdd1=sc.parallelize(Array(("张三",1000),("李四",100),("赵六",300)))
scala> var rdd2=sc.parallelize(Array(("张三",1000),("王五",100),("温七",300)))
scala> rdd1.intersection(rdd2).collect
res17: Array[(String, Int)] = Array((张三,1000))
distinct
Removes duplicates, returning a new dataset of the distinct elements of the source dataset.
scala> sc.parallelize(List(1,2,3,3,5,7,2),3).distinct.collect
res19: Array[Int] = Array(3, 1, 7, 5, 2)
groupByKey
When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable[V]) pairs.
Note: if you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
Note: by default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numPartitions argument to set a different number of tasks.
scala> sc.parallelize(List("ni hao","hello spark"),3).flatMap(line=>line.split("\\s+")).map(word=>(word,1)).groupByKey(3).map(tuple=>(tuple._1,tuple._2.sum)).collect
reduceByKey(func, [numPartitions])
When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V, V) => V.
scala> sc.parallelize(List("ni hao","hello spark"),3).flatMap(line=>line.split("\\s+")).map(word=>(word,1)).reduceByKey((v1,v2)=>v1+v2).collect()
res33: Array[(String, Int)] = Array((hao,1), (hello,1), (spark,1), (ni,1))
scala> sc.parallelize(List("ni hao","hello spark"),3).flatMap(line=>line.split("\\s+")).map(word=>(word,1)).reduceByKey(_+_).collect()
res34: Array[(String, Int)] = Array((hao,1), (hello,1), (spark,1), (ni,1))
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows the aggregated value type to differ from the input value type, while avoiding unnecessary allocations (see the sketch after the example below).
scala> sc.parallelize(List("ni hao","hello spark"),3).flatMap(line=>line.split("\\s+")).map(word=>(word,1)).aggregateByKey(0L)((z,v)=>z+v,(u1,u2)=>u1+u2).collect
res35: Array[(String, Long)] = Array((hao,1), (hello,1), (spark,1), (ni,1))
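To illustrate the aggregated value type differing from the input type, here is a minimal sketch (using a made-up pairs RDD) that builds a (sum, count) tuple per key from Int values, which could then be used to compute per-key averages; the expected result is something like Array((a,(4,2)), (b,(2,1))), though ordering may vary:
scala> val pairs = sc.parallelize(List(("a",1),("a",3),("b",2)),3)
scala> pairs.aggregateByKey((0,0))((acc,v)=>(acc._1+v,acc._2+1),(x,y)=>(x._1+y._1,x._2+y._2)).collect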
sortByKey([ascending], [numPartitions])
When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by key in ascending or descending order, as specified by the boolean ascending argument.
scala> sc.parallelize(List("ni hao","hello spark"),3).flatMap(line=>line.split("\\s+")).map(word=>(word,1)).aggregateByKey(0L)((z,v)=>z+v,(u1,u2)=>u1+u2).sortByKey(false).collect()
res37: Array[(String, Long)] = Array((spark,1), (ni,1), (hello,1), (hao,1))
sortBy(func, [ascending], [numPartitions])
When sortBy is called on a (K, V) dataset, the user can specify the sort criterion via func: T => U, where U is required to implement the Ordered interface.
scala> sc.parallelize(List("ni hao","hello spark"),3).flatMap(line=>line.split("\\s+")).map(word=>(word,1)).aggregateByKey(0L)((z,v)=>z+v,(u1,u2)=>u1+u2).sortBy(_._2,true,2).collect
res42: Array[(String, Long)] = Array((hao,1), (hello,1), (spark,1), (ni,1))
join
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin and fullOuterJoin (a sketch follows the example below).
scala> var rdd1=sc.parallelize(Array(("001","张三"),("002","李四"),("003","王五")))
scala> var rdd2=sc.parallelize(Array(("001",("apple",18.0)),("001",("orange",18.0))))
scala> rdd1.join(rdd2).collect
res43: Array[(String, (String, (String, Double)))] = Array((001,(张三,(apple,18.0))), (001,(张三,(orange,18.0))))
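The outer-join variants wrap the side that may be missing in an Option, so keys without a match still appear in the result. Reusing rdd1 and rdd2 from above, a minimal sketch (expected shape shown as a comment; ordering may vary):
scala> rdd1.leftOuterJoin(rdd2).collect
// expected, up to ordering: Array((001,(张三,Some((apple,18.0)))), (001,(张三,Some((orange,18.0)))), (002,(李四,None)), (003,(王五,None)))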
cogroup
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable[V], Iterable[W])) tuples. This operation is also called groupWith.
scala> var rdd1=sc.parallelize(Array(("001","张三"),("002","李四"),("003","王五")))
scala> var rdd2=sc.parallelize(Array(("001","apple"),("001","orange"),("002","book")))
scala> rdd1.cogroup(rdd2).collect()
res46: Array[(String, (Iterable[String], Iterable[String]))] = Array((001,(CompactBuffer(张三),CompactBuffer(apple, orange))), (002,(CompactBuffer(李四),CompactBuffer(book))), (003,(CompactBuffer(王五),CompactBuffer())))
cartesian
When called on datasets of types T and U, returns a dataset of (T, U) pairs (the Cartesian product of all elements).
scala> var rdd1=sc.parallelize(List("a","b","c"))
scala> var rdd2=sc.parallelize(List(1,2,3,4))
scala> rdd1.cartesian(rdd2).collect()
res47: Array[(String, Int)] = Array((a,1), (a,2), (a,3), (a,4), (b,1), (b,2), (b,3), (b,4), (c,1), (c,2), (c,3), (c,4))
coalesce(numPartitions)
Decreases the number of partitions in the RDD to numPartitions. This operator is useful for reducing the number of partitions after filtering down a large dataset.
scala> sc.parallelize(List("ni hao","hello spark"),3).coalesce(1).partitions.length
res50: Int = 1
scala> sc.parallelize(List("ni hao","hello spark"),3).coalesce(1).getNumPartitions
res51: Int = 1
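Note that without a shuffle coalesce can only decrease the partition count; asking for more partitions leaves it unchanged, while coalesce(n, shuffle = true) (essentially what repartition(n) does) can increase it. A minimal sketch, with the expected counts noted as comments:
val small = sc.parallelize(List("ni hao","hello spark"),3)
small.coalesce(5).getNumPartitions                  // expected: 3 (no shuffle, cannot grow the partition count)
small.coalesce(5, shuffle = true).getNumPartitions  // expected: 5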
repartition
Reshuffles the data in the RDD randomly to create either more or fewer partitions.
scala> sc.parallelize(List("a","b","c"),3).mapPartitionsWithIndex((index,values)=>for(i<-values) yield (index,i) ).collect
res52: Array[(Int, String)] = Array((0,a), (1,b), (2,c))
scala> sc.parallelize(List("a","b","c"),3).repartition(2).mapPartitionsWithIndex((index,values)=>for(i<-values) yield (index,i) ).collect
res53: Array[(Int, String)] = Array((0,a), (0,c), (1,b))
Action Operators
collect
Mainly used in a test environment; collect is typically used to bring the result of a remote computation back to the Driver for testing.
scala> sc.parallelize(List(1,2,3,4,5),3).collect().foreach(println)
saveAsTextFile
Saves the computed result to a file system, typically HDFS.
scala> sc.parallelize(List("ni hao","hello spark"),3).flatMap(_.split("\\s+")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false,3).saveAsTextFile("hdfs:///wordcounts")
foreach
Iterates over all elements of the RDD; the data passed to foreach is usually written to an external system, for example HBase.
scala> sc.parallelize(List("ni hao","hello spark"),3).flatMap(_.split("\\s+")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false,3).foreach(println)
(hao,1)
(hello,1)
(spark,1)
(ni,1)
Note: if you write data to an external system with the code above, constantly creating and closing connections will hurt write efficiency; foreachPartition is generally recommended instead, so one connection can be reused for a whole partition (a JDBC-style sketch follows the example below).
val lineRDD: RDD[String] = sc.textFile("file:///E:/demo/words/t_word.txt")
lineRDD.flatMap(line=>line.split(" "))
  .map(word=>(word,1))
  .groupByKey()
  .map(tuple=>(tuple._1,tuple._2.sum))
  .sortBy(tuple=>tuple._2,false,3)
  .foreachPartition(items=>{
    // create the connection here (once per partition)
    items.foreach(t=>println("store to database " + t))
    // close the connection here
  })
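Building on lineRDD above, here is a minimal sketch of the one-connection-per-partition pattern using plain JDBC; the driver dependency, connection URL, credentials and table name are hypothetical placeholders, not part of the original example:
import java.sql.DriverManager
lineRDD.flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .foreachPartition(items => {
    // one connection per partition rather than per record (URL/credentials are placeholders)
    val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "password")
    val stmt = conn.prepareStatement("INSERT INTO t_word_count(word, cnt) VALUES (?, ?)")
    items.foreach { case (word, count) =>
      stmt.setString(1, word)
      stmt.setInt(2, count)
      stmt.executeUpdate()
    }
    stmt.close()
    conn.close()
  })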
Shared Variables
Broadcast Variables
Normally, when many operations on an RDD need a variable defined in the driver, the driver has to ship that variable to the worker nodes once for every operation; if the variable holds a lot of data, this creates a heavy transfer load and slows down execution. A broadcast variable lets the program efficiently ship a large read-only value to multiple worker nodes, transferring it to each worker node only once; after that, executors read the locally cached copy directly instead of transferring it again.
val conf = new SparkConf().setAppName("demo").setMaster("local[2]")
val sc = new SparkContext(conf)
val userList = List(
  "001,张三,28,0",
  "002,李四,18,1",
  "003,王五,38,0",
  "004,zhaoliu,38,-1"
)
val genderMap = Map("0" -> "女", "1" -> "男")
val bcMap = sc.broadcast(genderMap)
sc.parallelize(userList,3)
  .map(info=>{
    val prefix = info.substring(0, info.lastIndexOf(","))
    val gender = info.substring(info.lastIndexOf(",") + 1)
    val genderMapValue = bcMap.value
    val newGender = genderMapValue.getOrElse(gender, "未知")
    prefix + "," + newGender
  }).collect().foreach(println)
sc.stop()
Accumulators
Spark provides the Accumulator, which is mainly used when multiple nodes need to operate on a shared variable. An Accumulator only supports addition, but it lets multiple tasks operate on one variable in parallel. Tasks can only add to an Accumulator; they cannot read its value. Only the Driver program can read an Accumulator's value.
scala> var count=sc.longAccumulator("count")
scala> sc.parallelize(List(1,2,3,4,5,6),3).foreach(item=> count.add(item))
scala> count.value
res1: Long = 21
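One caveat worth illustrating: accumulator updates made inside a transformation such as map only take effect once an action actually runs, because transformations are lazy (and Spark only guarantees exactly-once updates for accumulators used inside actions). A minimal sketch of the expected behaviour:
scala> val acc = sc.longAccumulator("acc")
scala> val doubled = sc.parallelize(List(1,2,3),3).map(i => { acc.add(i); i * 2 })
scala> acc.value      // no action has run yet, so the expected value is still 0
scala> doubled.collect()
scala> acc.value      // expected: 6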