zoukankan      html  css  js  c++  java
  • spark基本RDD方法(Java版)

    spark基本RDD方法(Java版)

    一:Transformations

    map:输入和输出条数一致;

            JavaRDD<Integer> mappedRDD = lines.map(s -> s.length());

    filter:只保留返回true的数据;

            JavaRDD<String> mappedRDD = lines.filter(new Function<String, Boolean>() {
                @Override
                public Boolean call(String v1) throws Exception {
                    return v1.length()>=100;
                }
            });
    

    flatMap:输入输出为一比多,但是必须以迭代器的形式返回;

            JavaRDD<String> lines = sc.textFile("E:/test/input/2018-11/2018-11-07/null/test");
            JavaRDD<String> mappedRDD = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String s) throws Exception {
                    return Arrays.asList(s.split(StringUtil.STRING_26)).iterator();
                }
            });

    mapPartitions:和flatMap的区别:

    1-代码的差异:flatMap的传入数据是String,而mapPartitions的传入数据是Iterator<T>;

    2-功能上的差异:映射函数的参数由Rdd中的每一个元素变成了Rdd中每一个分区的迭代器。如果在映射过程中需要频繁创建额外的对象,使用mapPartitions要比map高效的多。

      例如:将数据插入数据库时,每次插入数据都需要创建connect,使用mapPartitions可以打批量的插入数据库和减少connect的创建和销毁的开销。

            JavaRDD<String> lines = sc.textFile("E:/test/input/2018-11/2018-11-07/null/test");
            JavaRDD<Integer> mappedRDD = lines.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
                @Override
                public Iterator<Integer> call(Iterator<String> stringIterator) throws Exception {
                    return Arrays.asList(stringIterator.forEachRemaining(s -> s.length())).iterator();
                }
            })
    

    mapPartitionsWithIndex:

    看下官方回答:

    (1)mapPartitions:Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.

    (2)mapPartitionsWithIndex:Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.

    mapPartitionsWithIndex和mapPartitions的差异:输入的格式不一样,会多个Integer类型的变量。会多个变量进行控制计算。

    具体区别可以参考:spark RDD算子(十二)之RDD 分区操作上mapPartitions, mapPartitionsWithIndex

    groupByKey([numPartitions]):When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.

            JavaRDD<String> lines = sc.textFile("E:/test/input/2018-11/2018-11-07/null/test");
            JavaRDD<Tuple2<String,Long>> mappedRDD = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                @Override
                public Iterator<Tuple2<String, Long>> call(String s) throws Exception {
                    String[] strs = s.split(StringUtil.STRING_26);
                    List<Tuple2<String,Long>> statsList = new ArrayList<>();
                    for (String str: strs) {
                        statsList.add(new Tuple2<>(str,1L));
                    }
                    return statsList.iterator();
                }
            });
    
            mappedRDD.mapToPair(record -> new Tuple2<String, Long>(record._1, record._2)).groupByKey();
    

    reduceByKey:对相同的key进行累加计数;

            mappedRDD.mapToPair(record -> new Tuple2<String, Long>(record._1, record._2)).reduceByKey(new Function2<Long, Long, Long>() {
                @Override
                public Long call(Long v1, Long v2) throws Exception {
                    return v1+v2;
                }
            });

    distinct():去重;

    coalesce(Integer):输出文件的个数控制;

            mappedRDD.coalesce(1).distinct().saveAsTextFile("E:/test/output/04");
    

    sample(boolean withReplacement,double fraction) 或 sample(boolean withReplacement,double fraction,long seed):

    抽象方法,withReplacement为true时表示抽样之后还放回,可以被多次抽样,false表示不放回;fraction表示抽样比例;seed为随机数种子;

        JavaRDD<String> lines = sc.textFile("E:/test/input/2018-11/2018-11-07/null/test");
        JavaRDD<String> mappedRDD = lines.sample(false, 0.5,System.currentTimeMillis());

     union(两个集合的所有元素),intersection(两个集合元素的并集元素),subtract(集合1独有的元素),cartesian(集合1和集合2的笛卡尔积);

            List<Integer> datas1 = Arrays.asList(1, 2, 3, 4);
            JavaRDD<Integer> dataRDD1 = sc.parallelize(datas1);
            List<Integer> datas2 = Arrays.asList(3, 4, 5, 6);
            JavaRDD<Integer> dataRDD2 = sc.parallelize(datas2);
            dataRDD1.union(dataRDD2).foreach(s -> System.out.println(s+" "));  //1 2 3 4 3 4 5 6
            dataRDD1.intersection(dataRDD2).foreach(s -> System.out.println(s+" "));  //3 4
            dataRDD1.subtract(dataRDD2).foreach(s -> System.out.println(s+" "));      //1 2
            dataRDD1.cartesian(dataRDD2).foreach(s -> System.out.println(s+" "));     //(1,3) (1,4) (1,5) (1,6)等16项
    

    几种聚合的原理差别(性能aggregateByKey>reduceByKey>distinct>groupByKey):
        (1)groupByKey()是对RDD中的所有数据做shuffle,根据不同的Key映射到不同的partition中再进行aggregate

       (2)aggregateByKey()是先对每个partition中的数据根据不同的Key进行aggregate,然后将结果进行shuffle,完成各个partition之间的aggregate。因此,和groupByKey()相比,运算量小了很多。

          (3)  distinct()也是对RDD中的所有数据做shuffle进行aggregate后再去重。

        (4)reduceByKey()。也是先在单台机器中计算,再将结果进行shuffle,减小数据传输量

    额外的方法:

    1-sortByKey:按key进行排序,可选择升序和降序;

    2-join:两个RDD进行连接,分为leftOuterJoinrightOuterJoin, 和fullOuterJoin.;

                输入: (K, V) and (K, W),输出: (K, (V, W))

    3-cogroup:类似于join。输入: (K, V) and (K, W),输出:(K, (Iterable<V>, Iterable<W>)) 

    4-pipe(command):command是sh脚本的路径,功能是执行sh脚本。

    5-repartition:重新shuffle,是各个partition平衡,解决数据倾斜问题。

    6-repartitionAndSortWithinPartitions:类似于repartition,性能更好。

    二:Actions

    1-reduce():并行整合RDD中所有的数据;

    2-collect():返回RDD中所有的元素;

    3-count():RDD中元素的个数;

    4-first():返回Rdd中第一个元素;

    5-take(n):返回Rdd中的n个元素;

    6-takeSample():随机返回RDD中的元素;

    7-takeOrdered():排序后返回最前面的n个元素;

    8-saveAsTextFile():输出RDD的元素到text文件;

    9-saveAsSequenceFile():以序列化的形式保存数据到文件中;

    10-saveAsObjectFile():以对象的形式保存数据到文件中;

    11-countByKey():各元素在RDD中出现的次数;

    12-foreach():遍历Rdd中的每一个元素。

    参考文献:spark官方文档

    Spark中的groupByKey 、aggregateByKey、reduceByKey 的区别

     

  • 相关阅读:
    小点
    三.一些常用类
    字符串相关:String,StringBuffer,StringBuilder
    五.二叉树
    四.递归
    三.队列
    二.栈
    一.数组,链表
    RDLC 矩阵图片列表排列顺序乱
    RDLC 矩阵每隔一页就有空白页 矩阵 空白页
  • 原文地址:https://www.cnblogs.com/parent-absent-son/p/9936703.html
Copyright © 2011-2022 走看看