zoukankan      html  css  js  c++  java
  • Spark小课堂Week3 FirstSparkApp(RDD开发)

    Spark小课堂Week3 FirstSparkApp

    问题:Java有哪些数据结构

    大致有如下几种,其中List与Map是最重要的:

    • List
    • Map
    • Set
    • Array
    • Heap
    • Stack
    • Queue
    • Tree

    练习:构造一个1-5的List,把他们打印出来

    写法1

            List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
            for (int i = 0; i < input.size(); i++) {
                System.out.println(input.get(i));
            }
    

    这个写法存在的问题:

    1. 存在额外定义的变量i,需要对i进行逻辑控制(比如不能超过size)。
    2. 无法在循环内部如果对List进行安全更新,会产生逻辑问题。
    3. 这个写法非常的不推荐使用!!!

    写法2

            Iterator<Integer> it = input.iterator();
            while (it.hasNext()) {
                System.out.println("a" + it.next());
            }
    

    几点说明:

    1. iterator相当于oracle中游标,拥有数据只读和只能一次性读的特点。
    2. hasNext()和next)()是游标必备的方法,其中hasNext是判断是否有下一条,等同%Found,next是获取下一条数,等同fetch。
      这个写法存在的问题:
    3. 存在额外的对象it,读取方法比较简单,但是也需要记忆。

    写法3

            for (Integer i : input) {
                System.out.println(i);
            }
    

    这是对写法2的优化,逻辑完全等价,是一个语法糖。

    写法2和写法3共同存在的问题:

    1. 执行是串行的,对大数量的情况不适合。

    写法4

    input.parallelStream().forEach(
                    new Consumer<Integer>() {
                        @Override
                        public void accept(Integer a) {
                            System.out.println(a);
                        }
                    }
            );
    

    在这个写法中,执行是并行执行的,
    从语句特点看是将一个对象出入了forEach方法中,进行一下说明:

    1. 多线程程序,其实是几个互相独立运行的程序,产生了一个问题,我们需要将逻辑传递给给各自独立运行的程序
    2. 由于在Java中对象是全局存储的,所以是唯一可以在独立运行程序中传递的介质。
    3. 但是我们要的传递的是逻辑,比如要转变为对象才能传递,如果变成对象,要做两件事情:
      • 把逻辑,用方法来封装(示例中的apply方法)
      • 把方法,用class或者intereface来封装,更推荐intereface(示例中的Consumer)
        • 因为class既可以包含数据、又可以包含方法,而intereface中只包含方法,在我们的场景中,只需要传递方法
    4. 最后,接口提供方预先将intereface和method定义好,调用者按照模板使用即可。
      存在的问题:
    5. 语句比较长,逻辑也比较复杂

    写法5

            input.parallelStream().forEach(
                    a -> System.out.println(a)
            );
    

    这个写法和写法4逻辑没有任何区别,主要是用lambda表达式简化了对象的写法,是一个语法糖。
    写法4和写法5共同的问题是:
    当数据量进一步扩大,一台机器就算是多线程运行也无法完成的情况下,是无法处理的。

    小结

    在上述的几个写法中,主要是通过对于List的一系列增强,从而解决了一系列的问题。

    1. 写法2、3相比写法1,使用Iterator增强List,实现了数据的可更新。
    2. 写法4、5相比前面的写法,使用Stream增强Iterator,实现了数据的并发运算。
      但是,还是留下了在更大数据集情况下的处理。这个是时候,我们需要引入Spark。
      Spark中核心是RDD,是对Stream的进一步增强,在并发的基础上,增加了同时在多台机器上的分布式计算。解决了大数据集的问题。

    练习:RDD编程

    题目

    进行RDD操作的训练:

    1. 读取交易记录
    2. 按照SecurityId进行计数
    3. 根据计数,从高到低排序
    4. 输出结果

    练习过程:

    Step1:获取RDD数据,这个是调用公共方法,请注意的是,PracticePojo是预先准备好的测试数据名称。

    JavaRDD<PracticePojo> inputTradeRecords = this.getInputRDD(PracticePojo.class);
    

    Step2:将数据转为Key-Value格式

    因为在Spark处理中,reduce、sort、group等操作涉及到在分布式机器间的数据交互,数据必须要有Key来作为分布操作的依据,所以我们首先要将数据格式进行转换。

            JavaPairRDD<String, Integer> mappedTradeRecords = inputTradeRecords.mapToPair(
                    new PairFunction<PracticePojo, String, Integer>() {
                        @Override
                        public Tuple2<String, Integer> call(PracticePojo practicePojo) throws Exception {
                            return new Tuple2<String, Integer>(practicePojo.getSecurityId(), 1);
                        }
                    });
    

    Step3:执行计数操作。

    这里会用到reduceByKey方法,这里要注意的是,reduce是一个非常常用的操作,它有两个操作步骤:

    1. 对数据按照Key进行分组
    2. 对每组数据执行算子计算,需要注意reduce的计算必须满足交换律和结合律
      • 这个算子是指,当整个计算过程都满足交换律和结合律后,我们可以用任意两个数据之间的计算关系来定义整个计算关系
      • 比如:sum和count,可以用加法表示,max可以用a>b?a:b这样的计算来表示,min是反过来
            JavaPairRDD<String, Integer> reducedTradeRecords = mappedTradeRecords.reduceByKey(
                    new Function2<Integer, Integer, Integer>() {
                        @Override
                        public Integer call(Integer v1, Integer v2) throws Exception {
                            return v1 + v2;
                        }
                    });
    

    Step4:执行排序

    这里有两步操作:

    1. 首先是使用mapToPair方法进行数据变形,因为sortByKey方法仅是针对key来排序,而我们原始数据的key并不是我们要的排序字段,所以首先需要将key和value换一下
    2. 用sortByKey执行排序操作,需要注意的是,参数是一个布尔值,默认为升序,false表示降序
         JavaPairRDD<Integer, String> reversedTradeRecords = reducedTradeRecords.mapToPair(
                    new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                        @Override
                        public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                            return new Tuple2<Integer, String>(stringIntegerTuple2._2, stringIntegerTuple2._1);
                        }
                    });
    
            JavaPairRDD<Integer, String> sortedTradeRecords = reversedTradeRecords.sortByKey(false);
    

    Step5:输出结果

    一般我们都是输出一个JavaRDD,这里采用了预先定义的PracticeResultPojo结构,会将排序完的结果映射到这个结构上。

    JavaRDD<PracticeResultPojo> resultTradeRecords = sortedTradeRecords.map(
                    new Function<Tuple2<Integer, String>, PracticeResultPojo>() {
                        @Override
                        public PracticeResultPojo call(Tuple2<Integer, String> v1) throws Exception {
                            PracticeResultPojo resultPojo = new PracticeResultPojo();
                            resultPojo.setSecurityId(v1._2);
                            resultPojo.setCount(v1._1);
                            return resultPojo;
                        }
                    });
    

    小结

    RDD编程,难点如下:

    1. 采用对象方式来封装逻辑,和stream中处理方式一致,但是由于算子较多,每个算子要求不同,需要有一定练习来熟悉
    2. 输入输出为JavaRDD,但reduce、group、sort等操作需要中间使用mapToPair将JavaRDD转成JavaPairRDD才能操作,会涉及到多次转换
    3. reduce操作中,算子是更高层次的抽象,有一定的理解难度

    关于

    小课堂是在公司进行内部交流的一系列主题,偏基础,但是比较零散,持续更新中。

  • 相关阅读:
    mybatis源码追踪2——将结果集映射为map
    Mybatis的cache
    mybatis拦截器
    mybatis中单个参数的引用
    mybatis源码追踪1——Mapper方法用法解析
    win8 下 intellij idea 13 中文输入覆盖的问题
    firebug中html显示为灰色的原因总结
    extjs4.0以上添加多行工具栏的方法
    去除eclipse的validating
    An interview question from MicroStrategy
  • 原文地址:https://www.cnblogs.com/dt-zhw/p/5700726.html
Copyright © 2011-2022 走看看