import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 简单算子演示 */ object FunctionDemo1 { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("CreateRDD").setMaster("local") val sc = new SparkContext(conf) val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,3,7,8,9)) //1.map:对RDD中每一个元素进行遍历并加以计算,返回一个全新RDD val rdd2: RDD[Int] = rdd.map(_ * 2) println(rdd2.collect().toBuffer) //2.filter:对RDD中每一个元素执行Boolean类型表达式,结果为ture 值反值存储到新的RDD中 val rdd3: RDD[Int] = rdd2.filter(_ > 10) println(rdd3.collect().toBuffer) //3.flatMap:对RDD中存在集合进行压平处理,将集合内部的数据取出存储到一个全新的RDD中 val rdd4 = sc.parallelize(Array("a b c","b c d")) val rdd5: RDD[String] = rdd4.flatMap(_.split(" ")) println(rdd5.collect().toBuffer) //4.sample:随机抽样 //抽样只能在一个范围内返回 ,但是范围会有一定的波动 //参数说明 /* withReplacement: Boolean, 表示抽取出数据是否返回原有样例中 true这个值会被放回抽样中 false 不会放回 fraction: Double, 抽样比例 即 抽取30% 写入值就是 0.3(本身Double就是一个不精确数据) seed: Long = Utils.random.nextLong 种子, 随机获取数据的方式 ,默认不传 */ val rdd5_1 = sc.parallelize(1 to 10) val sample: RDD[Int] = rdd5_1.sample(false,0.3) println(sample.collect().toBuffer) //5.union:并集 val rdd6 = sc.parallelize(List(5,6,7,8)) val rdd7 = sc.parallelize(List(1,2,5,6)) val rdd8 = rdd6 union rdd7 println( rdd8.collect.toBuffer) //6.intersection:求交集 val rdd9: RDD[Int] = rdd6 intersection rdd7 println( rdd9.collect.toBuffer) //7.distinct:去重复 println(rdd8.distinct.collect.toBuffer) //8.join:相同key才会被合并,没有相同的key将被舍弃掉 val rdd10_1 = sc.parallelize(List(("tom",1),("jerry",3),("kitty",2))) val rdd10_2 = sc.parallelize(List(("jerry",2),("tom",2),("dog",10))) val rdd10: RDD[(String, (Int, Int))] = rdd10_1 join rdd10_2 println(rdd10.collect().toBuffer) //9.LeftOuterJoin/rightOuterJoin:左连接/右连接 //无论是左连接还是右连接,除了基本值外 ,剩余值的数据类型是Option类型 val rdd10_4 = rdd10_1 leftOuterJoin rdd10_2 val rdd10_5 = rdd10_1 rightOuterJoin rdd10_2 println(rdd10_4.collect.toList) println(rdd10_5.collect.toList) //10.cartesian:笛卡尔积 val rdd11_1 = sc.parallelize(List(("tom",1),("jerry",3),("kitty",2))) val rdd11_2 = sc.parallelize(List(("jerry",2),("tom",2),("dog",10))) val rdd11_3 = rdd11_1 cartesian rdd11_2 println(rdd11_3.collect.toList) //11.分组 val rdd11_4= sc.parallelize(List(("tom",1),("jerry",3),("kitty",2),("tom",2))) //11.1 根据传入的参数进行分组 val rdd11_5: RDD[(String, Iterable[(String, Int)])] = rdd11_4.groupBy(_._1) //11.2 根据key进行分区(对KV形式是使用) -->除了指定分组之后分区的数量之外, 还可以使用自定义分区器 val rdd11_6: RDD[(String, Iterable[Int])] = rdd11_4.groupByKey() //11.3 cogroup根据key进行分组(分组必须是一个对偶元组) val rdd11_7: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd11_1 cogroup rdd11_2 println(rdd11_7.collect.toBuffer) /* ps:当前方法和groupByKey都可以对数据进行分组,但是,groupByKey会将相同key的值(value)存储在一起(一个集合) cogroup 参数是另外一个要合并分组的RDD(必须是对偶元组),根据相同key进行额分组,但是value不会存在一个集合中 */ } }