zoukankan      html  css  js  c++  java
  • Spark练习之action操作开发

    一、reduce

    1.1 Java

     private static void reduce() {
            //创建SparkConf
            SparkConf conf = new SparkConf()
                    .setAppName("reduce")
                    .setMaster("local");
            //创建JavaSparkContext
            JavaSparkContext sc = new JavaSparkContext(conf);
            //构造集合
            List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
            //并行化集合,创建初始RDD
            JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
            //使用reduce操作对集合中的数字进行累加
            //reduce操作的原理:
            //将第一个和第二个元素,传入call()方法,进行计算,会获取一个结果
            //接着将该结果与下一个元素传入call()方法,进行计算
            //以此类推
            //reduce操作的本质:就是聚合,将多个元素聚合成一个元素
            int sum = numberRDD.reduce(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });
            System.out.println(sum);
            //关闭JavaSparkContext
            sc.close();
        }
    

    1.2 Scala

    def reduce(): Unit = {
        val conf = new SparkConf().setAppName("reduce").setMaster("local")
        val sc = new SparkContext(conf)
        val numbersArray = Array(1, 2, 3, 4, 5, 6, 7, 8)
        val numberRDD = sc.parallelize(numbersArray, 1)
        val numbers = numberRDD.reduce(_ + _)
        println(numbers)
      }
    

    二、collect

    2.1 Java

    private static void collect() {
            //创建SparkConf
            SparkConf conf = new SparkConf()
                    .setAppName("collect")
                    .setMaster("local");
            //创建JavaSparkContext
            JavaSparkContext sc = new JavaSparkContext(conf);
            //构造集合
            List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
            //并行化集合,创建初始RDD
            JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
            //使用map操作将集合中所有数字乘以2
            JavaRDD<Integer> doubleNumbers = numberRDD.map(new Function<Integer, Integer>() {
                @Override
                public Integer call(Integer v1) throws Exception {
                    return v1 * 2;
                }
            });
            //不用foreach action操作,在远程集群上遍历RDD中的元素
            //使用collect操作,将分布在远程集群上的doubleNumber RDD的数据拉取到本地
            //这种方式,一般不建议使用,因为如果RDD中的数据量笔记大,比如过万条
            //性能会比较差,因为要从远程走大量的网络传输,将数据获取到本地
            //此外,还可能在RDD中数据量特别大的情况下,发生oom异常,内存溢出
            //因此,通常还是使用foreach action操作,来对最终的元素进行处理
            List<Integer> doubleNumberList = doubleNumbers.collect();
            for (Integer num : doubleNumberList) {
                System.out.println(num);
            }
            //关闭JavaSparkContext
            sc.close();
        }
    

    2.2 Scala

    def collect(): Unit = {
        val conf = new SparkConf().setAppName("collect").setMaster("local")
        val sc = new SparkContext(conf)
        val numbersArray = Array(1, 2, 3, 4, 5, 6, 7, 8)
        val numberRDD = sc.parallelize(numbersArray, 1)
        val numbers = numberRDD.map(num => num * 2)
        val doubleNumberArray = numbers.collect()
        for (num <- doubleNumberArray) {
          println(num)
        }
      }
    

    三、count

    3.1 Java

    private static void count() {
            //创建SparkConf
            SparkConf conf = new SparkConf()
                    .setAppName("count")
                    .setMaster("local");
            //创建JavaSparkContext
            JavaSparkContext sc = new JavaSparkContext(conf);
            //构造集合
            List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
            //并行化集合,创建初始RDD
            JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
            //对RDD使用count操作,统计它有多少个元素
            long count = numberRDD.count();
            System.out.println(count);
            //关闭JavaSparkContext
            sc.close();
        }
    

    3.2 Scala

    def count(): Unit = {
        val conf = new SparkConf().setAppName("count").setMaster("local")
        val sc = new SparkContext(conf)
        val numbersArray = Array(1, 2, 3, 4, 5, 6, 7, 8)
        val numberRDD = sc.parallelize(numbersArray, 1)
        val count = numberRDD.count()
        println(count)
      }
    

    四、take

    4.1 Java

    private static void take() {
            //创建SparkConf
            SparkConf conf = new SparkConf()
                    .setAppName("take")
                    .setMaster("local");
            //创建JavaSparkContext
            JavaSparkContext sc = new JavaSparkContext(conf);
            //构造集合
            List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
            //并行化集合,创建初始RDD
            JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
            //对RDD使用take操作
            //take与collect类似,从远程集群上,获取RDD数据
            //collect是获取RDD的所有数据,take知识获取前n个数据
            List<Integer> top3Numbers = numberRDD.take(3);
            for (Integer num : top3Numbers) {
                System.out.println(num);
            }
            //关闭JavaSparkContext
            sc.close();
        }
    

    4.2 Scala

    def take(): Unit = {
        val conf = new SparkConf().setAppName("take").setMaster("local")
        val sc = new SparkContext(conf)
        val numbersArray = Array(1, 2, 3, 4, 5, 6, 7, 8)
        val numberRDD = sc.parallelize(numbersArray, 1)
        val doubleNumberArray = numberRDD.take(3)
        for (num <- doubleNumberArray) {
          println(num)
        }
      }
    

    五、saveAsTextFile

    5.1 Java

    private static void saveAsTextFile() {
            //创建SparkConf
            SparkConf conf = new SparkConf()
                    .setAppName("saveAsTextFile")
                    .setMaster("local");
            //创建JavaSparkContext
            JavaSparkContext sc = new JavaSparkContext(conf);
            //构造集合
            List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
            //并行化集合,创建初始RDD
            JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
            //使用map操作将集合中所有数字乘以2
            JavaRDD<Integer> doubleNumbers = numberRDD.map(new Function<Integer, Integer>() {
                @Override
                public Integer call(Integer v1) throws Exception {
                    return v1 * 2;
                }
            });
            //直接将RDD中的数据,保存在文件中
            doubleNumbers.saveAsTextFile("");
            //关闭JavaSparkContext
            sc.close();
        }
    

    六、countByKey

    6.1 Java

    private static void countByKey() {
            //创建SparkConf
            SparkConf conf = new SparkConf()
                    .setAppName("countByKey")
                    .setMaster("local");
            //创建JavaSparkContext
            JavaSparkContext sc = new JavaSparkContext(conf);
            //构造集合
            List<Tuple2<String, String>> scoresList = Arrays.asList(
                    new Tuple2<>("class1", "tom"),
                    new Tuple2<>("class2", "jack"),
                    new Tuple2<>("class1", "leo"),
                    new Tuple2<>("class2", "marry"));
            //并行化集合,创建JavaPairRDD
            JavaPairRDD<String, String> students = sc.<String, String>parallelizePairs(scoresList);
            //对RDD应用countByKey操作,统计每个班级的学生人数,就是统计每个key对应的元素个数
            //countByKey返回的类型,直接就是Map<String,Object>
            Map<String, Long> studentCounts = students.countByKey();
            for (Map.Entry<String, Long> studentCount : studentCounts.entrySet()) {
                System.out.println(studentCount.getKey() + ":" + studentCount.getValue());
            }
            //关闭JavaSparkContext
            sc.close();
        }
    

    6.2 Scala

    def countByKey(): Unit = {
        val conf = new SparkConf().setAppName("countByKey").setMaster("local")
        val sc = new SparkContext(conf)
        val studentList = Array(new Tuple2[String, String]("class1", "aaa"),
          new Tuple2[String, String]("class2", "mack"),
          new Tuple2[String, String]("class1", "tom"),
          new Tuple2[String, String]("class2", "pos"))
        val scores = sc.parallelize(studentList, 1)
        val students = scores.countByKey()
        println(students)
      }
    

    七、foreach

    八、main函数

    8.1 Java

    public static void main(String[] args) {
            //reduce();
            //collect();
            //count();
            //take();
            //saveAsTextFile();
            countByKey();
        }
    

    8.2 Scala

      def main(args: Array[String]): Unit = {
        //reduce()
        //collect()
        //count()
        //take()
        countByKey()
      }
    
  • 相关阅读:
    CSS之旅——第二站 如何更深入的理解各种选择器
    CSS之旅——第一站 为什么要用CSS
    记录一些在用wcf的过程中走过的泥巴路 【第一篇】
    asp.net mvc 之旅—— 第二站 窥探Controller下的各种Result
    asp.net mvc 之旅—— 第一站 从简单的razor入手
    Sql Server之旅——终点站 nolock引发的三级事件的一些思考
    Sql Server之旅——第十四站 深入的探讨锁机制
    Sql Server之旅——第十三站 对锁的初步认识
    Sql Server之旅——第十二站 sqltext的参数化处理
    Sql Server之旅——第十一站 简单说说sqlserver的执行计划
  • 原文地址:https://www.cnblogs.com/aixing/p/13327438.html
Copyright © 2011-2022 走看看