zoukankan      html  css  js  c++  java
  • 【Spark-core学习之二】 RDD和算子

    环境
      虚拟机:VMware 10
      Linux版本:CentOS-6.5-x86_64
      客户端:Xshell4
      FTP:Xftp4
      jdk1.8
      scala-2.10.4(依赖jdk1.8)
      spark-1.6


    一、RDD
    RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。
    RDD特性:
    (1)一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
    (2)一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
    (3)RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
    (4)一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
    (5)一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

    创建RDD:

    二、Spark任务执行原理

    以上图中有四个机器节点,Driver和Worker是启动在节点上的进程,运行在JVM中的进程。
     Driver与集群节点之间有频繁的通信。
     Driver负责任务(tasks)的分发和结果的回收。任务的调度。如果task的计算结果非常大就不要回收了,会造成oom。
     Worker是Standalone资源调度框架里面资源管理的从节点,也是JVM进程。
     Master是Standalone资源调度框架里面资源管理的主节点。也是JVM进程。

    三、Spark代码流程
    1. 创建SparkConf对象
     可以设置Application name。
     可以设置运行模式及资源需求。
    2. 创建SparkContext对象
    3. 基于Spark的上下文创建一个RDD,对RDD进行处理。
    4. 应用程序中要有Action类算子来触发Transformation类算子执行。
    5. 关闭Spark上下文对象SparkContext。

    四、Transformations转换算子
    Transformations 转换算子是延迟执行,也叫懒加载执行。
    filter:过滤符合条件的记录数,true保留,false过滤掉。
    map:将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。特点:输入一条,输出一条数据。
    flatMap:先map后flat。与map类似,每个输入项可以映射为0到多个输出项。
    sample:随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。
    reduceByKey:将相同的Key根据相应的逻辑进行处理。
    sortByKey/sortBy:作用在K,V格式的RDD上,对key进行升序或者降序排序。

    JAVA示例: 

    package com.wjy.wc;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.Set;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.VoidFunction;
    
    import com.google.common.base.Optional;
    
    import scala.Tuple2;
    
    public class TransFormation {
    
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setMaster("local").setAppName("TransFormationTest");
            JavaSparkContext sc = new JavaSparkContext(conf);
            
            //1、创建RDD 查看默认分区数,第二个参数指定分区数  未指定 默认1 分区数决定task数
            JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a","a","b","b","b","c","d"),4);
            System.out.println("rdd默认分区数:"+rdd.partitions().size());
            
            //java创建K-V RDD使用parallelizePairs  不要使用parallelize
            List<Tuple2<String, String>> list1 = Arrays.asList(
                    new Tuple2<String,String>("zhangsan","a"),
                    new Tuple2<String,String>("lisi","b"),
                    new Tuple2<String,String>("wangwu","c"),
                    new Tuple2<String,String>("zhaoliu","d")
                    );
            List<Tuple2<String, Integer>> list2 = Arrays.asList(
                    new Tuple2<String,Integer>("zhangsan",1),
                    new Tuple2<String,Integer>("lisi",2),
                    new Tuple2<String,Integer>("wangwu",3),
                    new Tuple2<String,Integer>("niuer",4)
                    );
            List<Tuple2<String, String>> list3 = Arrays.asList(
                    new Tuple2<String,String>("zhangsan","100"),
                    new Tuple2<String,String>("lisi","200"),
                    new Tuple2<String,String>("wangwu","c"),
                    new Tuple2<String,String>("niuer","400")
                    );
            //返回是JavaPairRDD   不是JavaRDD
            JavaPairRDD<String, String> rdd1 = sc.parallelizePairs(list1,2);
            JavaPairRDD<String, Integer> rdd2 = sc.parallelizePairs(list2,3);
            JavaPairRDD<String, String> rdd3 = sc.parallelizePairs(list3,6);
            
            //2、join 必须作用在K-V格式RDD上 按照两个RDD的key去关联  只有相同的key才会被关联出来
            //    join后的分区数与父RDD分区数多的那一个相同
            JavaPairRDD<String, Tuple2<String, Integer>> join = rdd1.join(rdd2);
            join.foreach(new VoidFunction<Tuple2<String,Tuple2<String,Integer>>>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void call(Tuple2<String, Tuple2<String, Integer>> t) throws Exception {
                    System.out.println(t);
                }
            });
            
            //3、leftOuterJoin 左连接 以左边的key为主 左边的key都会出现  返回的仍然是K-V RDD
            JavaPairRDD<String, Tuple2<String, Optional<Integer>>> leftOuterJoin = rdd1.leftOuterJoin(rdd2);
            leftOuterJoin.foreach(new VoidFunction<Tuple2<String,Tuple2<String,Optional<Integer>>>>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void call(Tuple2<String, Tuple2<String, Optional<Integer>>> arg0) throws Exception {
                    System.out.println(arg0);
                    //Optional 两个子类:some和none
                    //of有值  absent无值
                    /*
                     * (zhangsan,(a,Optional.of(1)))
                        (wangwu,(c,Optional.of(3)))
                        (zhaoliu,(d,Optional.absent()))
                        (lisi,(b,Optional.of(2)))
                     */
                    String key = arg0._1;
                    String value1 = arg0._2._1;
                    Optional<Integer> optional = arg0._2._2;
                    if (optional.isPresent())//isPresent()判断是否有值 true有值  false无值
                    {
                        System.out.println("key="+key+",value1="+value1+",value2="+optional.get());
                    }
                    else
                    {
                        System.out.println("key="+key+",value1="+value1);
                    }
                    
                }
            });
            
            //4、rightOuterJoin右连接 以右边的key为主 右边的key都会出现  返回的仍然是K-V RDD
            JavaPairRDD<String, Tuple2<Optional<String>, Integer>> rightOuterJoin = rdd1.rightOuterJoin(rdd2);
            rightOuterJoin.foreach(new VoidFunction<Tuple2<String,Tuple2<Optional<String>,Integer>>>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void call(Tuple2<String, Tuple2<Optional<String>, Integer>> arg0) throws Exception {
                    System.out.println(arg0);
                    //of代表有值  absent代表无值
                    /*
                     * (zhangsan,(Optional.of(a),1))   
                        (wangwu,(Optional.of(c),3))
                        (niuer,(Optional.absent(),4))
                        (lisi,(Optional.of(b),2))
                     */
                    String key = arg0._1;
                    Integer value2 = arg0._2._2;
                    Optional<String> optional = arg0._2._1;
                    if (optional.isPresent())//isPresent()判断是否有值 true有值  false无值
                    {
                        System.out.println("key="+key+",value1="+optional.get()+",value2="+value2);
                    }
                    else
                    {
                        System.out.println("key="+key+",value2="+value2);
                    }
                }
            });
            
            //5、fullOuterJoin 全连接  两边的key都会取到
            JavaPairRDD<String, Tuple2<Optional<String>, Optional<Integer>>> fullOuterJoin = rdd1.fullOuterJoin(rdd2);
            fullOuterJoin.foreach(new VoidFunction<Tuple2<String,Tuple2<Optional<String>,Optional<Integer>>>>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void call(Tuple2<String, Tuple2<Optional<String>, Optional<Integer>>> arg0) throws Exception {
                    System.out.println(arg0);
                }
            });
            
            //6、union 合并两个数据集。两个数据集的类型要一致。            返回新的RDD的分区数是合并RDD分区数的总和
            JavaPairRDD<String, String> union = rdd1.union(rdd3);
            System.out.println("union分区数:"+union.partitions().size());
            union.foreach(new VoidFunction<Tuple2<String,String>>() {
                private static final long serialVersionUID = 1L;
    
                @Override
                public void call(Tuple2<String, String> arg0) throws Exception {
                    System.out.println(arg0);
                }
            });
            
            //7、intersection     取两个数据集的交集  KV完全相同
            JavaPairRDD<String, String> intersection = rdd1.intersection(rdd3);
            intersection.foreach(new VoidFunction<Tuple2<String,String>>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void call(Tuple2<String, String> arg0) throws Exception {
                    System.out.println(arg0);
                }
            });
            
            //8、subtract        取两个数据集的差集
            JavaPairRDD<String, String> subtract = rdd1.subtract(rdd3);
            subtract.foreach(new VoidFunction<Tuple2<String,String>>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void call(Tuple2<String, String> arg0) throws Exception {
                    System.out.println(arg0);
                }
            });
    
            JavaPairRDD<String, String> rdd11 = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<String,String>("zhangsan","aaa"),
                    new Tuple2<String,String>("zhangsan","bbb"),
                    new Tuple2<String,String>("zhangsan","ccc"),
                    new Tuple2<String,String>("lisi","100"),
                    new Tuple2<String,String>("lisi","200"),
                    new Tuple2<String,String>("lisi","300"),
                    new Tuple2<String,String>("wangwu","c11"),
                    new Tuple2<String,String>("wangwu","c33"),
                    new Tuple2<String,String>("zhaoliu","d")
                    ));
            JavaPairRDD<String, String> rdd22 = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<String,String>("zhangsan","1"),
                    new Tuple2<String,String>("zhangsan","222"),
                    new Tuple2<String,String>("zhangsan","333"),
                    new Tuple2<String,String>("lisi","ddd"),
                    new Tuple2<String,String>("lisi","eee"),
                    new Tuple2<String,String>("lisi","ttt"),
                    new Tuple2<String,String>("niuer","ffff")
                    ));
    
            //9、cogroup当调用类型(K,V)和(K,W)的数据上时,返回一个数据集(K,(Iterable<V>,Iterable<W>))
            //将两个rdd中相同key对应的value各自组成一个集合
            JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<String>>> cogroup = rdd11.cogroup(rdd22);
            cogroup.foreach(new VoidFunction<Tuple2<String,Tuple2<Iterable<String>,Iterable<String>>>>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void call(Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>> arg0) throws Exception {
                    System.out.println(arg0);
                    /*
                     * (zhangsan,([aaa, bbb, ccc],[1, 222, 333]))
                        (wangwu,([c11, c33],[]))
                        (niuer,([],[ffff]))
                        (zhaoliu,([d],[]))
                        (lisi,([100, 200, 300],[ddd, eee, ttt]))
                     */
                }
            });
            
            //10、mapPartition 与map类似,遍历的单位是每个partition上的数据 map是一个个处理。
            //类型    foreachPartition  遍历的是每个partition上的数据 foreach则是一个个遍历
            JavaRDD<String> rdd4 = sc.parallelize(Arrays.asList("a","b","c","d","e","f","g"),3);
            rdd4.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
                private static final long serialVersionUID = 1L;
                @Override
                public Iterable<String> call(Iterator<String> iter) throws Exception {
                    List<String> list = new ArrayList<String>();
                    System.out.println("begin");
                    while(iter.hasNext())
                    {
                        String s = iter.next();
                        System.out.println(s);
                        list.add(s);
                    }
                    System.out.println("end");
                    return list;
                }
            }).collect();
            
            rdd4.foreachPartition(new VoidFunction<Iterator<String>>() {
                private static final long serialVersionUID = 1L;
    
                @Override
                public void call(Iterator<String> iter) throws Exception {
                    System.out.println("开始");
                    while(iter.hasNext())
                    {
                        System.out.println(iter.next());
                    }
                    System.out.println("结束");
                }
            });
            //11、distinct(map+reduceByKey+map)  reduceByKey有分组功能
            JavaRDD<String> rdd5 = sc.parallelize(Arrays.asList("a","a","b","b","b","c","d"),4);
            JavaRDD<String> distinct = rdd5.distinct();
            distinct.foreach(new VoidFunction<String>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void call(String arg0) throws Exception {
                    System.out.println(arg0);
                }
            });
            
            //12、mapPartitionWithIndex     类似于mapPartitions,
            //会将RDD中的partition索引下标带出来,index 是每个partition的索引下标
            JavaRDD<String> rdd6 = sc.parallelize(Arrays.asList(
                    "love1","love2","love3","love4",
                    "love5","love6","love7","love8",
                    "love9","love10","love11","love12"
                ),3);
            
            //Function2  第一个参数是分区索引 第二个是这个分区值的list集合 第三个是经过call处理之后的返回值
            JavaRDD<String> mapPartitionsWithIndex = rdd6.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
                private static final long serialVersionUID = 1L;
                @Override
                //入参iter 是每个分区的集合的迭代器
                public Iterator<String> call(Integer index, Iterator<String> iter) throws Exception {
                    List<String> list = new ArrayList<String>();
                    
                    while(iter.hasNext()) {
                        String one = iter.next();
                        list.add("rdd1 partition index = 【"+index+"】, value = 【"+one+"】");
                    }
                    return list.iterator();
                }
            }, true);
            mapPartitionsWithIndex.foreach(new VoidFunction<String>() {
                private static final long serialVersionUID = 1L;
    
                @Override
                public void call(String v) throws Exception {
                    System.out.println(v);
                }
            });;
            //13、repartition 增加或减少分区。会产生shuffle。(多个分区分到一个分区不会产生shuffle)
            //repartition 是有shuffle的算子,可以对RDD重新分区。可以增加分区,也可以减少分区。
             //repartition = coalesce(numPartitions,true)
            JavaRDD<String> repartition = mapPartitionsWithIndex.repartition(2);
            repartition.foreach(new VoidFunction<String>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void call(String v) throws Exception {
                    System.out.println(v);
                }
            });
            
            //14、coalesce coalesce常用来减少分区,第二个参数是减少分区的过程中是否产生shuffle。
            //coalesce 与repartition一样,可以对RDD进行分区,可以增多分区,也可以减少分区。
            //    coalsece(numPartitions,shuffle [Boolean = false]) false不产生shuffle  窄依赖  true就生成宽依赖  产生shuffle
            //spark有空分区的概念  但是往多了分区不会产生空分区
            //true为产生shuffle,false不产生shuffle。默认是false。
            //如果coalesce设置的分区数比原来的RDD的分区数还多的话,第二个参数设置为false不会起作用,如果设置成true,效果和repartition一样。即repartition(numPartitions) = coalesce(numPartitions,true)
            JavaRDD<String> coalesce = mapPartitionsWithIndex.coalesce(4, false);
            coalesce.foreach(new VoidFunction<String>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void call(String v) throws Exception {
                    System.out.println(v);
                }
            });
            
            //15、groupByKey 作用在K,V格式的RDD上。根据Key进行分组。作用在(K,V),返回(K,Iterable <V>)。
            JavaPairRDD<String, Integer> rdd7 = sc.parallelizePairs(Arrays.asList(
            new Tuple2<String,Integer>("zhangsan",100),
            new Tuple2<String,Integer>("zhangsan",200),
            new Tuple2<String,Integer>("lisi",300),
            new Tuple2<String,Integer>("lisi",400),
            new Tuple2<String,Integer>("wangwu",500),
            new Tuple2<String,Integer>("wangwu",600)
            ),2);
            JavaPairRDD<String, Iterable<Integer>> groupByKey = rdd7.groupByKey();
            groupByKey.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void call(Tuple2<String, Iterable<Integer>> v) throws Exception {
                    System.out.println(v);
                    /*
                     * (zhangsan,[100, 200])
                        (wangwu,[500, 600])
                        (lisi,[300, 400])
                     */
                }
            });
            //16、zip 将两个RDD中的元素(KV格式/非KV格式)变成一个KV格式的RDD,两个RDD的每个分区元素个数必须相同。
            JavaPairRDD<String, String> rdd8 = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<String,String>("zhangsan","18"),
                    new Tuple2<String,String>("zhangsan","18"),
                    new Tuple2<String,String>("lisi","19"),
                    new Tuple2<String,String>("lisi","190"),
                    new Tuple2<String,String>("wangwu","100"),
                    new Tuple2<String,String>("wangwu","200")
                ),2);
            JavaPairRDD<Tuple2<String, Integer>, Tuple2<String, String>> zip = rdd7.zip(rdd8);
            zip.foreach(new VoidFunction<Tuple2<Tuple2<String,Integer>,Tuple2<String,String>>>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void call(Tuple2<Tuple2<String, Integer>, Tuple2<String, String>> v) throws Exception {
                    System.out.println(v);
                    /*
                     * ((zhangsan,100),(zhangsan,18))
                        ((zhangsan,200),(zhangsan,18))
                        ((lisi,300),(lisi,19))
                        ((lisi,400),(lisi,190))
                        ((wangwu,500),(wangwu,100))
                        ((wangwu,600),(wangwu,200))
                     */
                }
            });
            
            //17、zipWithIndex     该函数将RDD中的元素和这个元素在RDD中的索引号(从0开始)组合成(K,V)对。
            JavaPairRDD<Tuple2<String, Integer>, Long> zipWithIndex = rdd7.zipWithIndex();
            zipWithIndex.foreach(new VoidFunction<Tuple2<Tuple2<String,Integer>,Long>>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void call(Tuple2<Tuple2<String, Integer>, Long> v) throws Exception {
                    System.out.println(v);
                    /*
                     * ((zhangsan,100),0)
                        ((zhangsan,200),1)
                        ((lisi,300),2)
                        ((lisi,400),3)
                        ((wangwu,500),4)
                        ((wangwu,600),5)
                     */
                }
            });
            
            //Action 算子
            //18、reduce  对RDD中的每个元素 使用传递的逻辑去处理
            JavaRDD<Integer> rdd9 = sc.parallelize(Arrays.asList(1,2,3,4,5));
            Integer reduce = rdd9.reduce(new Function2<Integer, Integer, Integer>() {
                private static final long serialVersionUID = 1L;
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    //将相邻两个元素相加返回
                    return v1+v2;
                }
            });
            System.out.println(reduce);//15
            
            //19、countByKey() 对RDD中相同的key的元素计数
            Map<String, Object> countByKey = rdd8.countByKey();
            Set<Entry<String, Object>> entrySet = countByKey.entrySet();
            for(Entry<String, Object> entry :entrySet) {
                String key = entry.getKey();
                Object value = entry.getValue();
                System.out.println("key = "+key+",value ="+value);
            }
            
            //20、countByValue() 对RDD中相同的元素计数
            //根据数据集每个元素相同的内容来计数,不是针对K-V里的V,而是整个K-V。返回相同内容的元素对应的条数
            Map<Tuple2<String, String>, Long> countByValue = rdd8.countByValue();
            Set<Entry<Tuple2<String, String>, Long>> entrySet1 = countByValue.entrySet();
            for(Entry<Tuple2<String, String>, Long> entry :entrySet1) {
                Tuple2<String, String> key = entry.getKey();
                Long value = entry.getValue();
                System.out.println("key = "+key+",value ="+value);
                /*
                 * key = (lisi,19),value =1
                    key = (wangwu,100),value =1
                    key = (zhangsan,18),value =2
                    key = (lisi,190),value =1
                    key = (wangwu,200),value =1
                 */
            }
            
            //close()方法里调用了stop
            sc.close();
            //sc.stop();
        }
    }

    Scala示例:

    package com.wjy
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import scala.collection.mutable.ListBuffer
    
    object TransFormation {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("TransFormationTest");
        val sc = new SparkContext(conf);
        //1、创建RDD 查看默认分区数,第二个参数指定分区数  未指定 默认1 分区数决定task数
        val rdd = sc.parallelize(Array("a","a","b","b","b","c","d"), 3);
        //length 或size
        println("分区数:"+rdd.partitions.size);
        //创建RDD方式2
        val rdd2 = sc.makeRDD(Array("a","b","b","b","c","d"), 2);
        //创建K-V格式RDD
        val rdd3 = sc.makeRDD(Array(("张三","v1"),("王五","v2"),("李四","v3"),("牛二","v4")));
        val rdd4 = sc.makeRDD(List(("张三","111"),("赵六","4444"),("李四","v3"),("王二","677")));
       //2、join
        rdd3.join(rdd4).foreach(println);
        /*
         * (张三,(v1,111))
                    (李四,(v3,4534))
         */
        
        //3、leftOuterJoin
       rdd3.leftOuterJoin(rdd4).foreach(println);
       /*
        * (张三,(v1,Some(111)))
                (李四,(v3,Some(4534)))
                (王五,(v2,None))
                (牛二,(v4,None))
        */
       
        //4、rightOuterJoin
       rdd3.rightOuterJoin(rdd4).foreach(println);
       /*
        * (张三,(Some(v1),111))
                (王二,(None,677))
                (李四,(Some(v3),4534))
                (赵六,(None,4444))
        */
       
       //5、fullOuterJoin
       rdd3.fullOuterJoin(rdd4).foreach(println);
       /*
        * (张三,(Some(v1),Some(111)))
                (王二,(None,Some(677)))
                (李四,(Some(v3),Some(4534)))
                (赵六,(None,Some(4444)))
                (王五,(Some(v2),None))
                (牛二,(Some(v4),None))
        */
       
       //6、union
       rdd3.union(rdd4).foreach(println);
        /*
         * (张三,v1)
    (王五,v2)
    (李四,v3)
    (牛二,v4)
    (张三,111)
    (赵六,4444)
    (李四,4534)
    (王二,677)
         */
       //7、交集intersection
       rdd3.intersection(rdd4).foreach(println);
       //8、差集subtract
       rdd3.subtract(rdd4).foreach(println);
       //9、cogroup
       rdd3.cogroup(rdd4).foreach(println);
       //10、mapPartition true表示与之前的分区器相同
       rdd3.mapPartitions(iter=>{iter}, true);  
       //11、distinct
       rdd3.distinct().foreach(println);
       //12、mapPartitionsWithIndex
       val rdd5 = sc.parallelize(Array(
                "love1","love2","love3","love4",
                "love5","love6","love7","love8",
                "love9","love10","love11","love12"
                ),3);
       rdd5.mapPartitionsWithIndex((index,iter)=>{
         val list=new ListBuffer[String]();
         while(iter.hasNext){
           list.+=("rdd1 partition index = "+index+",value = "+iter.next());
         }
         list.iterator;
         }, true);
       //13、repartition
       rdd5.repartition(4).foreach(println);
       //14、coalesce
       rdd5.coalesce(4, false).foreach(println);
       //15、groupByKey
       val rdd6 = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c"),(3,"d")),3);
       rdd6.groupByKey().foreach(println);
       //16、zipWithIndex
       rdd6.zipWithIndex().foreach(println);
       //17、zip
       val rdd8= sc.parallelize(Array((11,"aa"),(22,"bb"),(33,"cc"),(33,"dd")),3);
       rdd6.zip(rdd8).foreach(println);
       /*
        * ((1,a),(11,aa))
                ((2,b),(22,bb))
                ((3,c),(33,cc))
                ((3,d),(33,dd))
        */
       
       //Action 算子
       //18、countByKey
       rdd6.countByKey().foreach(println);
       //19、countByValue
       rdd6.countByValue().foreach(println);
       //20、reduce
       val rdd7=sc.parallelize(Array(1,2,3,4,5));
       val rev = rdd7.reduce((v1,v2)=>{
         v1+v2;
       });
       println(rev);//15
       
       sc.stop();
      }
    
    }

    五、Action行动算子
    Action类算子叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行。

    count:返回数据集中的元素数。会在结果计算完成后回收到Driver端。
    take(n):返回一个包含数据集前n个元素的集合。
    first:first=take(1),返回数据集中的第一个元素。
    foreach:循环遍历数据集中的每个元素,运行相应的逻辑。
    collect:将计算结果回收到Driver端。

     

    六、控制算子
    控制算子有三种:cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition。
    cache和persist都是懒执行的。必须有一个action类算子触发执行。
    checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。

    1、cache:默认将RDD的数据持久化到内存中,cache是懒执行。

     SparkConf conf = new SparkConf();
     conf.setMaster("local").setAppName("CacheTest");
     JavaSparkContext jsc = new JavaSparkContext(conf);
     JavaRDD<String> lines = jsc.textFile("./NASA_access_log_Aug95");
    
     lines = lines.cache();
     long startTime = System.currentTimeMillis();
     long count = lines.count();
     long endTime = System.currentTimeMillis();
     System.out.println("共"+count+ "条数据,"+"初始化时间+cache时间+计算时间="+ (endTime-startTime));
            
     long countStartTime = System.currentTimeMillis();
     long countrResult = lines.count();
     long countEndTime = System.currentTimeMillis();
     System.out.println("共"+countrResult+ "条数据,"+"计算时间="+ (countEndTime-countStartTime));
            
     jsc.stop();

    2、persist:可以指定持久化的级别。最常用的是MEMORY_ONLY,MEMORY_AND_DISK,MEMORY_ONLY_SER,MEMORY_AND_DISK_SER。”_2”表示有副本数。
    持久化级别如下:


    cache和persist的注意事项:
    (1)cache和persist都是懒执行,必须有一个action类算子触发执行。
    (2)cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。
    (3)cache和persist算子后不能立即紧跟action算子。
    错误:rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了。

    3、checkpoint
    checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。
    checkpoint 的执行原理:
    (1)当RDD的job执行完毕后,会从finalRDD从后往前回溯。
    (2)当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。
    (3)Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。
    优化:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。
    使用:

    SparkConf conf = new SparkConf();
     conf.setMaster("local").setAppName("checkpoint");
     JavaSparkContext sc = new JavaSparkContext(conf);
     sc.setCheckpointDir("./checkpoint");
     JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1,2,3));
     parallelize.checkpoint();
     parallelize.count();
     sc.stop();


    参考:
    Spark:https://www.cnblogs.com/qingyunzong/category/1202252.html

    Spark之RDD

  • 相关阅读:
    Android 使用WebView显示网页
    Android 使用ProgressBar实现进度条
    Android 使用Spinner实现下拉列表
    Android 使用GridView以表格的形式显示多张图片
    Android 使用DatePicker以及TimePicker显示当前日期和时间
    Android 使用ListView显示信息列表
    IIS配置步骤,绝对有用,百度上的不全面,是百度的补充
    冒烟测试
    广度优先和深度优先
    排序算法二(时间复杂度为O(N*logN))
  • 原文地址:https://www.cnblogs.com/cac2020/p/10637310.html
Copyright © 2011-2022 走看看