import org.apache.spark.rdd.RDD /* action算子(行动算子) --> 是触发转换算子计算 一个action算子触发,就会产生job action算子的返回值基本上就不是RDD,所以在action算子后面在触发计算,就需要区分计算的数据了 */ object ActionFunctionDemo{ def main(args: Array[String]): Unit = { val sc = CreateSparkContext.createSparkContxt("ActionFunctionDemo","local[*]") //1.计算求和 reduce(多用于没有key的情况下进行求和) val rdd1 = sc.parallelize(List(2,1,3,6,5),2) val rdd1_1:Int = rdd1.reduce(_+_) println(rdd1_1) //2.将RDD中数据转换Scala中数组存储[不可变数组] println(rdd1.collect.toBuffer) //3.返回RDD的元素个数 println(rdd1.count()) //4. top 取出对应数量值,默认降序,若输入0,返回一个空数组[数组是一个不可变数据] println(rdd1.top(3).toBuffer) //5.take 顺序取出对应数量值 返回也是一个数组[是一个不可变数组] println( rdd1.take(3).toBuffer) //6.takeOrdered 顺序取出对应数量的值, 默认是升序 返回的是一个数组[并不可变数组] println(rdd1.takeOrdered(3).toBuffer) //7.first 取出第一个值 等价于 take(1) 取值等价,不等价返回值 println(rdd1.first()) //8.对输出数据进行处理,将数据写成文件 //rdd1.saveAsTextFile("文件存储路径[本地/hdfs]") //9 countBykey --> 统计key的个数并生成一个Map k是key的名字 v 是key的个数 val rdd2 = sc.parallelize(List(("key1",2),("key2",2))) val col: collection.Map[String, Long] = rdd2.countByKey() //10.遍历算子 跟scala中没有区别 rdd1.foreach(x => println(x)) /*其他算子*/ // 统计value的个数(但是会将集合中的一个元素看做是一个value并统计其个数)--> Action算子 val tupleToLong: collection.Map[(String, Int), Long] = rdd2.countByValue() //filterByRange 对RDD中元素进行过滤,但是指定范围内的数据(包括开始位置和结束位置) -> 转换算子 val rdd3 = sc.parallelize(List(("e",3),("c",5),("d",4),("c",2),("a",1))) val rdd3_1: RDD[(String, Int)] = rdd3.filterByRange("c","e") //flatMapValue对应是kv对中的value值进行扁平化处理 --> 转换算子 val rdd3_2 = sc.parallelize(List(("a","1 2"),("b","3 4"))) //这个算子对应的是value值 val rdd3_3: RDD[(String, String)] = rdd3_2.flatMapValues(_.split(" ")) //必须记住 foreachPartition --> Action // 循环处理分区内数据, 一般是用于数据持久化,即存入数据库中 val rdd4 = sc.parallelize(List(1,2,3,4,5,6),2) rdd4.foreachPartition(x => println(x.reduce(_+_))) //keyBy 以传入的函数作为key,RDD中元组作为value ,返回一个新的元组 --> 转换算子 val rdd5 = sc.parallelize(List("dog","cat"),3) val rdd5_1: RDD[(Int, String)] = rdd5.keyBy(_.length) //必须记住 collectAsMap --> 将RDD中对偶元组(二元组) 转换为一个Map --> Action算子 val stringToInt: collection.Map[String, Int] = rdd2.collectAsMap() //转换算子 --> keys所有key值 values所有value值 --> RDD中对偶元组 } }