package cn.itcast.spark.czh
import org.apache.spark.{SparkConf, SparkContext}
object TestFun {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("map")
    val sc = new SparkContext(conf)
    // val rdd = sc.parallelize(1 to 10) // create the RDD
    // val map = rdd.map(_ * 2) // multiply every element of the RDD by 2
    // map.foreach(x => print(x + " "))
    // sc.stop()
    // fun1()
    // val l = List(("kpop", "female"), ("zorro", "male"), ("mobin", "male"), ("lucy", "female"))
    // val rdd = sc.parallelize(l, 2)
    // /* variant 1 */
    // // val mp = rdd.mapPartitions(fun2)
    // /*val mp = rdd.mapPartitionsWithIndex(partitionsFun)*/
    // // mp.collect.foreach(x => print(x + " ")) // collect the partition elements into an Array, then print them
    // /* variant 2 */
    // // rdd.mapPartitions(x => x.filter(_._2 == "female")).map(_._1).foreach(x => print(x + " "))
    // /* variant 3 */
    // val mp = rdd.mapPartitionsWithIndex(fun3)
    // mp.collect().foreach(x => print(x + " "))
    // fun4(sc)
    fun9(sc)
  }
  /* map: every element of the dataset is transformed by a user-defined function, producing a new RDD (a MappedRDD). */
  def fun0(): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("map")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(1 to 10) // create the RDD
    val map = rdd.map(_ * 2) // multiply every element of the RDD by 2
    map.foreach(x => print(x + " "))
    sc.stop()
  }
  /* flatMap: like map, but each input element can be mapped to zero or more output elements, and the results are flattened into one RDD. */
  def fun1(): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("flatMap")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(1 to 10)
    val array = rdd.flatMap(x => 1 to x).collect()
    array.foreach(a => print(a + " "))
    sc.stop()
  }
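  /* Illustrative addition (not in the original file): another common use of flatMap is splitting lines of
   * text into words and flattening the result. The name fun1b and the sample data are made up for this
   * sketch; it reuses an existing SparkContext instead of creating its own. */
  def fun1b(sc: SparkContext): Unit = {
    val lines = sc.parallelize(List("hello spark", "hello scala"))
    // each line maps to an array of words; flatMap flattens them into a single RDD of words
    val words = lines.flatMap(line => line.split(" ")).collect()
    words.foreach(w => print(w + " ")) // hello spark hello scala
  }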
  /* mapPartitions: similar to map, but map is applied to every element of every partition, whereas mapPartitions is applied once per partition. */
  def fun2(iterable: Iterator[(String, String)]): Iterator[String] = {
    var woman = List[String]()
    while (iterable.hasNext) {
      val next = iterable.next()
      next match {
        case (_, "female") => woman = next._1 :: woman
        case _ =>
      }
    }
    woman.iterator
  }
  /* mapPartitionsWithIndex: like mapPartitions, but the function also receives the index of the partition it is processing. */
  def fun3(index: Int, iterator: Iterator[(String, String)]): Iterator[String] = {
    var woman = List[String]()
    while (iterator.hasNext) {
      val next = iterator.next()
      next match {
        case (_, "female") => woman = "[" + index + "]" + next._1 :: woman
        case _ =>
      }
    }
    woman.iterator
  }
  /* sample: draws a sample from the RDD.
   * Parameter notes: withReplacement set to true means sampling with replacement (an element can be drawn more than once),
   * false means sampling without replacement.
   * fraction is the expected fraction of elements to sample.
   * seed is the random-number seed, for example a timestamp.
   */
  def fun4(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(1 to 10)
    val sample1 = rdd.sample(true, 0.5)
    sample1.collect().foreach(x => print(x + " "))
    sc.stop()
  }
  /* union: merges the data of two RDDs and returns their union; duplicate elements are kept (no deduplication). */
  def fun5(sc: SparkContext): Unit = {
    val rdd1 = sc.parallelize(1 to 10)
    val rdd2 = sc.parallelize(6 to 15)
    rdd1.union(rdd2).collect().foreach(x => print(x + " "))
  }
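  /* Illustrative addition (not in the original file): if the duplicates kept by union are not wanted,
   * distinct() can be chained after it. The name fun5b is made up for this sketch. */
  def fun5b(sc: SparkContext): Unit = {
    val rdd1 = sc.parallelize(1 to 10)
    val rdd2 = sc.parallelize(6 to 15)
    // 6 to 10 appear in both inputs but only once in the deduplicated result
    rdd1.union(rdd2).distinct().collect().foreach(x => print(x + " "))
  }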
  /* intersection: returns the intersection of two RDDs. */
  def fun6(sc: SparkContext): Unit = {
    val rdd1 = sc.parallelize(1 to 10)
    val rdd2 = sc.parallelize(5 to 20)
    rdd1.intersection(rdd2).collect().foreach(x => print(x + " "))
  }
  /* distinct: removes duplicate elements from the RDD. */
  def fun7(sc: SparkContext): Unit = {
    val rdd1 = sc.parallelize(List(1, 1, 2, 3, 4, 5, 5))
    rdd1.distinct().collect().foreach(x => print(x + " "))
  }
  /* cartesian: computes the Cartesian product of all elements of the two RDDs. */
  def fun8(sc: SparkContext): Unit = {
    val rdd1 = sc.parallelize(1 to 3)
    val rdd2 = sc.parallelize(1 to 4)
    rdd1.cartesian(rdd2).collect().foreach(x => print(x + " "))
  }
  /* coalesce(numPartitions, shuffle): repartitions the RDD. shuffle defaults to false; when shuffle = false the number
   * of partitions cannot be increased. No error is thrown, the RDD simply keeps its original partition count. */
  def fun9(sc: SparkContext): Unit = {
    // shuffle = false (the default)
    val rdd1 = sc.parallelize(1 to 16, 4)
    val rdd2 = rdd1.coalesce(3)
    print(rdd2.partitions.size) // prints 3
  }
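  /* Illustrative addition (not in the original file): with shuffle = true, coalesce can also increase the
   * partition count; repartition(n) is shorthand for coalesce(n, shuffle = true). The name fun9b and the
   * target of 6 partitions are made up for this sketch. */
  def fun9b(sc: SparkContext): Unit = {
    val rdd1 = sc.parallelize(1 to 16, 4)
    val rdd2 = rdd1.coalesce(6, shuffle = true) // partition count grows from 4 to 6
    print(rdd2.partitions.size) // prints 6
  }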
}