zoukankan      html  css  js  c++  java
  • 【Spark篇】---Spark中Action算子

    一、前述

    Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序(就是我们编写的一个应用程序)中有几个Action类算子执行,就有几个job运行。

    二、具体

     原始数据集:

      1、count

    返回数据集中的元素数会在结果计算完成后回收到Driver端返回行数

     

    package com.spark.spark.actions;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    
    /**
     * count
     * 返回结果集中的元素数,会将结果回收到Driver端。
     *
     */
    public class Operator_count {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local");
            conf.setAppName("collect");
            JavaSparkContext jsc = new JavaSparkContext(conf);
            JavaRDD<String> lines = jsc.textFile("./words.txt");
            long count = lines.count();
            System.out.println(count);
            jsc.stop();
        }
    }

     

     结果:返回行数即元素数

    2、take(n)

           first=take(1) 返回数据集中的第一个元素

          返回一个包含数据集n个元素的集合。是一个array有几个partiotion 会有几个job触发

     

    package com.spark.spark.actions;
    
    import java.util.Arrays;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    /**
     * take
     * 
     * @author root
     *
     */
    public class Operator_takeAndFirst {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local").setAppName("take");
            JavaSparkContext jsc = new JavaSparkContext(conf);
            
     
            JavaRDD<String> parallelize = jsc.parallelize(Arrays.asList("a","b","c","d"));
            List<String> take = parallelize.take(2);
            String first = parallelize.first();
            for(String s:take){
                System.out.println(s);
            }
            jsc.stop();
        }
    }

    结果:

    3、foreach

          循环遍历数据集中的每个元素,运行相应的逻辑。

    4、collect

         将计算结果回收到Driver端。当数据量很大时就不要回收了,会造成oom.

     

         一般在使用过滤算子或者一些能返回少量数据集的算子后

     

    package com.spark.spark.actions;
    
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    
    /**
     * collect 
     * 将计算的结果作为集合拉回到driver端,一般在使用过滤算子或者一些能返回少量数据集的算子后,将结果回收到Driver端打印显示。
     *
     */
    public class Operator_collect {
        public static void main(String[] args) {
            /**
             * SparkConf对象中主要设置Spark运行的环境参数。
             * 1.运行模式
             * 2.设置Application name
             * 3.运行的资源需求
             */
            SparkConf conf = new SparkConf();
            conf.setMaster("local");
            conf.setAppName("collect");
            /**
             * JavaSparkContext对象是spark运行的上下文,是通往集群的唯一通道。
             */
            JavaSparkContext jsc = new JavaSparkContext(conf);
            JavaRDD<String> lines = jsc.textFile("./words.txt");
            JavaRDD<String> resultRDD = lines.filter(new Function<String, Boolean>() {
    
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public Boolean call(String line) throws Exception {
                    return !line.contains("hadoop");
                }
                
            });
            List<String> collect = resultRDD.collect();
            for(String s :collect){
                System.out.println(s);
            }
            
            jsc.stop();
        }
    }

    结果:

    • countByKey

                  作用到K,V格式的RDD上,根据Key计数相同Key的数据集元素。也就是个数)

    java代码:

    package com.spark.spark.actions;
    
    import java.util.Arrays;
    import java.util.Map;
    import java.util.Map.Entry;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    
    import scala.Tuple2;
    /**
     * countByKey
     * 
     * 作用到K,V格式的RDD上,根据Key计数相同Key的数据集元素。返回一个Map<K,Object>
     * @author root
     *
     */
    public class Operator_countByKey {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local").setAppName("countByKey");
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaPairRDD<Integer, String> parallelizePairs = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<Integer,String>(1,"a"),
                    new Tuple2<Integer,String>(2,"b"),
                    new Tuple2<Integer,String>(3,"c"),
                    new Tuple2<Integer,String>(4,"d"),
                    new Tuple2<Integer,String>(4,"e")
            ));
            
            Map<Integer, Object> countByKey = parallelizePairs.countByKey();
            for(Entry<Integer,Object>  entry : countByKey.entrySet()){
                System.out.println("key:"+entry.getKey()+"value:"+entry.getValue());
            }
            
            
        }
    }

    结果:

    • countByValue

               根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。

    java代码:

     

    package com.spark.spark.actions;
    
    import java.util.Arrays;
    import java.util.Map;
    import java.util.Map.Entry;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    
    import scala.Tuple2;
    /**
     * countByValue
     * 根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。
     * 
     * @author root
     *
     */
    public class Operator_countByValue {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local").setAppName("countByKey");
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaPairRDD<Integer, String> parallelizePairs = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<Integer,String>(1,"a"),
                    new Tuple2<Integer,String>(2,"b"),
                    new Tuple2<Integer,String>(2,"c"),
                    new Tuple2<Integer,String>(3,"c"),
                    new Tuple2<Integer,String>(4,"d"),
                    new Tuple2<Integer,String>(4,"d")
            ));
            
            Map<Tuple2<Integer, String>, Long> countByValue = parallelizePairs.countByValue();
            
            for(Entry<Tuple2<Integer, String>, Long> entry : countByValue.entrySet()){
                System.out.println("key:"+entry.getKey()+",value:"+entry.getValue());
            }
        }
    }

     

     scala代码:

    package com.bjsxt.spark.actions
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    /**
     * countByValue
     * 根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。
     */
    object Operator_countByValue {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("countByValue")
        val sc = new SparkContext(conf)
         val rdd1 = sc.makeRDD(List("a","a","b"))
        val rdd2 = rdd1.countByValue()
        rdd2.foreach(println)
        sc.stop()
      }
    }
    

     代码结果:

    java:

    scala:

    • reduce

                根据聚合逻辑聚合数据集中的每个元素。(reduce里面需要具体的逻辑,根据里面的逻辑对相同分区的数据进行计算

    java代码:

    package com.spark.spark.actions;
    
    import java.util.Arrays;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;
    /**
     * reduce
     * 
     * 根据聚合逻辑聚合数据集中的每个元素。
     * @author root
     *
     */
    public class Operator_reduce {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local").setAppName("reduce");
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1,2,3,4,5));
            
            Integer reduceResult = parallelize.reduce(new Function2<Integer, Integer, Integer>() {
                
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1+v2;
                }
            });
            System.out.println(reduceResult);
            sc.stop();
        }
    }

    scala代码:

    package com.bjsxt.spark.actions
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    /**
     * reduce
     * 
     * 根据聚合逻辑聚合数据集中的每个元素。
     */
    object Operator_reduce {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("reduce")
        
        val sc = new SparkContext(conf)
        val rdd1 = sc.makeRDD(Array(1,2))
        
        val result = rdd1.reduce(_+_)
        
        println(result)
        sc.stop()
      }
    }
    

     结果:

    java:

    scala:

     

  • 相关阅读:
    上手 WebRTC DTLS 遇到很多 BUG?浅谈 DTLS Fragment
    亮相 LiveVideoStackCon,透析阿里云窄带高清的现在与未来
    会议更流畅,表情更生动!视频生成编码 VS 国际最新 VVC 标准
    如何用 Electron + WebRTC 开发一个跨平台的视频会议应用
    理论 + 标准 + 工程 —— 阿里云视频云编码优化的思考与发现
    遍历哈希表
    sql语句的循环执行
    一个aspx页面里所有的控件
    SELECT DISTINCT 语句
    100 The 3n + 1 problem
  • 原文地址:https://www.cnblogs.com/LHWorldBlog/p/8401820.html
Copyright © 2011-2022 走看看