zoukankan      html  css  js  c++  java
  • [Spark Core] Spark 实现气温统计


    0. 说明

      聚合气温数据,聚合出 MAX 、 MIN 、 AVG


    1. Spark Shell 实现

      1.1 MAX

      分步实现

    # 加载文档
    val rdd1 = sc.textFile("file:///home/centos/temp3.dat")
    
    # 压扁
    val rdd2 = rdd1.map(line=>{var arr = line.split(" ");(arr(0).toInt , arr(1).toInt)})
    
    # 得到最大值
    val rdd3 = rdd2.reduceByKey((a,b)=>if(a > b)a else b)
    
    rdd3.collect
    
    # 年份升序
    val rdd4 = rdd3.sortByKey(true)

    # 气温降序
    val rdd4 = rdd3.sortBy(t=>t._2,false)
    rdd4.collect 

    # 遍历输出
    rdd4.
    foreach(println)

      一步完成

    sc.textFile("file:///home/centos/temp3.dat").map(line=>{var arr = line.split(" ");(arr(0).toInt , arr(1).toInt)}).reduceByKey((a,b)=>if(a > b)a else b).sortByKey(true).collect.foreach(println)

      1.2 MIN

      分步实现

    # 加载文档
    val rdd1 = sc.textFile("file:///home/centos/temp3.dat")
    
    # 压扁
    val rdd2 = rdd1.map(line=>{var arr = line.split(" ");(arr(0).toInt , arr(1).toInt)})
    
    # 得到最小值
    val rdd3 = rdd2.reduceByKey((a,b)=>if(a < b)a else b)
    
    rdd3.collect
    
    # 年份升序
    val rdd4 = rdd3.sortByKey(true)
    
    # 气温降序
    val rdd4 = rdd3.sortBy(t=>t._2,false)
    rdd4.collect # 遍历输出 rdd4.
    foreach(println)

      一步完成

    sc.textFile("file:///home/centos/temp3.dat").map(line=>{var arr = line.split(" ");(arr(0).toInt , arr(1).toInt)}).reduceByKey((a,b)=>if(a < b)a else b).sortByKey(true).collect.foreach(println)

     


    2. IDEA 实现

      2.1 Scala 实现一

    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * 气温数据聚合应用
      */
    object TempAggDemoScala {
      def main(args: Array[String]): Unit = {
        // 创建 spark 配置对象
        val conf = new SparkConf()
        conf.setAppName("tempAgg2")
        conf.setMaster("local")
    
        // 创建上下文
        val sc = new SparkContext(conf)
    
        // 1. 加载文件
        val rdd1 = sc.textFile("file:///e:/temp3.dat")
    
        // 2. 切割成对(1930,54)
        val rdd2 = rdd1.map(line => {
          var arr = line.split(" ")
          (arr(0).toInt, arr(1).toInt)
        })
    
        // 3. 按照年度分组(1930->{23,34,67} , 1931->{...})
        val rdd3 = rdd2.groupByKey()
    
        // 4. 对组内元素进行统计聚合
        val rdd4 = rdd3.mapValues(it => {
          val max = it.max
          val min = it.min
          val sum = it.sum
          val size = it.size
          (max, min, sum.toFloat / size)
    
        })
    
        // 5. 按照年度排序
        val rdd5 = rdd4.sortByKey(true)
    
        // 6. 输出
        rdd5.collect().foreach(println)
      }
    }

      2.2 Scala 实现二

    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * 气温数据聚合应用2
      */
    object TempAggDemo2Scala {
      def main(args: Array[String]): Unit = {
        // 创建 spark 配置对象
        val conf = new SparkConf()
        conf.setAppName("tempAgg2")
        conf.setMaster("local")
    
        // 创建上下文
        val sc = new SparkContext(conf)
    
        // 1. 加载文件
        var rdd1 = sc.textFile("file:///e:/temp3.dat")
    
        // 2. 切割成对(1930,(54,54,54,1))
        val rdd2 = rdd1.map(line => {
          var arr = line.split(" ")
          // (max , min , sum , count)
          val year = arr(0).toInt
          val temp = arr(1).toInt
          (year, (temp, temp, temp, 1))
        })
    
        // 3. 聚合
        val rdd3 = rdd2.reduceByKey((a, b) => {
          import scala.math._
          (max(a._1, b._1), min(a._2, b._2), a._3 + b._3, a._4 + b._4)
        })
    
        // 4. 交换
        val rdd4 = rdd3.mapValues(t => {
          (t._1, t._2, t._3.toFloat / t._4)
        }).sortByKey()
    
        rdd4.collect().foreach(println)
      }
    }

      2.3 Java 实现二

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    import scala.Tuple3;
    import scala.Tuple4;
    
    import java.util.List;
    
    
    /**
     * 气温数据聚合应用2
     */
    public class TempAggDemoJava2 {
        public static void main(String[] args) {
            // 创建 spark 配置对象
            SparkConf conf = new SparkConf();
            conf.setAppName("tempAgg2");
            conf.setMaster("local");
    
            // 创建上下文
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            // 1. 加载文件
            JavaRDD<String> rdd1 = sc.textFile("file:///e:/temp3.dat");
    
            // 2. 变换(1903,(32,23,23,1))
            JavaPairRDD<Integer, Tuple4<Integer, Integer, Integer, Integer>> rdd2 = rdd1.mapToPair(new PairFunction<String, Integer, Tuple4<Integer, Integer, Integer, Integer>>() {
                public Tuple2<Integer, Tuple4<Integer, Integer, Integer, Integer>> call(String s) throws Exception {
                    String[] arr = s.split(" ");
                    int year = Integer.parseInt(arr[0]);
                    int temp = Integer.parseInt(arr[1]);
    
                    Tuple4<Integer, Integer, Integer, Integer> v = new Tuple4<Integer, Integer, Integer, Integer>(temp, temp, temp, 1);
                    return new Tuple2<Integer, Tuple4<Integer, Integer, Integer, Integer>>(year, v);
                }
            });
    
            // 3. 聚合
            JavaPairRDD<Integer, Tuple4<Integer, Integer, Integer, Integer>> rdd3 = rdd2.reduceByKey(
                    new Function2<Tuple4<Integer, Integer, Integer, Integer>, Tuple4<Integer, Integer, Integer, Integer>, Tuple4<Integer, Integer, Integer, Integer>>() {
                        public Tuple4<Integer, Integer, Integer, Integer> call(Tuple4<Integer, Integer, Integer, Integer> v1, Tuple4<Integer, Integer, Integer, Integer> v2) throws Exception {
                            int max = Math.max(v1._1(), v2._1());
                            int min = Math.min(v1._2(), v2._2());
                            int sum = v1._3() + v2._3();
                            int count = v1._4() + v2._4();
    
                            return new Tuple4<Integer, Integer, Integer, Integer>(max, min, sum, count);
                        }
                    });
    
            //4. map取出avg
            JavaPairRDD<Integer, Tuple3<Integer, Integer, Float>> rdd4 = rdd3.mapValues(new Function<Tuple4<Integer, Integer, Integer, Integer>, Tuple3<Integer, Integer, Float>>() {
                public Tuple3<Integer, Integer, Float> call(Tuple4<Integer, Integer, Integer, Integer> v1) throws Exception {
                    return new Tuple3<Integer, Integer, Float>(v1._1(), v1._2(), (float) v1._3() / v1._4());
                }
            });
    
            // 5. 排序
            JavaPairRDD<Integer, Tuple3<Integer, Integer, Float>> rdd5 = rdd4.sortByKey();
    
            // 6. 列表
            List<Tuple2<Integer, Tuple3<Integer, Integer, Float>>> list = rdd5.collect();
    
            for (Tuple2<Integer, Tuple3<Integer, Integer, Float>> t : list) {
                System.out.println(t);
            }
    
        }
    }

    且将新火试新茶,诗酒趁年华。
  • 相关阅读:
    jquery json 格式教程
    不修改代码就能优化ASP.NET网站性能的一些方法
    C#操作sqlite数据库使用SQLiteParameter传递参数
    60个开发者不容错过的免费资源库
    Java 与 .NET 的平台发展之争
    “一次编写,随处运行” Intel HTML5技术研讨会
    Struts2 高危漏洞修复方案 (S2-016/S2-017)
    AspNetPager 控件使用
    jQueryUI常用功能实战
    验证码生成类
  • 原文地址:https://www.cnblogs.com/share23/p/9757095.html
Copyright © 2011-2022 走看看