zoukankan      html  css  js  c++  java
  • SparkCore系列(二)rdd聚合操作,rdd之间聚合操作

    一:rdd聚合操作

    count

                val conf = new SparkConf().setAppName("HelloWorld").setMaster("local")
                val sc = new JavaSparkContext(conf).sc

                val dataLength = sc.textFile("/software/java/idea/data")
                            .flatMap(x=>x.split("\|")).count()//相当于数组的length

                println(dataLength)

    countByValue

                val initialScores1: Array[(String, Double)] =
                Array(("A", 88.0), ("B", 95.0), ("C", 91.0),("D", 93.0))
                val data1 = sc.parallelize(initialScores1)
                println(data1.countByValue) // 以当前值作为key计数

    reduce

                val conf = new SparkConf().setAppName("HelloWorld").setMaster("local")
                val sc = new JavaSparkContext(conf).sc

                val dataLength = sc.textFile("/software/java/idea/data")
                            .flatMap(x=>x.split("\|")).map(x=>x.toInt).reduce((x,y)=>x+y)//相当于数组的sum

                println(dataLength)

    reduceByKey

                val avg = sc.textFile("/software/java/idea/data")
                            .flatMap(x=>x.split("\|")).map(x=>(x.toInt,1))
                            .reduceByKey((x,y)=>x+y).collect().map(x=>println(x)) //reduceByKey现在map端进行聚合,在真正开发过程中也常用

    sortByKey

                val conf = new SparkConf().setAppName("HelloWorld").setMaster("local")
                val sc = new JavaSparkContext(conf).sc

                val data = sc.textFile("/software/java/idea/data")
                            .flatMap(x=>x.split("\|")).map(x=>(x.toInt,1)).sortByKey(true)//true正序,false倒序

                println(data.collect().map(x=>println(x)))

    countByKey生产一般不用

                val data = sc.textFile("/software/java/idea/data")
                            .flatMap(x=>x.split("\|")).map(x=>(x.toInt,1))
                            .countByKey() //map结构 key->key value->的个数

                println(data)

    collectAsMap生产一般不用

                val data = sc.textFile("/software/java/idea/data")
                            .flatMap(x=>x.split("\|")).map(x=>(x.toInt,1))
                            .collectAsMap() //map结构   

                println(data)

    flod

                val data = sc.textFile("/software/java/idea/data")
                            .flatMap(x=>x.split("\|")).map(x=>x.toInt)
                            .fold(100)((x,y)=>x+y)//带初始值的聚合

                println(data)

    groupByKey

                val avg = sc.textFile("/software/java/idea/data")
                            .flatMap(x=>x.split("\|")).map(x=>(x.toInt,1))
                            .groupByKey().collect().map(x=>println(x))//value 是一个数组,需要循环value时候使用

    aggregate

                //自定义聚合函数
                //第一个参数 两个函数都会以2为参数算一遍
                //第二个参数 文件内部行与行之间操作
                //第三个参数 文件结果 操作
                val sum = sc.textFile("/software/java/idea/data")
                            .flatMap(x=>x.split("\|")).map(x=>x.toInt)
                            .aggregate(2)(pfun1,pfun2)
                            println(sum)

                def pfun1(p1: Int, p2: Int): Int = {//行与行之间操作
                            println("p1"+p1+" p2:"+p2)
                            p1 * p2
                }
                def pfun2(p3: Int, p4: Int): Int = {//文件之间结果操作
                            p3 + p4
                }                           //sum

                def pfun1(p1:Tuple2[Int,Int], p2: Int): Tuple2[Int,Int] = {//行与行之间操作
                            (p1._1 + 1,p1._2 + p2)
                }
                def pfun2(p1:Tuple2[Int,Int], p2: Tuple2[Int,Int]): Tuple2[Int,Int] = {//文件之间结果操作
                            (p1._1 + p2._1,p1._2 + p2._2)
                }
                val avg = sc.textFile("/software/java/idea/data")
                            .flatMap(x=>x.split("\|")).map(x=>x.toInt)
                            .aggregate(0,0)(pfun1,pfun2)
                println(avg._2/avg._1)                           //avg

    combineByKey

                type MVType = (Int, Int)
                val avg = sc.textFile("/software/java/idea/data")
                            .flatMap(x=>x.split("\|")).map(x=>(x.toInt,1))
                            .combineByKey(
                                        score => (score,1), //创建元素
                                        (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore), //处理已经遇到的键
                                        (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2) //处理已经未遇到的键
                            ).collect().map(x=>println(x))//value 是一个数组,需要循环value时候使用

                //aggregate功能很类似

    二:rdd聚合操作

    union

                val initialScores1 = Array(("A", 88.0), ("B", 95.0), ("C", 91.0))
                val data1 = sc.parallelize(initialScores1)

                val initialScores2 = Array(("D", 93.0), ("E", 95.0), ("F", 98.0))
                val data2 = sc.parallelize(initialScores2)

                data1.union(data2).collect().map(x=> println(x)) //SQL中UNION

    intersection

                val initialScores1 = Array(("A", 88.0), ("B", 95.0), ("C", 91.0),("D", 93.0))
                val data1 = sc.parallelize(initialScores1)

                val initialScores2 = Array(("D", 93.0), ("E", 95.0), ("F", 98.0))
                val data2 = sc.parallelize(initialScores2)

                data1.intersection(data2).collect().map(x=> println(x)) //SQL INNER JOIN

    join

                val initialScores1: Array[(String, Double)] =
                Array(("A", 88.0), ("B", 95.0), ("C", 91.0),("D", 93.0))
                val data1 = sc.parallelize(initialScores1)

                val initialScores2 = Array(("D", 93.0), ("E", 95.0), ("F", 98.0))
                val data2 = sc.parallelize(initialScores2)

                data1.join(data2).collect().map(x=>println(x))
                //SQL INNER JOIN

    subtract

                val initialScores1 = Array(("A", 88.0), ("B", 95.0), ("C", 91.0),(
                val data1 = sc.parallelize(initialScores1)
                
                val initialScores2 = Array(("D", 93.0), ("E", 95.0), ("F", 98.0))
                val data2 = sc.parallelize(initialScores2)

                data1.subtract(data2).collect().map(x=> println(x)) //LEFT ANTI

    subtractByKey

                val initialScores1: Array[(String, Double)] =
                Array(("A", 88.0), ("B", 95.0), ("C", 91.0),("D", 93.0))
                val data1 = sc.parallelize(initialScores1)

                val initialScores2 = Array(("D", 93.0), ("E", 95.0), ("F", 98.0))
                val data2 = sc.parallelize(initialScores2)

                data1.subtractByKey(data2).collect().map(x=>println(x))
                //删掉rdd1中与rdd2的key相同的元素 相当于subtract

    rightOuterJoin

                val initialScores1: Array[(String, Double)] =
                            Array(("A", 88.0), ("B", 95.0), ("C", 91.0),("D", 93.0))
                val data1 = sc.parallelize(initialScores1)

                val initialScores2 = Array(("D", 93.0), ("E", 95.0), ("F", 98.0))
                val data2 = sc.parallelize(initialScores2)

                data1.rightOuterJoin(data2).collect().map(x=>println(x))
                //右外连接

    leftOuterJoin

                val initialScores1: Array[(String, Double)] =
                Array(("A", 88.0), ("B", 95.0), ("C", 91.0),("D", 93.0))
                val data1 = sc.parallelize(initialScores1)

                val initialScores2 = Array(("D", 93.0), ("E", 95.0), ("F", 98.0))
                val data2 = sc.parallelize(initialScores2)

                data1.leftOuterJoin(data2).collect().map(x=>println(x))
                //左外连接

    cartesian

                val initialScores1 = Array(("A", 88.0), ("B", 95.0), ("C", 91.0),("D",
                val data1 = sc.parallelize(initialScores1)
                val initialScores2 = Array(("D", 93.0), ("E", 95.0), ("F", 98.0))
                val data2 = sc.parallelize(initialScores2)
                data1.cartesian(data2).collect().map(x=> println(x)) // key不相同的笛卡尔积

    cogroup

                val initialScores1: Array[(String, Double)] =
                Array(("A", 88.0), ("B", 95.0), ("C", 91.0),("D", 93.0))
                val data1 = sc.parallelize(initialScores1)

                val initialScores2 = Array(("D", 93.0), ("E", 95.0), ("F", 98.0))
                val data2 = sc.parallelize(initialScores2)

                data1.cogroup(data2).collect().map(x=>println(x))
                //key 相同的笛卡尔积

  • 相关阅读:
    注册审核
    静态表单验证
    多条件查询
    0623TP框架联系
    0618框架 增删改练习
    php框架 数据添加
    0616框架查询
    0614空操作方法 空控制器 跨控制器调用 命名空间
    php 0613框架基础
    php查询
  • 原文地址:https://www.cnblogs.com/wuxiaolong4/p/12046673.html
Copyright © 2011-2022 走看看