zoukankan      html  css  js  c++  java
  • spark记录(4)spark算子之Action

    Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行。

    (1)reduce

    reduce其实是讲RDD中的所有元素进行合并,当运行call方法时,会传入两个参数,在call方法中将两个参数合并后返回,而这个返回值回合一个新的RDD中的元素再次传入call方法中,继续合并,直到合并到只剩下一个元素时。

    代码

        public static void reduce() {
            JavaRDD<String> rdd = jsc.textFile("words");
            String reduce = rdd.flatMap(new FlatMapFunction<String, String>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterable<String> call(String a) throws Exception {
                    return Arrays.asList(a.split(" "));
                }
            }).reduce(new Function2<String, String, String>() {
                
                private static final long serialVersionUID = 1L;
    
                @Override
                public String call(String a, String b) throws Exception {
                    return a+"-->"+b;
                }
            });
            System.out.println(reduce);
            
        }

    结果:

    (2)collect()

     将计算的结果作为集合拉回到driver端,一般在使用过滤算子或者一些能返回少量数据集的算子后,将结果回收到Driver端打印显示。

    分布式环境下尽量规避,如有其他需要,手动编写代码实现相应功能就好。

    详情请参考:https://blog.csdn.net/Fortuna_i/article/details/80851775

    代码:

        public static void collect() {
            JavaRDD<String> rdd = jsc.textFile("words");
            List<String> collect = rdd.flatMap(new FlatMapFunction<String, String>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterable<String> call(String a) throws Exception {
                    return Arrays.asList(a.split(" "));
                }
            }).collect();
            for (String string : collect) {
                System.out.println(string);
            }
        }

    结果:

    (3)take

    返回一个包含数据集前n个元素的数组(从0下标到n-1下标的元素),不排序。

    代码:

        public static void take() {
            JavaRDD<String> rdd = jsc.textFile("words");
            List<String> take = rdd.flatMap(new FlatMapFunction<String, String>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterable<String> call(String a) throws Exception {
                    return Arrays.asList(a.split(" "));
                }
            }).take(5);
            for (String string : take) {
                System.out.println(string);
            }
        }

    结果:

    (4)first

    返回数据集的第一个元素(底层即是take(1))

    代码:

        public static void first() {
            JavaRDD<String> rdd = jsc.textFile("words");
            String first = rdd.flatMap(new FlatMapFunction<String, String>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterable<String> call(String a) throws Exception {
                    return Arrays.asList(a.split(" "));
                }
            }).first();
            System.out.println(first);
    
        }

    结果:

    (5)takeSample(withReplacement, num, [seed])

    对于一个数据集进行随机抽样,返回一个包含num个随机抽样元素的数组,withReplacement表示是否有放回抽样,参数seed指定生成随机数的种子。

    该方法仅在预期结果数组很小的情况下使用,因为所有数据都被加载到driver端的内存中。

    代码:

        public static void takeSample() {
            JavaRDD<String> rdd = jsc.textFile("words");
            List<String> takeSample = rdd.flatMap(new FlatMapFunction<String, String>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterable<String> call(String a) throws Exception {
                    return Arrays.asList(a.split(" "));
                }
            }).takeSample(false, 5, 1);
            for (String string : takeSample) {
                System.out.println(string);
            }
        }

    结果:

    (6)count

    返回数据集中元素个数,默认Long类型。

    代码:

        public static void count() {
            JavaRDD<String> rdd = jsc.textFile("words");
            long count = rdd.flatMap(new FlatMapFunction<String, String>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterable<String> call(String a) throws Exception {
                    return Arrays.asList(a.split(" "));
                }
            }).count();
            System.out.println(count);
        }

    结果:

     (7)takeOrdered(n,[ordering])

    返回RDD中前n个元素,并按默认顺序排序(升序)或者按自定义比较器顺序排序。

    代码:

        public static void takeOrdered() {
            JavaRDD<String> rdd = jsc.textFile("words");
            List<String> takeSample = rdd.flatMap(new FlatMapFunction<String, String>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterable<String> call(String a) throws Exception {
                    return Arrays.asList(a.split(" "));
                }
            }).takeOrdered(4);
            for (String string : takeSample) {
                System.out.println(string);
            }
        }

    结果:

    (8)saveAsTextFile(path)

    将rdd中元素以文本文件的形式写入本地文件系统或者HDFS等。Spark将对每个元素调用toString方法,将数据元素转换为文本文件中的一行记录。

    若将文件保存到本地文件系统,那么只会保存在executor所在机器的本地目录。

    代码:

        public static void saveAsTextFile() {
            JavaRDD<String> rdd = jsc.textFile("words");
            rdd.flatMap(new FlatMapFunction<String, String>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterable<String> call(String a) throws Exception {
                    return Arrays.asList(a.split(" "));
                }
            }).saveAsTextFile("C:/Users/Administrator/Documents/result/data.txt");
        }

    结果:

    (9) saveAsSequenceFile(path)

    将rdd中元素以Hadoop SequenceFile的形式写入本地文件系统或者HDFS等。(对pairRDD操作)

    java api中无该方法

    代码:

      def saveAsSequenceFile: Unit = {
        val rdd1 = sc.textFile("words", 2)
        rdd1.flatMap(_.split(" ")).map(new Tuple2(_,1))
          .saveAsSequenceFile("C:/Users/Administrator/Documents/result/data2")
      }

    结果:

    (10)saveAsObjectFile(path)(Java and Scala)

    将数据集中元素以ObjectFile形式写入本地文件系统或者HDFS等。

    代码:

        public static void saveAsObjectFile() {
            JavaRDD<String> rdd = jsc.textFile("words");
            rdd.flatMap(new FlatMapFunction<String, String>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterable<String> call(String a) throws Exception {
                    return Arrays.asList(a.split(" "));
                }
            }).saveAsObjectFile("C:/Users/Administrator/Documents/result/data3");
        }

    结果:

    (11)countByKey()

    作用到K,V格式的RDD上,根据Key计数相同Key的数据集元素。返回一个Map<K,Object>

    代码:

        public static void countByKey() {
            JavaRDD<String> rdd = jsc.textFile("words");
            JavaPairRDD<String, Integer> mapToPair = rdd.flatMap(new FlatMapFunction<String, String>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterable<String> call(String a) throws Exception {
                    return Arrays.asList(a.split(" "));
                }
            }).mapToPair(new PairFunction<String, String, Integer>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Tuple2<String, Integer> call(String a) throws Exception {
                    return new Tuple2<String, Integer>(a, 1);
                }
            });
            Map<String, Object> key = mapToPair.countByKey();
            for (Entry<String, Object> entry : key.entrySet()) {
                System.out.println("key:"+entry.getKey()+"value:"+entry.getValue());
            }
        }

    结果:

    (12)countByValue

    countByValue()函数与tuple元组中的(k,v)中的v 没有关系,这点要搞清楚,countByValue是针对Rdd中的每一个元素对象,

    而 countByKey 主要针对的事tuple(k,v)对象,并且与k 是有关系的,countByKey根据tuple(k,v)中的 k 进行统计的。使用的时候要区分。

    代码:

    public class Operator_countByValue {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local").setAppName("countByKey");
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaPairRDD<Integer, String> parallelizePairs = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<Integer,String>(1,"a"),
                    new Tuple2<Integer,String>(2,"b"),
                    new Tuple2<Integer,String>(2,"c"),
                    new Tuple2<Integer,String>(3,"c"),
                    new Tuple2<Integer,String>(4,"d"),
                    new Tuple2<Integer,String>(4,"d")
            ));
            
            Map<Tuple2<Integer, String>, Long> countByValue = parallelizePairs.countByValue();
            
            for(Entry<Tuple2<Integer, String>, Long> entry : countByValue.entrySet()){
                System.out.println("key:"+entry.getKey()+",value:"+entry.getValue());
            }
        }
    }

    结果:

    (13) foreach 、foreachPartition

    foreach不多说

    foreachPartition因为没有返回值并且是action操作,一般都是在程序末尾比如说要落地数据到存储系统中如mysql,es,或者hbase中,可以用它。

    public class Operator_foreachPartition {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local").setAppName("foreachPartition");
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaRDD<String> lines = sc.textFile("./words.txt",3);
            lines.foreachPartition(new VoidFunction<Iterator<String>>() {
    
                private static final long serialVersionUID = -2302401945670821407L;
    
                @Override
                public void call(Iterator<String> t) throws Exception {
                    System.out.println("创建数据库连接。。。");
                    while(t.hasNext()){
                        System.out.println(t.next());
                    }
                    
                }
            });
            
            sc.stop();
        }
    }
  • 相关阅读:
    如何快速修改替换对象中的某个属性?
    element组件 MessageBox不能显示确认和取消按钮,记录正确使用方法!
    记录一下vue transition 过渡各状态()
    记录一下vue slot
    vue路由传参query和params的区别(详解!)
    一段话让你理解vuex的工作模式!
    vue+axios访问本地json数据踩坑点
    怎么构建vue-cli项目
    IO模型
    epoll真正实现高并发服务器
  • 原文地址:https://www.cnblogs.com/kpsmile/p/10440486.html
Copyright © 2011-2022 走看看