zoukankan      html  css  js  c++  java
  • Apache Spark编程教程

    Apache Spark JavaRDD和任务解决

    好吧,我告诉我身边的每个人,如果你不知道map-reduce概念那么你就无法理解Apache Spark。为了证明这一点,让我们解决简单的任务。假设我们在城镇中有以下温度的文本文件:

    Prague 35
    Madrid 40
    Berlin 20
    Paris 15
    Rome 25

    位于Apache Hadoop HDFS文件系统,我们需要编写简单的JavaRDD Apache Spark程序来打印具有温度低于整个平均温度的城镇的行

    JavaRDD API和MapReduce有区别吗?

    实际上它不是!要解决前面提到的任务,我们需要将问题分成以下几部分:

    • 首先,我们需要编写JavaRDD程序来计算温度和平均值的总和。
    • 然后我们要打印温度低于计算平均值的行。

    MapReduce解决方案概念

    如果我们使用Spring Data for Hadoop或为map-reduce程序指定的简单Apache Hadoop API,那么我们的解决方案将是:

    • Map函数将创建键[K,V] ='reducer',town.temperature
    • 减少功能将接收先前的键并将整个组的温度相加并计算平均温度。
    • 链式地图减少任务将打印温度低于平均值的城镇的结果。

    Apache Spark JavaRDD解决方案

     (类似于mapreduce)

    • 首先,我们需要通过将map函数应用于输入RDD集来获取所有行的JavaRDD温度集:
        JavaRDD<String> parsedTemperatures = lines.map(new Function<String, String>() {
            private static final long serialVersionUID = 1L;
    
            public String call(String v1) throws Exception {
                final String arr[] = SPACE.split(v1);
                System.out.println("Reading temperature ["+arr[1]+"] from "+v1);
                return arr[1];
            }
        });
    • 然后我们需要将此RDD集转换为CONSTANT.row.temperature表单以将数据准备到reducer中:
    JavaPairRDD<String, Integer> forGroup = parsedTemperatures.mapToPair(
                new PairFunction<String, 
                String, Integer>() {
            private static final long serialVersionUID = 1L;
    
            public Tuple2<String, Integer> call(String t) throws Exception {
                return new Tuple2<String, Integer>("reducer", Integer.parseInt(t));
            }
        });
    • 有了这个数据集,我们就为减速器准备了数据,它将聚合所有温度
    JavaPairRDD<String, Integer> counts = forGroup.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
    
            public Integer call(Integer v1, Integer v2) throws Exception {
                  System.out.println("Agregatting "+v1+" plus "+v2);
                  return v1 + v2;
            }
        });

    (再次,像map-reduce概念)

    要了解Spark减速器的工作原理,请查看日志:

    Reading temperature [35] from Prague 35
    Reading temperature [40] from Madrid 40
    Agregatting 35 plus 40
    Reading temperature [20] from Berlin 20
    Agregatting 75 plus 20
    Reading temperature [15] from Paris 15
    Agregatting 95 plus 15
    Reading temperature [25] from Rome 25
    Agregatting 110 plus 25

    Spark实际上并行运行前三个函数map,mapToPair和reduceByKey!DAG图形分析器组合Spark任务的好处之一!

    解决方案的第二部分是打印温度低于平均温度的所有城镇:

        Tuple2<String, Integer> sumTemperatures = counts.first();    
        final Integer sum = sumTemperatures._2;
        final long count = parsedTemperatures.count();
        final double avg = (double) sum / count;
        System.out.println("Average temperature "+avg);
    
        JavaRDD<String> result = lines.filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = 1L;
    
            public Boolean call(String v1) throws Exception {
                final String arr[] = SPACE.split(v1);
                long temperature = Long.parseLong(arr[1]);
                return temperature <= avg;
            }
        });
    
        List<String> resultList = result.collect();
        for (String item: resultList) {
            System.out.println("Result item: "+item);
        }

    让我们解释一下这段代码:

    • 通过counts.first()我们从reducer中读取所有温度的总和
    • 我们使用count函数来获取JavaRDD输入集中所有行的计数。
    • 我们使用JavaRDD过滤功能来过滤掉温度高于平均值的城镇。
    • 我们使用JavaRDD collect函数来打印结果。

    如果你运行这个程序,你应该得到如下结果:

    16/03/03 21:02:26 INFO DAGScheduler: Job 1 finished: count at AvgTemperatureAnalyzer.java:85, took 0,094561 s
    
    Average temperature 27.0
    .
    .
    Result item: Berlin 20
    Result item: Paris 15
    Result item: Rome 25
    
    16/03/03 21:02:26 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}

    结论

    从我的观点来看,Apache Spark更加友好的地图减少编程,即使概念是相同的。我打赌你明白我们需要通过JavaRDD输入进行多次迭代,但是使用map-reduce你需要弄清楚如何将前一个map reduce任务的结果传递给下一个,Apache Spark一个输入迭代以新的RDD设置,您可以在其中应用其他功能,从主节点驱动的所有内容......这不是很酷吗?

  • 相关阅读:
    快速删除段落间多余的空行
    平时一些mysql小技巧及常识
    mysql中常用的控制流函数
    按年、季度、月分组&&计算日期和时间的函数
    Excel通过身份证获取出生年月,性别,年龄,生肖,星座,省份等信息总结归纳
    统计图表类型选择应用总结&表数据挖掘方法及应用
    EXCEL如何提取文字中包含的数字?
    一篇说尽Excel常见函数用法
    RStudio中,出现中文乱码问题的解决方案
    R-RMySQL包介绍学习
  • 原文地址:https://www.cnblogs.com/pigdata/p/10305539.html
Copyright © 2011-2022 走看看