zoukankan      html  css  js  c++  java
  • spark

    SPARK核心编程
    一、spark基本工作原理与RDD
    1.Spark的基本工作原理
    1.分布式(RDD的partition)
    2.主要是基于内存(少数情况下数基于磁盘)
    3.迭代式计算(RDD->RDD->RDD)
    
    客户端(client):我们在本地编写了spark程序,然后必须在某台能够连接spark的机器上提交spark程序
    
    Spark程序被提交到spark集群上进行运算
    
    spark数据的来源可能是hadoop的hdfs或者hive上,将读取到的数据分散到集群的不同的节点的内存中,对节点上的数据进行操作处理,处理后的数据可能会移动到其他节点的内存中,可能是继续由其他的节点进行操作,所有的计算操作都是针对多个节点上的数据,进行并行计算操作的(因为一个RDD是分区的,每个计算都是在不同节点上并行计算)。
    
    Spark处理完的结果存储的位置可能是hadoop的hdfs、hive或者是MySQL等DB亦或是直接返回到客户端(运行spark程序的机器进程)
     
    Spark与mapreduce最大的不同就在于,迭代式计算模型:
    Mapreduce ,分为两个阶段,map和reduce,两个节点完了,就结束了,所以我们在一个job中能做得处理很有限,只能在map和reduce里处理
    Spark,计算模型可以分为n个阶段,因为他是内存迭代式的,我们在处理完一个阶段以后,可以继续往下处理很多的阶段,而不只是两个阶段,所以,spark相较于mapreduce来说,计算模型可以提供更强大的功能
    2.RDD以及其特点
    弹性:内存资源不足的情况下,自动将RDD存储到磁盘上,以及切换机制
    分布式:被分区的,分布到不同的节点上进行并行的计算
    数据集:数据的集合
    1、RDD是Spark提供的核心抽象,全称为Resillient Distributed Dataset,即弹性分布式数据集。
    2、RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD中的数据可以被并行操作。(分布式数据集)
    3、RDD通常通过Hadoop上的文件,即HDFS文件或者Hive表,来进行创建;有时也可以通过应用程序中的集合的并行化来创建。
    4、RDD最重要的特性就是,提供了容错性,可以自动从节点失败中恢复过来。即如果某个节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。这一切对使用者是透明的。
    5、RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘。(弹性)
    
    一个RDD在逻辑上,抽象的代表一个HDFS文件。但是他实际上是被分区得。分为多个分区,多个分区散落在spark集群中的不同的节点上。比如RDD有90万数据,分为9个partition,9个分区
    
    现在,节点9出了故障,导致parttion 9的数据丢失,RDD具有很强的容错性,当他发现自己的数据丢失了,那么此时会自动从自己来源的数据进行重计算,重新获取自己这份数据,这一切对用户,都是完全透明的。
    
    RDD的每个partition,在spark节点上存储时,默认都是放在内存中的。但是如果说内存放不下这么多数据时,比如每个节点最多放5万数据,结果你每个partition是10万数据。那么就会把partition中的部分数据写入磁盘上,进行保存。
    
    而上述这一切,对于用户来说,都是完全透明的。也就是说,你不用去管RDD的数据存储在哪里,内存,还是磁盘。只要关注,你针对RDD来进行计算,和处理,等等操作即可。
    
    所以说,RDD的这种自动进行内存和磁盘之间权衡和切换的机制,就是RDD的弹性的特点所在。
    3.spark核心编程
    Spark核心编程是什么?
    
    第一:
    定义初始的RDD,就是说你要定义第一个RDD是从哪里,读取数据,hdfs、linux本地文件、程序中的集合。
    
    第二:
    定义对RDD的计算操作,这个在spark中称之为算子,map 、reduce、flatMap、groupByKey等,比mapreduce强大太多
    
    第三:
    就是循环往复的过程,计算完成之后,数据就可能到了一批新的节点上,也就是变成了一个新的RDD,然后在再次反复,针对新的RDD定义计算操作
    
    第四;
    获取最终的数据,将数据保存起来
    
    每一个节点上的每一批数据,实际上就是一个RDD,一个RDD是分布式的,所以数据都散落在一批节点上了,每个节点都存储了RDD的部分partition
    
    4.spark开发
    1、核心开发:离线批处理 / 延迟性的交互式数据处理
    2、SQL查询:底层都是RDD和计算操作
    3、实时计算:底层都是RDD和计算操作
    
    二、使用java、Scala、spark-shell开发wordCount
    1、用Java开发wordcount程序
      1.1 配置maven环境
      1.2 如何进行本地测试
    【注意】
    本地测试的作用,先伪造一些数据,测试代码的正确性之后,提交到集群中,执行
    防止在集群中出错,同时减少解决问题的时间,因为数据量比实际的要少的多
      1.3 如何使用spark-submit提交到spark集群进行执行(spark-submit常用参数说明,spark-submit其实就类似于hadoop的hadoop jar命令)
    Pom.xml
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>cn.spark</groupId>
      <artifactId>SparkTest</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>
    
      <name>SparkTest</name>
      <url>http://maven.apache.org</url>
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.10</artifactId>
          <version>1.3.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.10</artifactId>
          <version>1.3.0</version>
          </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-hive_2.10</artifactId>
          <version>1.3.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.10</artifactId>
          <version>1.3.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>2.4.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-kafka_2.10</artifactId>
          <version>1.3.0</version>
        </dependency>
      </dependencies>
      
      <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/main/test</testSourceDirectory>
    
        <plugins>
          <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
              <archive>
                <manifest>
                  <mainClass></mainClass>
                </manifest>
              </archive>
            </configuration>
            <executions>
              <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                  <goal>single</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
    
          <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <version>1.2.1</version>
            <executions>
              <execution>
                <goals>
                  <goal>exec</goal>
                </goals>
              </execution>
            </executions>
            <configuration>
              <executable>java</executable>
              <includeProjectDependencies>true</includeProjectDependencies>
              <includePluginDependencies>false</includePluginDependencies>
              <classpathScope>compile</classpathScope>
              <mainClass>cn.spark.sparktest.App</mainClass>
            </configuration>
          </plugin>
    
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
              <source>1.6</source>
              <target>1.6</target>
            </configuration>
          </plugin>
    
        </plugins>
      </build>
    </project>
    
    Wordcountlocal
    package cn.spark.study.core;
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import scala.Tuple2;
    /**
     * 使用java开发本地测试的wordcount程序
     * @author Administrator
     *
     */
    public class WordCountLocal {
        
        public static void main(String[] args) {
            // 编写Spark应用程序
            // 本地执行,是可以执行在eclipse中的main方法中,执行的
            
            // 第一步:创建SparkConf对象,设置Spark应用的配置信息
            // 使用setMaster()可以设置Spark应用程序要连接的Spark集群的master节点的url
            // 但是如果设置为local则代表,在本地运行
            SparkConf conf = new SparkConf()
                    .setAppName("WordCountLocal")
                    .setMaster("local");  
            
            // 第二步:创建JavaSparkContext对象
            // 在Spark中,SparkContext是Spark所有功能的一个入口,你无论是用java、scala,甚至是python编写
                // 都必须要有一个SparkContext,它的主要作用,包括初始化Spark应用程序所需的一些核心组件,包括
                // 调度器(DAGSchedule、TaskScheduler),还会去到Spark Master节点上进行注册,等等
            // 一句话,SparkContext,是Spark应用中,可以说是最最重要的一个对象
            // 但是呢,在Spark中,编写不同类型的Spark应用程序,使用的SparkContext是不同的,如果使用scala,
                // 使用的就是原生的SparkContext对象
                // 但是如果使用Java,那么就是JavaSparkContext对象
                // 如果是开发Spark SQL程序,那么就是SQLContext、HiveContext
                // 如果是开发Spark Streaming程序,那么就是它独有的SparkContext
                // 以此类推
            JavaSparkContext sc = new JavaSparkContext(conf);
        
            // 第三步:要针对输入源(hdfs文件、本地文件,等等),创建一个初始的RDD
            // 输入源中的数据会打散,分配到RDD的每个partition中,从而形成一个初始的分布式的数据集
            // 我们这里呢,因为是本地测试,所以呢,就是针对本地文件
            // SparkContext中,用于根据文件类型的输入源创建RDD的方法,叫做textFile()方法
            // 在Java中,创建的普通RDD,都叫做JavaRDD
            // 在这里呢,RDD中,有元素这种概念,如果是hdfs或者本地文件呢,创建的RDD,每一个元素就相当于
            // 是文件里的一行
            JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//spark.txt");
        
            // 第四步:对初始RDD进行transformation操作,也就是一些计算操作
            // 通常操作会通过创建function,并配合RDD的map、flatMap等算子来执行
            // function,通常,如果比较简单,则创建指定Function的匿名内部类
            // 但是如果function比较复杂,则会单独创建一个类,作为实现这个function接口的类    
            // 先将每一行拆分成单个的单词
            // FlatMapFunction,有两个泛型参数,分别代表了输入和输出类型
            // 我们这里呢,输入肯定是String,因为是一行一行的文本,输出,其实也是String,因为是每一行的文本
            // 这里先简要介绍flatMap算子的作用,其实就是,将RDD的一个元素,给拆分成一个或多个元素
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {    
                private static final long serialVersionUID = 1L;        
                @Override
                public Iterable<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" "));  
                }            
            });        
            // 接着,需要将每一个单词,映射为(单词, 1)的这种格式
                // 因为只有这样,后面才能根据单词作为key,来进行每个单词的出现次数的累加
            // mapToPair,其实就是将每个元素,映射为一个(v1,v2)这样的Tuple2类型的元素
                // 如果大家还记得scala里面讲的tuple,那么没错,这里的tuple2就是scala类型,包含了两个值
            // mapToPair这个算子,要求的是与PairFunction配合使用,第一个泛型参数代表了输入类型
                // 第二个和第三个泛型参数,代表的输出的Tuple2的第一个值和第二个值的类型
            // JavaPairRDD的两个泛型参数,分别代表了tuple元素的第一个值和第二个值的类型
            JavaPairRDD<String, Integer> pairs = words.mapToPair(            
                    new PairFunction<String, String, Integer>() {
                        private static final long serialVersionUID = 1L;    
                        @Override
                        public Tuple2<String, Integer> call(String word) throws Exception {
                            return new Tuple2<String, Integer>(word, 1);
                        }                    
                    });    
            // 接着,需要以单词作为key,统计每个单词出现的次数
            // 这里要使用reduceByKey这个算子,对每个key对应的value,都进行reduce操作
            // 比如JavaPairRDD中有几个元素,分别为(hello, 1) (hello, 1) (hello, 1) (world, 1)
            // reduce操作,相当于是把第一个值和第二个值进行计算,然后再将结果与第三个值进行计算
            // 比如这里的hello,那么就相当于是,首先是1 + 1 = 2,然后再将2 + 1 = 3
            // 最后返回的JavaPairRDD中的元素,也是tuple,但是第一个值就是每个key,第二个值就是key的value
            // reduce之后的结果,相当于就是每个单词出现的次数
            JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(            
                    new Function2<Integer, Integer, Integer>() {                
                        private static final long serialVersionUID = 1L;    
                        @Override
                        public Integer call(Integer v1, Integer v2) throws Exception {
                            return v1 + v2;
                        }                    
                    });    
            // 到这里为止,我们通过几个Spark算子操作,已经统计出了单词的次数
            // 但是,之前我们使用的flatMap、mapToPair、reduceByKey这种操作,都叫做transformation操作
            // 一个Spark应用中,光是有transformation操作,是不行的,是不会执行的,必须要有一种叫做action
            // 接着,最后,可以使用一种叫做action操作的,比如说,foreach,来触发程序的执行
            wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {        
                private static final long serialVersionUID = 1L;    
                @Override
                public void call(Tuple2<String, Integer> wordCount) throws Exception {
                    System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times.");    
                }        
            });    
            sc.close();
        }    
    }
    集群模式
    package cn.spark.study.core;
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import scala.Tuple2;
    /**
     * 将java开发的wordcount程序部署到spark集群上运行
     * @author Administrator
     */
    public class WordCountCluster {
        public static void main(String[] args) {
            // 如果要在spark集群上运行,需要修改的,只有两个地方
            // 第一,将SparkConf的setMaster()方法给删掉,默认它自己会去连接
            // 第二,我们针对的不是本地文件了,修改为hadoop hdfs上的真正的存储大数据的文件    
            // 实际执行步骤:
            // 1、将spark.txt文件上传到hdfs上去
            // 2、使用我们最早在pom.xml里配置的maven插件,对spark工程进行打包
            // 3、将打包后的spark工程jar包,上传到机器上执行
            // 4、编写spark-submit脚本
            // 5、执行spark-submit脚本,提交spark应用到集群执行        
            SparkConf conf = new SparkConf()
                    .setAppName("WordCountCluster");      
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaRDD<String> lines = sc.textFile("hdfs://spark1:9000/spark.txt");    
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                private static final long serialVersionUID = 1L;        
                @Override
                public Iterable<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" "));  
                }        
            });
            JavaPairRDD<String, Integer> pairs = words.mapToPair(                
                    new PairFunction<String, String, Integer>() {
                        private static final long serialVersionUID = 1L;        
                        @Override
                        public Tuple2<String, Integer> call(String word) throws Exception {
                            return new Tuple2<String, Integer>(word, 1);
                        }                
                    });    
            JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(            
                    new Function2<Integer, Integer, Integer>() {                
                        private static final long serialVersionUID = 1L;    
                        @Override
                        public Integer call(Integer v1, Integer v2) throws Exception {
                            return v1 + v2;
                        }                    
                    });
            wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {            
                private static final long serialVersionUID = 1L;            
                @Override
                public void call(Tuple2<String, Integer> wordCount) throws Exception {
                    System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times.");    
                }            
            });        
            sc.close();
        }    
    }
    
    2、用Scala开发wordcount程序
      2.1 下载scala ide for eclipse
      2.2 在Java Build Path中,添加spark依赖包(如果与scala ide for eclipse原生的scala版本发生冲突,则移除原生的scala / 重新配置scala compiler)
      2.3 用export导出scala spark工程
    Scala_wordcount
    package cn.spark.study.core
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    object WordCount {
      def main(args: Array[String]) {
        val conf = new SparkConf()
            .setAppName("WordCount");
        val sc = new SparkContext(conf)
      
        val lines = sc.textFile("hdfs://spark1:9000/spark.txt", 1); 
        val words = lines.flatMap { line => line.split(" ") }   
        val pairs = words.map { word => (word, 1) }   
        val wordCounts = pairs.reduceByKey { _ + _ }
        
    wordCounts.foreach(wordCount => println(wordCount._1 + " appeared " + wordCount._2 + " times."))  
    Sc.stop()
      } 
    }
    
    3、用spark-shell开发wordcount程序
      3.1 常用于简单的测试
    
    --master 不写表示的是local 本地模式
    --master spark://hadoop01:7077 表示的是spark自生的集群模式 standalone
    --master yarn-client
    --master yarn-cluster 表示spark应用运行在yarn上 
    
    
    三、spark的架构原理
    1.原理图
    
    2.详解
    组件:master worker executor task driver 
    1.我们在集群中的一个节点上或者自己提交spark程序的机器上提交一个spark程序后,会启动一个driver进程。
    2.Driver 进程启动后会做一些初始化的操作,在这个过程中,就会发送发送请求到master上,进行spark应用程序的注册,说白了就是让master知道,有一个新的spark应用程序要运行。
    3.Master是一个进行,主要是负责资源的调度和分配,还有集群的监控等职责。
    4.Worker是一个进行,主要是负责两个,一个是用自己的内存存储RDD的某个或者是某些partition,另外一个是启动其他的进程和线程,对RDD上的partition进行处理和计算
    5.Worker接收到master的请求之后,会为spark应用起动executor
    6.Executor和task其实就是负责执行,对RDD的partition进行并行的计算,也就是执行我们对RDD定义的,比如map flatMap reduce 等算子操作
    7. Executor启动之后,会向driver进行反注册,这样driver就知道哪些executor是为他进行服务的了。
    8.Driver注册了一些executor之后,就可以开始正式的执行我们的spark应用程序了。首先第一步就是创建RDD,读取数据源的文件
    9.HDFS上的文件内容被读取到多个work节点上,形成内存中的分布式的数据集,也就是初始RDD
    10.Driver会根据我们对RDD定义的操作,提交一大堆的task去executor上
    11.Executor接收到task之后,会启动多个线程来执行task.
    12.Task会对RDD的partition数据执行指定的算子操作,形成新的RDD的partition
    
    四、创建RDD
    创建的方式
         进行Spark核心编程时,首先要做的第一件事,就是创建一个初始的RDD。该RDD中,通常就代表和包含了Spark应用程序的输入源数据。然后在创建了初始的RDD之后,才可以通过Spark Core提供的transformation算子,对该RDD进行转换,来获取其他的RDD。
    Spark core提供三种创建RDD的三种方式:
    1.使用程序中的集合创建RDD,并行化集合
    如果要通过并行化集合来创建RDD,需要针对程序中的集合,调用SparkContext的parallelize()方法。Spark会将集合中的数据拷贝到集群上去,形成一个分布式的数据集合,也就是一个RDD。相当于是,集合中的部分数据会到一个节点上,而另一部分数据会到其他节点上。然后就可以用并行的方式来操作这个分布式数据集合,即RDD。
    
    调用parallelize()时,有一个重要的参数可以指定,就是要将集合切分成多少个partition。Spark会为每一个partition运行一个task来进行处理。Spark官方的建议是,为集群中的每个CPU创建2~4个partition。Spark默认会根据集群的情况来设置partition的数量。但是也可以在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量。比如parallelize(arr, 10)
    // 案例:1到10累加求和
    val arr = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val rdd = sc.parallelize(arr)
    val sum = rdd.reduce(_ + _)
    
    
    2.使用本地文件创建RDD
    Spark是支持使用任何Hadoop支持的存储系统上的文件创建RDD的,比如说HDFS、Cassandra、HBase以及本地文件。通过调用SparkContext的textFile()方法,可以针对本地文件或HDFS文件创建RDD。
    
    有几个事项是需要注意的:
    1、如果是针对本地文件的话,如果是在windows上本地测试,windows上有一份文件即可;如果是在spark集群上针对linux本地文件,那么需要将文件拷贝到所有worker节点上。
    2、Spark的textFile()方法支持针对目录、压缩文件以及通配符进行RDD创建。
    3、Spark默认会为hdfs文件的每一个block创建一个partition,但是也可以通过textFile()的第二个参数手动设置分区数量,只能比block数量多,不能比block数量少。
    
    // 案例:文件字数统计
    val rdd = sc.textFile("data.txt")
    val wordCount = rdd.map(line => line.length).reduce(_ + _)
    
    3.使用HDFS文件创建RDD
    
    Spark的textFile()除了可以针对上述几种普通的文件创建RDD之外,还有一些特列的方法来创建RDD:
    
    1、SparkContext.wholeTextFiles()方法,可以针对一个目录中的大量小文件,返回<filename, fileContent>组成的pair,作为一个PairRDD,而不是普通的RDD。普通的textFile()返回的RDD中,每个元素就是文件中的一行文本。
    2、SparkContext.sequenceFile[K, V]()方法,可以针对SequenceFile创建RDD,K和V泛型类型就是SequenceFile的key和value的类型。K和V要求必须是Hadoop的序列化类型,比如IntWritable、Text等。
    3、SparkContext.hadoopRDD()方法,对于Hadoop的自定义输入类型,可以创建RDD。该方法接收JobConf、InputFormatClass、Key和Value的Class。
    4、SparkContext.objectFile()方法,可以针对之前调用RDD.saveAsObjectFile()创建的对象序列化的文件,反序列化文件中的数据,并创建一个RDD。
    
    注意;
    1.使用程序中的集合创建RDD,只要用于测试,可以在实际部署到集群运行之前,自己使用集合构造测试数据,来测试后面的spark应用流程
    2.使用本地文件创建RDD,主要是用于临时的处理一些存储大量的数据的文件
    3.使用HDFS文件创建RDD,应该是最常用的生产环境的处理方式,只要可以针对HDFS上存储的大数据,进行离线的批处理操作
    Spark core主要是用来代替mapreduce的批处理操作
    五、RDD的操作
    RDD的两种操作
    Spark支持两种RDD操作:transformation和action。transformation操作会针对已有的RDD创建一个新的RDD;而action则主要是对RDD进行最后的操作,比如遍历、reduce、保存到文件等,并可以返回结果给Driver程序。
    
    例如,map就是一种transformation操作,它用于将已有RDD的每个元素传入一个自定义的函数,并获取一个新的元素,然后将所有的新元素组成一个新的RDD。而reduce就是一种action操作,它用于对RDD中的所有元素进行聚合操作,并获取一个最终的结果,然后返回给Driver程序。
    
    transformation的特点就是lazy特性。lazy特性指的是,如果一个spark应用中只定义了transformation操作,那么即使你执行该应用,这些操作也不会执行。也就是说,transformation是不会触发spark程序的执行的,它们只是记录了对RDD所做的操作,但是不会自发的执行。只有当transformation之后,接着执行了一个action操作,那么所有的transformation才会执行。Spark通过这种lazy特性,来进行底层的spark应用执行的优化,避免产生过多中间结果。
    
    action操作执行,会触发一个spark job的运行,从而触发这个action之前所有的transformation的执行。这是action的特性。
    
    案例一:统计文件的字数
    这里通过一个之前学习过的案例,统计文件字数,来讲解transformation和action。
    
    // 这里通过textFile()方法,针对外部文件创建了一个RDD,lines,但是实际上,程序执行到这里为止,spark.txt文件的数据是不会加载到内存中的。lines,只是代表了一个指向spark.txt文件的引用。
    val lines = sc.textFile("spark.txt")
    
    // 这里对lines RDD进行了map算子,获取了一个转换后的lineLengths RDD。但是这里连数据都没有,当然也不会做任何操作。lineLengths RDD也只是一个概念上的东西而已。
    val lineLengths = lines.map(line => line.length)
    
    // 之列,执行了一个action操作,reduce。此时就会触发之前所有transformation操作的执行,Spark会将操作拆分成多个task到多个机器上并行执行,每个task会在本地执行map操作,并且进行本地的reduce聚合。最后会进行一个全局的reduce聚合,然后将结果返回给Driver程序。
    val totalLength = lineLengths.reduce(_ + _)
    
    案例二:统计文件每行出现的次数
    Spark有些特殊的算子,也就是特殊的transformation操作。比如groupByKey、sortByKey、reduceByKey等,其实只是针对特殊的RDD的。即包含key-value对的RDD。而这种RDD中的元素,实际上是scala中的一种类型,即Tuple2,也就是包含两个值的Tuple。
    
    在scala中,需要手动导入Spark的相关隐式转换,import org.apache.spark.SparkContext._。然后,对应包含Tuple2的RDD,会自动隐式转换为PairRDDFunction,并提供reduceByKey等方法。
    
    val lines = sc.textFile("hello.txt")
    val linePairs = lines.map(line => (line, 1))
    val lineCounts = linePairs.reduceByKey(_ + _)
    lineCounts.foreach(lineCount => println(lineCount._1 + " appears " + llineCount._2 + " times."))
    
    2.常见的Transformation操作
    操作    介绍
    map    将RDD中的每个元素传入自定义函数,获取一个新的元素,然后用新的元素组成新的RDD
    filter    对RDD中每个元素进行判断,如果返回true则保留,返回false则剔除。
    flatMap    与map类似,但是对每个元素都可以返回一个或多个新元素。
    gropuByKey    根据key进行分组,每个key对应一个Iterable<value>
    reduceByKey    对每个key对应的value进行reduce操作。
    sortByKey    对每个key对应的value进行排序操作。
    join    对两个包含<key,value>对的RDD进行join操作,每个key join上的pair,都会传入自定义函数进行处理。
    cogroup    同join,但是是每个key对应的Iterable<value>都会传入自定义函数进行处理。
    
    1.Map
    Java:
    /**
         * map算子案例:将集合中每一个元素都乘以2
         */
        private static void map() {
            // 创建SparkConf
            SparkConf conf = new SparkConf()
                    .setAppName("map")
                    .setMaster("local");
            // 创建JavaSparkContext
            JavaSparkContext sc = new JavaSparkContext(conf);
            // 构造集合
            List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
            // 并行化集合,创建初始RDD
            JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
            
            // 使用map算子,将集合中的每个元素都乘以2
            // map算子,是对任何类型的RDD,都可以调用的
            // 在java中,map算子接收的参数是Function对象
            // 创建的Function对象,一定会让你设置第二个泛型参数,这个泛型类型,就是返回的新元素的类型
                // 同时call()方法的返回类型,也必须与第二个泛型类型同步
            // 在call()方法内部,就可以对原始RDD中的每一个元素进行各种处理和计算,并返回一个新的元素
            // 所有新的元素就会组成一个新的RDD
            JavaRDD<Integer> multipleNumberRDD = numberRDD.map(
                    
                    new Function<Integer, Integer>() {
    
                        private static final long serialVersionUID = 1L;
                        
                        // 传入call()方法的,就是1,2,3,4,5
                        // 返回的就是2,4,6,8,10
                        @Override
                        public Integer call(Integer v1) throws Exception {
                            return v1 * 2;
                        }
                    });
            // 打印新的RDD
            multipleNumberRDD.foreach(new VoidFunction<Integer>() {
                
                private static final long serialVersionUID = 1L;
    
                @Override
                public void call(Integer t) throws Exception {
                    System.out.println(t);  
                }
            });
            // 关闭JavaSparkContext
            sc.close();
        }
    Scala
     def map() {
        val conf = new SparkConf()
            .setAppName("map")
            .setMaster("local")  
        val sc = new SparkContext(conf)
        
        val numbers = Array(1, 2, 3, 4, 5)
        val numberRDD = sc.parallelize(numbers, 1)  
        val multipleNumberRDD = numberRDD.map { num => num * 2 }  
        
        multipleNumberRDD.foreach { num => println(num) }   
      }
    
    2.Filter
    Java 
    /**
         * filter算子案例:过滤集合中的偶数
         */
        private static void filter() {
            // 创建SparkConf
            SparkConf conf = new SparkConf()
                    .setAppName("filter")
                    .setMaster("local");
            // 创建JavaSparkContext
            JavaSparkContext sc = new JavaSparkContext(conf);    
            // 模拟集合
            List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
            // 并行化集合,创建初始RDD
            JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
            
            // 对初始RDD执行filter算子,过滤出其中的偶数
            // filter算子,传入的也是Function,其他的使用注意点,实际上和map是一样的
            // 但是,唯一的不同,就是call()方法的返回类型是Boolean
            // 每一个初始RDD中的元素,都会传入call()方法,此时你可以执行各种自定义的计算逻辑
            // 来判断这个元素是否是你想要的
            // 如果你想在新的RDD中保留这个元素,那么就返回true;否则,不想保留这个元素,返回false
            JavaRDD<Integer> evenNumberRDD = numberRDD.filter(                
                    new Function<Integer, Boolean>() {
                        private static final long serialVersionUID = 1L;
                        
                        // 在这里,1到10,都会传入进来
                        // 但是根据我们的逻辑,只有2,4,6,8,10这几个偶数,会返回true
                        // 所以,只有偶数会保留下来,放在新的RDD中
                        @Override
                        public Boolean call(Integer v1) throws Exception {
                            return v1 % 2 == 0;
                        }    
                    });
            // 打印新的RDD
            evenNumberRDD.foreach(new VoidFunction<Integer>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void call(Integer t) throws Exception {
                    System.out.println(t);
                }        
            });    
            // 关闭JavaSparkContext
            sc.close();
        }
    Scala
      def filter() {
        val conf = new SparkConf()
            .setAppName("filter")
            .setMaster("local")
        val sc = new SparkContext(conf)
        
        val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        val numberRDD = sc.parallelize(numbers, 1)
        val evenNumberRDD = numberRDD.filter { num => num % 2 == 0 }
        
        evenNumberRDD.foreach { num => println(num) }   
      }
    
    3.flatMap
    java
    /**
         * flatMap案例:将文本行拆分为多个单词
         */
        private static void flatMap() {
            // 创建SparkConf
            SparkConf conf = new SparkConf()
                    .setAppName("flatMap")  
                    .setMaster("local");  
            // 创建JavaSparkContext
            JavaSparkContext sc = new JavaSparkContext(conf);
            
            // 构造集合
            List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");  
            
            // 并行化集合,创建RDD
            JavaRDD<String> lines = sc.parallelize(lineList);
            
            // 对RDD执行flatMap算子,将每一行文本,拆分为多个单词
            // flatMap算子,在java中,接收的参数是FlatMapFunction
            // 我们需要自己定义FlatMapFunction的第二个泛型类型,即,代表了返回的新元素的类型
            // call()方法,返回的类型,不是U,而是Iterable<U>,这里的U也与第二个泛型类型相同
            // flatMap其实就是,接收原始RDD中的每个元素,并进行各种逻辑的计算和处理,返回可以返回多个元素
            // 多个元素,即封装在Iterable集合中,可以使用ArrayList等集合
            // 新的RDD中,即封装了所有的新元素;也就是说,新的RDD的大小一定是 >= 原始RDD的大小
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    
                private static final long serialVersionUID = 1L;
                
                // 在这里会,比如,传入第一行,hello you
                // 返回的是一个Iterable<String>(hello, you)
                @Override
                public Iterable<String> call(String t) throws Exception {
                    return Arrays.asList(t.split(" "));
                }
            });
            // 打印新的RDD
            words.foreach(new VoidFunction<String>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void call(String t) throws Exception {
                    System.out.println(t);
                }
            });        
            // 关闭JavaSparkContext
            sc.close();
        }
    Scala
     def flatMap() {
        val conf = new SparkConf()
            .setAppName("flatMap")  
            .setMaster("local")  
        val sc = new SparkContext(conf) 
        
        val lineArray = Array("hello you", "hello me", "hello world")  
        val lines = sc.parallelize(lineArray, 1)
        val words = lines.flatMap { line => line.split(" ") }   
          
        words.foreach { word => println(word) }
      }
    
    4.groupByKey
    java
    /**
         * groupByKey案例:按照班级对成绩进行分组
         */
        private static void groupByKey() {
            // 创建SparkConf
            SparkConf conf = new SparkConf()
                    .setAppName("groupByKey")  
                    .setMaster("local");
            // 创建JavaSparkContext
            JavaSparkContext sc = new JavaSparkContext(conf);        
            // 模拟集合
            List<Tuple2<String, Integer>> scoreList = Arrays.asList(
                    new Tuple2<String, Integer>("class1", 80),
                    new Tuple2<String, Integer>("class2", 75),
                    new Tuple2<String, Integer>("class1", 90),
                    new Tuple2<String, Integer>("class2", 65));        
            // 并行化集合,创建JavaPairRDD
            JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);        
            // 针对scores RDD,执行groupByKey算子,对每个班级的成绩进行分组
            // groupByKey算子,返回的还是JavaPairRDD
            // 但是,JavaPairRDD的第一个泛型类型不变,第二个泛型类型变成Iterable这种集合类型
            // 也就是说,按照了key进行分组,那么每个key可能都会有多个value,此时多个value聚合成了Iterable
            // 那么接下来,我们是不是就可以通过groupedScores这种JavaPairRDD,很方便地处理某个分组内的数据
            JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey();        
            // 打印groupedScores RDD
            groupedScores.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public void call(Tuple2<String, Iterable<Integer>> t)
                        throws Exception {
                    System.out.println("class: " + t._1);  
                    Iterator<Integer> ite = t._2.iterator();
                    while(ite.hasNext()) {
                        System.out.println(ite.next());  
                    }
                    System.out.println("==============================");   
                }        
            });
            // 关闭JavaSparkContext
            sc.close();
        }
    
    Scala
     def groupByKey() {
        val conf = new SparkConf()
            .setAppName("groupByKey")  
            .setMaster("local")  
        val sc = new SparkContext(conf)
        
        val scoreList = Array(Tuple2("class1", 80), Tuple2("class2", 75),
            Tuple2("class1", 90), Tuple2("class2", 60))
        val scores = sc.parallelize(scoreList, 1)  
        val groupedScores = scores.groupByKey() 
        
        groupedScores.foreach(score => { 
          println(score._1); 
          score._2.foreach { singleScore => println(singleScore) };
          println("=============================")  
        })
      }
    
    5.reduceByKey
    java
    /**
         * reduceByKey案例:统计每个班级的总分
         */
        private static void reduceByKey() {
            // 创建SparkConf
            SparkConf conf = new SparkConf()
                    .setAppName("reduceByKey")  
                    .setMaster("local");
            // 创建JavaSparkContext
            JavaSparkContext sc = new JavaSparkContext(conf);    
            // 模拟集合
            List<Tuple2<String, Integer>> scoreList = Arrays.asList(
                    new Tuple2<String, Integer>("class1", 80),
                    new Tuple2<String, Integer>("class2", 75),
                    new Tuple2<String, Integer>("class1", 90),
                    new Tuple2<String, Integer>("class2", 65));        
            // 并行化集合,创建JavaPairRDD
            JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);    
            // 针对scores RDD,执行reduceByKey算子
            // reduceByKey,接收的参数是Function2类型,它有三个泛型参数,实际上代表了三个值
            // 第一个泛型类型和第二个泛型类型,代表了原始RDD中的元素的value的类型
                // 因此对每个key进行reduce,都会依次将第一个、第二个value传入,将值再与第三个value传入
                // 因此此处,会自动定义两个泛型类型,代表call()方法的两个传入参数的类型
            // 第三个泛型类型,代表了每次reduce操作返回的值的类型,默认也是与原始RDD的value类型相同的
            // reduceByKey算法返回的RDD,还是JavaPairRDD<key, value>
            JavaPairRDD<String, Integer> totalScores = scores.reduceByKey(            
                    new Function2<Integer, Integer, Integer>() {                
                        private static final long serialVersionUID = 1L;
                        
                        // 对每个key,都会将其value,依次传入call方法
                        // 从而聚合出每个key对应的一个value
                        // 然后,将每个key对应的一个value,组合成一个Tuple2,作为新RDD的元素
                        @Override
                        public Integer call(Integer v1, Integer v2) throws Exception {
                            return v1 + v2;
                        }                
                    });    
            // 打印totalScores RDD
            totalScores.foreach(new VoidFunction<Tuple2<String,Integer>>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public void call(Tuple2<String, Integer> t) throws Exception {
                    System.out.println(t._1 + ": " + t._2);   
                }        
            });        
            // 关闭JavaSparkContext
            sc.close();
        }
    
    Scala
    def reduceByKey() {
        val conf = new SparkConf()
            .setAppName("groupByKey")  
            .setMaster("local")  
        val sc = new SparkContext(conf)
        
        val scoreList = Array(Tuple2("class1", 80), Tuple2("class2", 75),
            Tuple2("class1", 90), Tuple2("class2", 60))
        val scores = sc.parallelize(scoreList, 1)  
        val totalScores = scores.reduceByKey(_ + _)  
        
        totalScores.foreach(classScore => println(classScore._1 + ": " + classScore._2))  
      }
    
    6.sortByKey
    java
    /**
         * sortByKey案例:按照学生分数进行排序
         */
        private static void sortByKey() {
            // 创建SparkConf
            SparkConf conf = new SparkConf()
                    .setAppName("sortByKey")  
                    .setMaster("local");
            // 创建JavaSparkContext
            JavaSparkContext sc = new JavaSparkContext(conf);
            // 模拟集合
            List<Tuple2<Integer, String>> scoreList = Arrays.asList(
                    new Tuple2<Integer, String>(65, "leo"),
                    new Tuple2<Integer, String>(50, "tom"),
                    new Tuple2<Integer, String>(100, "marry"),
                    new Tuple2<Integer, String>(80, "jack"));
            // 并行化集合,创建RDD
            JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);
            
            // 对scores RDD执行sortByKey算子
            // sortByKey其实就是根据key进行排序,可以手动指定升序,或者降序
            // 返回的,还是JavaPairRDD,其中的元素内容,都是和原始的RDD一模一样的
            // 但是就是RDD中的元素的顺序,不同了
            JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(false);      
            // 打印sortedScored RDD
            sortedScores.foreach(new VoidFunction<Tuple2<Integer,String>>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void call(Tuple2<Integer, String> t) throws Exception {
                    System.out.println(t._1 + ": " + t._2);  
                }            
            });        
            // 关闭JavaSparkContext
            sc.close();
        }
    
    Scala
     def sortByKey() {
        val conf = new SparkConf()
            .setAppName("sortByKey")  
            .setMaster("local")  
        val sc = new SparkContext(conf)
        
        val scoreList = Array(Tuple2(65, "leo"), Tuple2(50, "tom"), 
            Tuple2(100, "marry"), Tuple2(85, "jack"))  
        val scores = sc.parallelize(scoreList, 1)  
        val sortedScores = scores.sortByKey(false)
        
        sortedScores.foreach(studentScore => println(studentScore._1 + ": " + studentScore._2))  
      }
    
    7.Join
    java
    /**
         * join案例:打印学生成绩
         */
        private static void join() {
            // 创建SparkConf
            SparkConf conf = new SparkConf()
                    .setAppName("join")  
                    .setMaster("local");
            // 创建JavaSparkContext
            JavaSparkContext sc = new JavaSparkContext(conf);
            
            // 模拟集合
            List<Tuple2<Integer, String>> studentList = Arrays.asList(
                    new Tuple2<Integer, String>(1, "leo"),
                    new Tuple2<Integer, String>(2, "jack"),
                    new Tuple2<Integer, String>(3, "tom"));
            
            List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
                    new Tuple2<Integer, Integer>(1, 100),
                    new Tuple2<Integer, Integer>(2, 90),
                    new Tuple2<Integer, Integer>(3, 60));
            
            // 并行化两个RDD
            JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
            JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
            
            // 使用join算子关联两个RDD
            // join以后,还是会根据key进行join,并返回JavaPairRDD
            // 但是JavaPairRDD的第一个泛型类型,之前两个JavaPairRDD的key的类型,因为是通过key进行join的
            // 第二个泛型类型,是Tuple2<v1, v2>的类型,Tuple2的两个泛型分别为原始RDD的value的类型
            // join,就返回的RDD的每一个元素,就是通过key join上的一个pair
            // 什么意思呢?比如有(1, 1) (1, 2) (1, 3)的一个RDD
                // 还有一个(1, 4) (2, 1) (2, 2)的一个RDD
                // join以后,实际上会得到(1 (1, 4)) (1, (2, 4)) (1, (3, 4))
            JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = students.join(scores);
            
            // 打印studnetScores RDD
            studentScores.foreach(
                    
                    new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>() {
    
                        private static final long serialVersionUID = 1L;
            
                        @Override
                        public void call(Tuple2<Integer, Tuple2<String, Integer>> t)
                                throws Exception {
                            System.out.println("student id: " + t._1);  
                            System.out.println("student name: " + t._2._1);  
                            System.out.println("student score: " + t._2._2);
                            System.out.println("===============================");   
                        }                
                    });    
            // 关闭JavaSparkContext
            sc.close();
        }
    
    Scala
     def join() {
        val conf = new SparkConf()
            .setAppName("join")  
            .setMaster("local")  
        val sc = new SparkContext(conf)
        
       val studentList = Array(
            Tuple2(1, "leo"),
            Tuple2(2, "jack"),
            Tuple2(3, "tom"));
        
       val scoreList = Array(
            Tuple2(1, 100),
            Tuple2(2, 90),
            Tuple2(3, 60));
        
        val students = sc.parallelize(studentList);
        val scores = sc.parallelize(scoreList);
        
        val studentScores = students.join(scores)  
        
        studentScores.foreach(studentScore => { 
          println("student id: " + studentScore._1);
          println("student name: " + studentScore._2._1)
          println("student socre: " + studentScore._2._2)  
          println("=======================================")  
        })  
      }
    
    8.Cogroup
    /**
         * cogroup案例:打印学生成绩
         */
        private static void cogroup() {
            // 创建SparkConf
            SparkConf conf = new SparkConf()
                    .setAppName("cogroup")  
                    .setMaster("local");
            // 创建JavaSparkContext
            JavaSparkContext sc = new JavaSparkContext(conf);        
            // 模拟集合
            List<Tuple2<Integer, String>> studentList = Arrays.asList(
                    new Tuple2<Integer, String>(1, "leo"),
                    new Tuple2<Integer, String>(2, "jack"),
                    new Tuple2<Integer, String>(3, "tom"));
            List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
                    new Tuple2<Integer, Integer>(1, 100),
                    new Tuple2<Integer, Integer>(2, 90),
                    new Tuple2<Integer, Integer>(3, 60),
                    new Tuple2<Integer, Integer>(1, 70),
                    new Tuple2<Integer, Integer>(2, 80),
                    new Tuple2<Integer, Integer>(3, 50));
            
            // 并行化两个RDD
            JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
            JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);    
            // cogroup与join不同
            // 相当于是,一个key join上的所有value,都给放到一个Iterable里面去了 
            // cogroup,不太好讲解,希望大家通过动手编写我们的案例,仔细体会其中的奥妙
            JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> studentScores = 
                    students.cogroup(scores);    
            // 打印studnetScores RDD
            studentScores.foreach(            
                    new VoidFunction<Tuple2<Integer,Tuple2<Iterable<String>,Iterable<Integer>>>>() {
                        private static final long serialVersionUID = 1L;
            
                        @Override
                        public void call(
                                Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t)
                                throws Exception {
                            System.out.println("student id: " + t._1);  
                            System.out.println("student name: " + t._2._1);  
                            System.out.println("student score: " + t._2._2);
                            System.out.println("===============================");   
                        }        
                    });        
            // 关闭JavaSparkContext
            sc.close();
        }
    
    Scala
     def join() {
        val conf = new SparkConf()
            .setAppName("join")  
            .setMaster("local")  
        val sc = new SparkContext(conf)
        
       val studentList = Array(
            Tuple2(1, "leo"),
            Tuple2(2, "jack"),
            Tuple2(3, "tom"));
        
       val scoreList = Array(
            Tuple2(1, 100),
            Tuple2(2, 90),
            Tuple2(3, 60));
        
        val students = sc.parallelize(studentList);
        val scores = sc.parallelize(scoreList);
        
        val studentScores = students.join(scores)  
        
        studentScores.foreach(studentScore => { 
          println("student id: " + studentScore._1);
          println("student name: " + studentScore._2._1)
          println("student socre: " + studentScore._2._2)  
          println("=======================================")  
        })  
      }
    
    3.常见的action操作
    操作    介绍
    reduce    将RDD中的所有元素进行聚合操作。第一个和第二个元素聚合,值与第三个元素聚合,值与第四个元素聚合,以此类推。
    collect    将RDD中所有元素获取到本地客户端。
    count    获取RDD元素总数。
    take(n)    获取RDD中前n个元素。
    saveAsTextFile    将RDD元素保存到文件中,对每个元素调用toString方法
    countByKey    对每个key对应的值进行count计数。
    foreach    遍历RDD中的每个元素。
    
    1.Reduce
    Java
    private static void reduce() {
            // 创建SparkConf和JavaSparkContext
            SparkConf conf = new SparkConf()
                    .setAppName("reduce")
                    .setMaster("local");  
            JavaSparkContext sc = new JavaSparkContext(conf);
            
            // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加
            List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
            JavaRDD<Integer> numbers = sc.parallelize(numberList);
            
            // 使用reduce操作对集合中的数字进行累加
            // reduce操作的原理:
                // 首先将第一个和第二个元素,传入call()方法,进行计算,会获取一个结果,比如1 + 2 = 3
                // 接着将该结果与下一个元素传入call()方法,进行计算,比如3 + 3 = 6
                // 以此类推
            // 所以reduce操作的本质,就是聚合,将多个元素聚合成一个元素
            int sum = numbers.reduce(new Function2<Integer, Integer, Integer>() {
                
                private static final long serialVersionUID = 1L;
    
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
                
            });
            
            System.out.println(sum);  
            
            // 关闭JavaSparkContext
            sc.close();
        }
        
    
    Scala
      def reduce() {
        val conf = new SparkConf()
            .setAppName("reduce")
            .setMaster("local")  
        val sc = new SparkContext(conf)
        
        val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        val numbers = sc.parallelize(numberArray, 1)  
        val sum = numbers.reduce(_ + _)  
        
        println(sum)  
      }
      
    
    2.Collect
    Java
    
        private static void collect() {
            // 创建SparkConf和JavaSparkContext
            SparkConf conf = new SparkConf()
                    .setAppName("collect")
                    .setMaster("local");  
            JavaSparkContext sc = new JavaSparkContext(conf);
            
            // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加
            List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
            JavaRDD<Integer> numbers = sc.parallelize(numberList);
            
            // 使用map操作将集合中所有数字乘以2
            JavaRDD<Integer> doubleNumbers = numbers.map(
                    
                    new Function<Integer, Integer>() {
    
                        private static final long serialVersionUID = 1L;
            
                        @Override
                        public Integer call(Integer v1) throws Exception {
                            return v1 * 2;
                        }
                        
                    });
            
            // 不用foreach action操作,在远程集群上遍历rdd中的元素
            // 而使用collect操作,将分布在远程集群上的doubleNumbers RDD的数据拉取到本地
            // 这种方式,一般不建议使用,因为如果rdd中的数据量比较大的话,比如超过1万条
                // 那么性能会比较差,因为要从远程走大量的网络传输,将数据获取到本地
                // 此外,除了性能差,还可能在rdd中数据量特别大的情况下,发生oom异常,内存溢出
            // 因此,通常,还是推荐使用foreach action操作,来对最终的rdd元素进行处理
            List<Integer> doubleNumberList = doubleNumbers.collect();
            for(Integer num : doubleNumberList) {
                System.out.println(num);  
            }
            
            // 关闭JavaSparkContext
            sc.close();
        }
        
    
    scala
    
      def collect() {
        val conf = new SparkConf()
            .setAppName("collect")
            .setMaster("local")  
        val sc = new SparkContext(conf)
        
        val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        val numbers = sc.parallelize(numberArray, 1)  
        val doubleNumbers = numbers.map { num => num * 2 }  
        
        val doubleNumberArray = doubleNumbers.collect()
        
        for(num <- doubleNumberArray) {
          println(num)  
        }
      }
    
    3.Count
    Java
    private static void count() {
            // 创建SparkConf和JavaSparkContext
            SparkConf conf = new SparkConf()
                    .setAppName("count")
                    .setMaster("local");  
            JavaSparkContext sc = new JavaSparkContext(conf);
            
            // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加
            List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
            JavaRDD<Integer> numbers = sc.parallelize(numberList);
            
            // 对rdd使用count操作,统计它有多少个元素
            long count = numbers.count();
            System.out.println(count);  
            
            // 关闭JavaSparkContext
            sc.close();
        }
    
    scala
      def count() {
        val conf = new SparkConf()
            .setAppName("count")
            .setMaster("local")  
        val sc = new SparkContext(conf)
        
        val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        val numbers = sc.parallelize(numberArray, 1)  
        val count = numbers.count()
        
        println(count)  
      }
    
    4.Take(n)
    Java
    
        private static void take() {
            // 创建SparkConf和JavaSparkContext
            SparkConf conf = new SparkConf()
                    .setAppName("take")
                    .setMaster("local");  
            JavaSparkContext sc = new JavaSparkContext(conf);
            
            // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加
            List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
            JavaRDD<Integer> numbers = sc.parallelize(numberList);
            
            // 对rdd使用count操作,统计它有多少个元素
            // take操作,与collect类似,也是从远程集群上,获取rdd的数据
            // 但是collect是获取rdd的所有数据,take只是获取前n个数据
            List<Integer> top3Numbers = numbers.take(3);
            
            for(Integer num : top3Numbers) {
                System.out.println(num);  
            }
            
            // 关闭JavaSparkContext
            sc.close();
        }
    
    scala
      
      def take() {
        val conf = new SparkConf()
            .setAppName("take")
            .setMaster("local")  
        val sc = new SparkContext(conf)
        
        val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        val numbers = sc.parallelize(numberArray, 1)  
        
        val top3Numbers = numbers.take(3)
        
        for(num <- top3Numbers) {
          println(num)  
        }
      }
    
    5.saveAsTextFile
    Java
    private static void saveAsTextFile() {
            // 创建SparkConf和JavaSparkContext
            SparkConf conf = new SparkConf()
                    .setAppName("saveAsTextFile");  
            JavaSparkContext sc = new JavaSparkContext(conf);
            
            // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加
            List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
            JavaRDD<Integer> numbers = sc.parallelize(numberList);
            
            // 使用map操作将集合中所有数字乘以2
            JavaRDD<Integer> doubleNumbers = numbers.map(
                    
                    new Function<Integer, Integer>() {
    
                        private static final long serialVersionUID = 1L;
            
                        @Override
                        public Integer call(Integer v1) throws Exception {
                            return v1 * 2;
                        }
                        
                    });
            
            // 直接将rdd中的数据,保存在HFDS文件中
            // 但是要注意,我们这里只能指定文件夹,也就是目录
            // 那么实际上,会保存为目录中的/double_number.txt/part-00000文件
            doubleNumbers.saveAsTextFile("hdfs://spark1:9000/double_number.txt");   
            
            // 关闭JavaSparkContext
            sc.close();
        }
        
    
    scala
      
      def saveAsTextFile() {
        
      }
      
     
    
    6.countByKey
    Java
        private static void countByKey() {
            // 创建SparkConf
            SparkConf conf = new SparkConf()
                    .setAppName("countByKey")  
                    .setMaster("local");
            // 创建JavaSparkContext
            JavaSparkContext sc = new JavaSparkContext(conf);
            
            // 模拟集合
            List<Tuple2<String, String>> scoreList = Arrays.asList(
                    new Tuple2<String, String>("class1", "leo"),
                    new Tuple2<String, String>("class2", "jack"),
                    new Tuple2<String, String>("class1", "marry"),
                    new Tuple2<String, String>("class2", "tom"),
                    new Tuple2<String, String>("class2", "david"));  
            
            // 并行化集合,创建JavaPairRDD
            JavaPairRDD<String, String> students = sc.parallelizePairs(scoreList);
            
            // 对rdd应用countByKey操作,统计每个班级的学生人数,也就是统计每个key对应的元素个数
            // 这就是countByKey的作用
            // countByKey返回的类型,直接就是Map<String, Object>
            Map<String, Object> studentCounts = students.countByKey();
            
            for(Map.Entry<String, Object> studentCount : studentCounts.entrySet()) {
                System.out.println(studentCount.getKey() + ": " + studentCount.getValue());  
            }
            
            // 关闭JavaSparkContext
            sc.close();
        }
    
    scala
     def countByKey() {
        val conf = new SparkConf()
            .setAppName("countByKey")  
            .setMaster("local")  
        val sc = new SparkContext(conf)
        
        val studentList = Array(Tuple2("class1", "leo"), Tuple2("class2", "jack"),
            Tuple2("class1", "tom"), Tuple2("class2", "jen"), Tuple2("class2", "marry"))   
        val students = sc.parallelize(studentList, 1)  
        val studentCounts = students.countByKey()  
        
        println(studentCounts)  
      }
    }
    
    7.Foreach
    六、RDD持久化
    1.持久化原理
         Spark非常重要的一个功能特性就是可以将RDD持久化在内存中。当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition。这样的话,对于针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD,而不需要反复计算多次该RDD。
    
    巧妙使用RDD持久化,甚至在某些场景下,可以将spark应用程序的性能提升10倍。对于迭代式算法和快速交互式应用来说,RDD持久化,是非常重要的。
    
    要持久化一个RDD,只要调用其cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition。
    
    cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,同时就是调用persist(MEMORY_ONLY),将数据持久化到内存中。如果需要从内存中清楚缓存,那么可以使用unpersist()方法。
    
    Spark自己也会在shuffle操作时,进行数据的持久化,比如写入磁盘,主要是为了在节点失败时,避免需要重新计算整个过程。
    
    实验检测
    public class Persist {
    
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName("Persist")
                    .setMaster("local"); 
            JavaSparkContext sc = new JavaSparkContext(conf);
            
            // cache()或者persist()的使用,是有规则的
            // 必须在transformation或者textFile等创建了一个RDD之后,直接连续调用cache()或persist()才可以
            // 如果你先创建一个RDD,然后单独另起一行执行cache()或persist()方法,是没有用的
            // 而且,会报错,大量的文件会丢失
            JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//spark.txt")
                    .cache();
            
            long beginTime = System.currentTimeMillis();
            
            long count = lines.count();
            System.out.println(count);  
            
            long endTime = System.currentTimeMillis();
            System.out.println("cost " + (endTime - beginTime) + " milliseconds.");   
            
            beginTime = System.currentTimeMillis();
            
            count = lines.count();
            System.out.println(count);  
            
            endTime = System.currentTimeMillis();
            System.out.println("cost " + (endTime - beginTime) + " milliseconds.");
            
            sc.close();
        }
        
    }
    
    2.持久化策略
    RDD持久化是可以手动选择不同的策略的。比如可以将RDD持久化在内存中、持久化到磁盘上、使用序列化的方式持久化,多持久化的数据进行多路复用。只要在调用persist()时传入对应的StorageLevel即可。
    
    
    持久化级别    含义
    MEMORY_ONLY    以非序列化的Java对象的方式持久化在JVM内存中。如果内存无法完全存储RDD所有的partition,那么那些没有持久化的partition就会在下一次需要使用它的时候,重新被计算。
    MEMORY_AND_DISK    同上,但是当某些partition无法存储在内存中时,会持久化到磁盘中。下次需要使用这些partition时,需要从磁盘上读取。
    MEMORY_ONLY_SER    同MEMORY_ONLY,但是会使用Java序列化方式,将Java对象序列化后进行持久化。可以减少内存开销,但是需要进行反序列化,因此会加大CPU开销。
    MEMORY_AND_DSK_SER    同MEMORY_AND_DSK。但是使用序列化方式持久化Java对象。
    DISK_ONLY    使用非序列化Java对象的方式持久化,完全存储到磁盘上。
    MEMORY_ONLY_2
    MEMORY_AND_DISK_2
    等等    如果是尾部加了2的持久化级别,表示会将持久化数据复用一份,保存到其他节点,从而在数据丢失时,不需要再次计算,只需要使用备份数据即可。
    
    3.如何选择持久化策略
    Spark提供的多种持久化级别,主要是为了在CPU和内存消耗之间进行取舍。下面是一些通用的持久化级别的选择建议:
    
    1、优先使用MEMORY_ONLY,如果可以缓存所有数据的话,那么就使用这种策略。因为纯内存速度最快,而且没有序列化,不需要消耗CPU进行反序列化操作。
    2、如果MEMORY_ONLY策略,无法存储的下所有数据的话,那么使用MEMORY_ONLY_SER,将数据进行序列化进行存储,纯内存操作还是非常快,只是要消耗CPU进行反序列化。
    3、如果需要进行快速的失败恢复,那么就选择带后缀为_2的策略,进行数据的备份,这样在失败时,就不需要重新计算了。
    4、能不使用DISK相关的策略,就不用使用,有的时候,从磁盘读取数据,还不如重新计算一次。
    七、共享变量
    1.共享变量的原理
    Spark一个非常重要的特性就是共享变量。
    
    默认情况下,如果在一个算子的函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每个task中。此时每个task只能操作自己的那份变量副本。如果多个task想要共享某个变量,那么这种方式是做不到的。
    
    Spark为此提供了两种共享变量,一种是Broadcast Variable(广播变量),另一种是Accumulator(累加变量)。Broadcast Variable会将使用到的变量,仅仅为每个节点拷贝一份,更大的用处是优化性能,减少网络传输以及内存消耗。Accumulator则可以让多个task共同操作一份变量,主要可以进行累加操作。
    
    2. Broadcast Variable
    Spark提供的Broadcast Variable,是只读的。并且在每个节点上只会有一份副本,而不会为每个task都拷贝一份副本。因此其最大作用,就是减少变量到各个节点的网络传输消耗,以及在各个节点上的内存消耗。此外,spark自己内部也使用了高效的广播算法来减少网络消耗。
    
    可以通过调用SparkContext的broadcast()方法,来针对某个变量创建广播变量。然后在算子的函数内,使用到广播变量时,每个节点只会拷贝一份副本了。每个节点可以使用广播变量的value()方法获取值。记住,广播变量,是只读的。
    
    val factor = 3
    val factorBroadcast = sc.broadcast(factor)
    
    val arr = Array(1, 2, 3, 4, 5)
    val rdd = sc.parallelize(arr)
    val multipleRdd = rdd.map(num => num * factorBroadcast.value())
    
    multipleRdd.foreach(num => println(num))
    
    3.Accumulator
    Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能。但是确给我们提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。
    
    val sumAccumulator = sc.accumulator(0)
    val arr = Array(1, 2, 3, 4, 5)
    val rdd = sc.parallelize(arr)
    rdd.foreach(num => sumAccumulator += num)
    
    println(sumAccumulator.value)
    
    八、基于排序机制的wordcount程序
     案例需求
    1、对文本文件内的每个单词都统计出其出现的次数。
    2、按照每个单词出现次数的数量,降序排序。
    1.Java代码
    import java.util.Arrays;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    
    import scala.Tuple2;
    
    /**
     * 排序的wordcount程序
     * @author Administrator
     *
     */
    public class SortWordCount {
    
        public static void main(String[] args) {
            // 创建SparkConf和JavaSparkContext
            SparkConf conf = new SparkConf()
                    .setAppName("SortWordCount")
                    .setMaster("local"); 
            JavaSparkContext sc = new JavaSparkContext(conf);
            
            // 创建lines RDD
            JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//spark.txt");
            
            // 执行我们之前做过的单词计数
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterable<String> call(String t) throws Exception {
                    return Arrays.asList(t.split(" "));  
                }
                
            });        
            JavaPairRDD<String, Integer> pairs = words.mapToPair(
                    
                    new PairFunction<String, String, Integer>() {
    
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Tuple2<String, Integer> call(String t) throws Exception {
                            return new Tuple2<String, Integer>(t, 1);
                        }
                        
                    });    
            JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(
                    
                    new Function2<Integer, Integer, Integer>() {
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Integer call(Integer v1, Integer v2) throws Exception {
                            return v1 + v2;
                        }                
                    });    
            // 到这里为止,就得到了每个单词出现的次数
            // 但是,问题是,我们的新需求,是要按照每个单词出现次数的顺序,降序排序
            // wordCounts RDD内的元素是什么?应该是这种格式的吧:(hello, 3) (you, 2)
            // 我们需要将RDD转换成(3, hello) (2, you)的这种格式,才能根据单词出现次数进行排序把!
            
            // 进行key-value的反转映射
            JavaPairRDD<Integer, String> countWords = wordCounts.mapToPair(            
                    new PairFunction<Tuple2<String,Integer>, Integer, String>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Tuple2<Integer, String> call(Tuple2<String, Integer> t)
                                throws Exception {
                            return new Tuple2<Integer, String>(t._2, t._1);
                        }                
                    });    
            // 按照key进行排序
            JavaPairRDD<Integer, String> sortedCountWords = countWords.sortByKey(false);    
            // 再次将value-key进行反转映射
            JavaPairRDD<String, Integer> sortedWordCounts = sortedCountWords.mapToPair(                
                    new PairFunction<Tuple2<Integer,String>, String, Integer>() {
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Tuple2<String, Integer> call(Tuple2<Integer, String> t)
                                throws Exception {
                            return new Tuple2<String, Integer>(t._2, t._1);
                        }                
                    });    
            // 到此为止,我们获得了按照单词出现次数排序后的单词计数
            // 打印出来
            sortedWordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {        
                private static final long serialVersionUID = 1L;
                @Override
                public void call(Tuple2<String, Integer> t) throws Exception {
                    System.out.println(t._1 + " appears " + t._2 + " times.");      
                }        
            });    
            // 关闭JavaSparkContext
            sc.close();
        }
    }
    
    2.scala代码
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    /**
     * @author Administrator
     */
    object SortWordCount { 
      def main(args: Array[String]) {
        val conf = new SparkConf()
            .setAppName("SortWordCount")
            .setMaster("local") 
        val sc = new SparkContext(conf)
        
        val lines = sc.textFile("C://Users//Administrator//Desktop//spark.txt", 1)
        val words = lines.flatMap { line => line.split(" ") }  
        val pairs = words.map { word => (word, 1) }  
        val wordCounts = pairs.reduceByKey(_ + _)  
        
        val countWords = wordCounts.map(wordCount => (wordCount._2, wordCount._1))   
        val sortedCountWords = countWords.sortByKey(false)  
        val sortedWordCounts = sortedCountWords.map(sortedCountWord => (sortedCountWord._2, sortedCountWord._1))  
        
        sortedWordCounts.foreach(sortedWordCount => println(
            sortedWordCount._1 + " appear " + sortedWordCount._2 + " times."))
      }
      
    }
    
    九、二次排序
     案例需求
    1、按照文件中的第一列排序。
    2、如果第一列相同,则按照第二列排序。
    1.java代码
    package cn.spark.study.core;
    
    import java.io.Serializable;
    
    import scala.math.Ordered;
    
    /**
     * 自定义的二次排序key
     * @author Administrator
     *
     */
    public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable {
    
        private static final long serialVersionUID = -2366006422945129991L;
        
        // 首先在自定义key里面,定义需要进行排序的列
        private int first;
        private int second;
        
        public SecondarySortKey(int first, int second) {
            this.first = first;
            this.second = second;
        }
    
        @Override
        public boolean $greater(SecondarySortKey other) {
            if(this.first > other.getFirst()) {
                return true;
            } else if(this.first == other.getFirst() && 
                    this.second > other.getSecond()) {
                return true;
            }
            return false;
        }
        
        @Override
        public boolean $greater$eq(SecondarySortKey other) {
            if(this.$greater(other)) {
                return true;
            } else if(this.first == other.getFirst() && 
                    this.second == other.getSecond()) {
                return true;
            }
            return false;
        }
    
        @Override
        public boolean $less(SecondarySortKey other) {
            if(this.first < other.getFirst()) {
                return true;
            } else if(this.first == other.getFirst() && 
                    this.second < other.getSecond()) {
                return true;
            }
            return false;
        }
        
        @Override
        public boolean $less$eq(SecondarySortKey other) {
            if(this.$less(other)) {
                return true;
            } else if(this.first == other.getFirst() && 
                    this.second == other.getSecond()) {
                return true;
            }
            return false;
        }
        
        @Override
        public int compare(SecondarySortKey other) {
            if(this.first - other.getFirst() != 0) {
                return this.first - other.getFirst();
            } else {
                return this.second - other.getSecond();
            }
        }
        
        @Override
        public int compareTo(SecondarySortKey other) {
            if(this.first - other.getFirst() != 0) {
                return this.first - other.getFirst();
            } else {
                return this.second - other.getSecond();
            }
        }
        
        // 为要进行排序的多个列,提供getter和setter方法,以及hashcode和equals方法
        public int getFirst() {
            return first;
        }
    
        public void setFirst(int first) {
            this.first = first;
        }
    
        public int getSecond() {
            return second;
        }
    
        public void setSecond(int second) {
            this.second = second;
        }
    
        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + first;
            result = prime * result + second;
            return result;
        }
    
        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            SecondarySortKey other = (SecondarySortKey) obj;
            if (first != other.first)
                return false;
            if (second != other.second)
                return false;
            return true;
        }
        
    }
    package cn.spark.study.core;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    
    import scala.Tuple2;
    
    /**
     * 二次排序
     * 1、实现自定义的key,要实现Ordered接口和Serializable接口,在key中实现自己对多个列的排序算法
     * 2、将包含文本的RDD,映射成key为自定义key,value为文本的JavaPairRDD
     * 3、使用sortByKey算子按照自定义的key进行排序
     * 4、再次映射,剔除自定义的key,只保留文本行
     * @author Administrator
     *
     */
    public class SecondarySort {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName("SecondarySort") 
                    .setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
        
            JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//sort.txt");    
            JavaPairRDD<SecondarySortKey, String> pairs = lines.mapToPair(
                    
                    new PairFunction<String, SecondarySortKey, String>() {
    
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Tuple2<SecondarySortKey, String> call(String line) throws Exception {
                            String[] lineSplited = line.split(" ");  
                            SecondarySortKey key = new SecondarySortKey(
                                    Integer.valueOf(lineSplited[0]), 
                                    Integer.valueOf(lineSplited[1]));  
                            return new Tuple2<SecondarySortKey, String>(key, line);
                        }                
                    });    
            JavaPairRDD<SecondarySortKey, String> sortedPairs = pairs.sortByKey();    
            JavaRDD<String> sortedLines = sortedPairs.map(            
                    new Function<Tuple2<SecondarySortKey,String>, String>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public String call(Tuple2<SecondarySortKey, String> v1) throws Exception {
                            return v1._2;
                        }            
                    });    
            sortedLines.foreach(new VoidFunction<String>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void call(String t) throws Exception {
                    System.out.println(t);  
                }
            });
            sc.close();
        }
    }
    
    
    2.Scala代码
    class SecondSortKey(val first: Int, val second: Int) 
        extends Ordered[SecondSortKey] with Serializable {
      
      def compare(that: SecondSortKey): Int = {
        if(this.first - that.first != 0) {
          this.first - that.first
        } else {
          this.second - that.second
        }
      }
      
    }
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    
    /**
     * @author Administrator
     */
    object SecondSort {
      
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
            .setAppName("SecondSort")  
            .setMaster("local")  
        val sc = new SparkContext(conf)
      
        val lines = sc.textFile("C://Users//Administrator//Desktop//sort.txt", 1)
        val pairs = lines.map { line => (
            new SecondSortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt),
            line)}
        val sortedPairs = pairs.sortByKey()
        val sortedLines = sortedPairs.map(sortedPair => sortedPair._2)  
        
        sortedLines.foreach { sortedLine => println(sortedLine) }  
      }
      
    }
    
    十、Topn
     案例需求
    1、对文本文件内的数字,取最大的前3个。
    2、对每个班级内的学生成绩,取出前3名。(分组取topn)
    3、课后作用:用Scala来实现分组取topn。
    
    1.java代码
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;
    
    import scala.Tuple2;
    
    /**
     * 取最大的前3个数字
     * @author Administrator
     *
     */
    public class Top3 {
    
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName("Top3")
                    .setMaster("local");  
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//top.txt");    
            JavaPairRDD<Integer, String> pairs = lines.mapToPair(    
                    new PairFunction<String, Integer, String>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Tuple2<Integer, String> call(String t) throws Exception {
                            return new Tuple2<Integer, String>(Integer.valueOf(t), t);
                        }
                    });
            JavaPairRDD<Integer, String> sortedPairs = pairs.sortByKey(false);
            JavaRDD<Integer> sortedNumbers = sortedPairs.map(    
                    new Function<Tuple2<Integer,String>, Integer>() {
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Integer call(Tuple2<Integer, String> v1) throws Exception {
                            return v1._1;
                        }
                        
                    });
            List<Integer> sortedNumberList = sortedNumbers.take(3);
            for(Integer num : sortedNumberList) {
                System.out.println(num);
            }    
            sc.close();
        }    
    }
    
    2.
    import java.util.Arrays;
    import java.util.Iterator;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    
    import scala.Tuple2;
    
    /**
     * 分组取top3
     * @author Administrator
     *
     */
    public class GroupTop3 {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName("Top3")
                    .setMaster("local");  
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//score.txt");
            JavaPairRDD<String, Integer> pairs = lines.mapToPair(
                    new PairFunction<String, String, Integer>() {
    
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Tuple2<String, Integer> call(String line) throws Exception {
                            String[] lineSplited = line.split(" ");  
                            return new Tuple2<String, Integer>(lineSplited[0], 
                                    Integer.valueOf(lineSplited[1]));
                        }
                    });
            JavaPairRDD<String, Iterable<Integer>> groupedPairs = pairs.groupByKey();
            
            JavaPairRDD<String, Iterable<Integer>> top3Score = groupedPairs.mapToPair(
                    
            new PairFunction<Tuple2<String,Iterable<Integer>>, String, Iterable<Integer>>() {
    
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Tuple2<String, Iterable<Integer>> call(
                                Tuple2<String, Iterable<Integer>> classScores)
                                throws Exception {
                            Integer[] top3 = new Integer[3];
                            String className = classScores._1;
                            Iterator<Integer> scores = classScores._2.iterator();
                            while(scores.hasNext()) {
                                Integer score = scores.next();
                                
                                for(int i = 0; i < 3; i++) {
                                    if(top3[i] == null) {
                                        top3[i] = score;
                                        break;
                                    } else if(score > top3[i]) {
                                        for(int j = 2; j > i; j--) {
                                            top3[j] = top3[j - 1];  
                                        }
                                        
                                        top3[i] = score;
                                        
                                        break;
                                    } 
                                }
                            }
                            return new Tuple2<String, 
                                    Iterable<Integer>>(className, Arrays.asList(top3));    
                        }
                    });
            top3Score.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
                    System.out.println("class: " + t._1);  
                    Iterator<Integer> scoreIterator = t._2.iterator();
                    while(scoreIterator.hasNext()) {
                        Integer score = scoreIterator.next();
                        System.out.println(score);  
                    }
                    System.out.println("=======================================");   
                }
            });
            sc.close();
        }
    }
    
    2.Scala代码
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    
    /**
     * @author Administrator
     */
    object Top3 {
      
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
            .setAppName("Top3")
            .setMaster("local")  
        val sc = new SparkContext(conf)
        
        val lines = sc.textFile("C://Users//Administrator//Desktop//top.txt", 1)
        val pairs = lines.map { line => (line.toInt, line) }
        val sortedPairs = pairs.sortByKey(false)
        val sortedNumbers = sortedPairs.map(sortedPair => sortedPair._1)  
        val top3Number = sortedNumbers.take(3)
        
        for(num <- top3Number) {
          println(num)  
        }
      }
      
    }
    
    
    十一、spark内核架构
    
    1.提交spark应用的机器行通过spark-submit(shell)提交自己的spark程序,称之为Application,提交应用程序后,会启动driver进程,spark-submit使用我们之前一直使用的那种提交模式去提交的时候,我们之前的提交模式就做standalone,其实会通过反射的方式。创建和构造一个DriverActor进程出来
    
    2.执行我们的application应用程序,也就是我们自己编写的代码,代码首先是构造sparkConf、在构造sparkContext
    
    3.sparkContext在初始化的时候,做的最重要的两件事就是构造出来DAGScheduler和taskScheduler。
    
    4.taskScheduler(他有自己的后台进行)实际上,是负责通过他对对应的一个后台进程去连接master,向master注册application
    
    5.master接收到application注册的请求之后,会使用自己的资源调度算法,在spark集群的worker节点上,为这个application启动多个executor。Master通知worker启动executor
    
    6.executor启动之后,会自己反向注册到taskscheduler。每执行到一个action,就会创建一个job,job会提交给DAGScheduler,DAGScheduler会将job划分成多个stage,然后每一个stage创建一个taskSet,
    
    7.taskscheduler会吧taskSet里每一个task,提交到executor执行(task分配算法)
    
    
    8.worker会为application启动executor(进程),executor中有线程池。Executor,每接收到一个task,都会用taskRunner来封装task,然后从线程池里取出一个线程,执行这个task.
    
    9.taskRunner:将我们编写的代码,也就是要执行的算子以及函数,拷贝,反序列化然后进行task
    
    10.task有两种,shuffleMapTask和ResultTask.只有最后一个stage是ResultTask,之前的stage,都是shuffleMapTask.
    
    11.所以。最后整个spark应用程序的执行,就是stage分批次作为taskset提交到executor执行,每一个task针对的RDD的一个partition,执行我们定义的算子和函数,以此类推 ,直到所有的操作执行完为止
    
    十二、宽依赖和窄依赖
    1、Application
    2、spark-submit
    3、Driver
    4、SparkContext
    5、Master
    6、Worker
    7、Executor
    8、Job
    9、DAGScheduler
    10、TaskScheduler
    11、ShuffleMapTask and ResultTask
    
    
    
    1.窄依赖
    Narrow Dependency
    一个RDD,对他的父RDD,只有简单的一对一的依赖关系,也就是说,RDD的每个partition,仅依赖于父的一个partition。父的RDD和子的RDD的partition之间的对应关系是一对一的。这种情况下,是简单的RDD之间的依赖关系
    
    
    2.宽依赖
    Shuffle Dependecy,本质如其名,本质其实就是shuffle。也就是说。每一个父RDD的partition中的数据,都可能会传输一部分。到下一个RDD的每一个partition中,此时就会出现,父RDD和子RDD的partition之间,具有交互错综负责的关系。那么,这种情况,就叫做两个RDD之间的宽依赖。同时,他们之间的操作就是shuffle。
    
    十三、基于YARN两种提交模式
    1、Spark内核架构,其实就是第一种模式,standalone模式,基于Spark自己的Master-Worker集群。
    2、第二种,是基于YARN的yarn-cluster模式。
    3、第三种,是基于YARN的yarn-client模式。
    4、如果,你要切换到第二种和第三种模式,很简单,将我们之前用于提交spark应用程序的spark-submit脚本,加上--master参数,设置为yarn-cluster,或yarn-client,即可。如果你没有设置,那么,就是standalone模式。
    
    
    
    1.Yarn-cluster提交模式
    Spark-submit提交,发送请求到resourcemanager,请求启动applicationmaster,resourcemanager分配一个containner,在某个nodemanager上启动一个applicationmasstesr,applicationmaster相当于是driver,applicationmaster连接其他的nodemanager来启动Executor,这里的nodemanager相当于是worker,executor启动后,向application反向注册
    
    2.Yarn-clinet提交模式
    Spark-submit提交,在本地启动driver进程(主要与yarn-cluster进程不同的地方,yarn-cluster是在nodemanager上启动),发送请求给resourcemanager,请求启动applicationmaster,resourcemanager分配一个container,在某个nodemanager上启动一个applicationmaster,但是这里的applicationmaster 其实只是一个executorLauncher。在nodemanager上启动executorLanucher(applicationmaser) ,resourcemanager分配一批container,然后applicationmaster连接其他的nodemanager,用container的资源,启动executor,executor反向注册到本地的driver上
    
    3.选用原则
    1.yarn-clinet用于测试,因为,driver运行在本地客户端,负责调整application,会与yarn集群产生大量的网络通信,从而导致网卡流量激增,可能会被你们公司的SA(运维)警告。好处是直接执行,本地可以看到所有的log。方便进行调试
    
    2.Yarn-cluster用于生产环境,因为driver运行在nodemanager,没有忘啦流量激增的问题。缺点在于,调试不方便,本地应spark-submint提交后,看不大log,只能通过yarn application-logs application-id 这种命令查看,很是麻烦
    十四、SparkContext原理剖析与源码分析
    1、TaskScheduler
    2、DAGScheduler
    3、SparkUI
    
    
    
    十五、
    
    
    
    
    
    
    
    
    Spark性能优化
    1.概述
    由于Spark的计算本质是基于内存的,所以Spark性能程序的性能可能因为集群中的任何因素出现瓶颈:CPU、网络带宽、或者是内存。如果内存能够容纳得下所有的数据,那么网络传输和通信就会导致性能出现瓶颈。但是如果内存比较紧张,不足以放下所有的数据(比如在针对10亿以上的数据量进行计算时),还是需要对内存的使用进行性能优化的,比如说使用一些手段来减少内存的消耗。
    
    Spark性能优化,其实主要就是在于对内存的使用进行调优。因为通常情况下来说,如果你的Spark应用程序计算的数据量比较小,并且你的内存足够使用,那么只要运维可以保障网络通常,一般是不会有大的性能问题的。但是Spark应用程序的性能问题往往出现在针对大数据量(比如10亿级别)进行计算时出现,因此通常来说,Spark性能优化,主要是对内存进行性能优化。当然,除了内存调优之外,还有很多手段可以优化Spark应用程序的性能。
    
    2.spark性能调优的重要性
    1.实际上Spark到目前为止,在大数据业界的影响力和覆盖度,还远没有达到Hadoop的水平,——虽然说,我们之前一再强调,Spark Core、Spark SQL、Spark Streaming,可以替代MapReduce、Hive查询引擎、Storm。但是事实就是,Spark还没有达到已经替代了它们的地步。
    
    根据我在研究Spark,并且在一线使用Spark,与大量行业内的大数据相关从业人员沟通的情况来看。Spark最大的优点,其实也是它目前最大的问题——基于内存的计算模型。Spark由于使用了基于内存的计算模型,因此导致了其稳定性,远远不如Hadoop。虽然我也很喜欢和热爱Spark,但是这就是事实,Spark的速度的确达到了hadoop的几倍、几十倍、甚至上百倍(极端情况)。但是基于内存的模型,导致它经常出现各种OOM(内存溢出)、内部异常等问题。
    
    说一个亲身经历的例子,曾经用Spark改写几个复杂的MapReduce程序,虽然MapReduce很慢,但是它很稳定,至少慢慢跑,是可以跑出来数据的。但是用Spark Core很快就改写完了程序,问题是,在整整半个月之内,Spark程序根本跑不起来,因为数据量太大,10亿+。导致它出现了各种各样的问题,包括OOM、文件丢失、task lost、内部异常等等各种问题。最后耗费了大量时间,最一个spark程序进行了大量的性能调优,才最终让它可以跑起来。
    
    的确,用了Spark,比MapReduce的速度快了十倍,但是付出的代价是惨痛的,花了整整一个月的时间做这个事情。
    
    2.因此,当我在公司推广Spark的使用时,很多人都不无担心地说,听说Spark还不够稳定,经常出现问题,比如OOM等,它的稳定性,导致业界的人们不太敢轻易尝试它,在复杂的大数据系统,要求极高稳定性的线程系统中使用。——当然,如果你就是开发一个针对公司内部的,稳定性要求不高的系统,当然不用担心这个问题。
    
    所以,我认为,Spark的基于内存的本质,就导致了上述的问题,导致了它目前还无法完全提到Hadoop中的某些技术。
    
    但是,纵然Spark有各种问题,其优点就是缺点,缺点也是优点——它实在是很快。优秀的Spark应用程序,性能完全可以达到MapReduce、Hive查询引擎的数倍、甚至数十倍。因此,纵使有各种担忧,Spark还是吸引着大量的人们以及公司去探索,和尝试攻克它,使用它,让它为我们所用,用它开放更棒的大数据系统。
    
    因此,正是基于上述背景,Spark工程师的要求是非常高的。比如我们这里,我们正在用Spark开发大型复杂的线上大数据系统,所以针对Spark的招聘,我们是要求Spark工程师必须精通Spark内核源码,能够对程序进行性能优化。——打个广告,实际上,我认为如果能精通本系列课程,那么成为一个行业内优秀的Spark工程师,是一定没有问题的。
    
    3.所以,Spark虽然有它的问题所在,但是它的优势还是让它以极快的速度,极强的劲头在行业内快速发展。行业内各个公司,也大量缺乏着优秀的Spark工程师。而如果是想转型进行Spark开发的朋友,基于上述种种背景,就应该明白了,Spark性能优化,对于你找工作,对于你在实际工作中解决问题的重要性了!
    
    要成为优秀的Spark工程师,顺利实现转型,那么就必须能够彻底精通Spark内核源码,能够基于对Spark内核原理的深度理解,对线上复杂的Spark大数据系统 / 程序出现的报错和故障,进行排查和解决;能够对运行较慢的Spark应用程序,进行精准的性能问题排查,并且对症下游,针对各种性能问题,使用对应的技术手段,进行解决。
    
    只有这样,我认为,你才能够顺利实现转型,出去成功面试Spark工程师,甚至是高级Spark工程师的岗位。才能在实际工作中,真正让Spark发挥出其巨大的威力。而不仅仅是处于对新技术的喜爱,对Spark进行浅尝辄止的学习——那是没有任何用的。
    
    不精通Spark内核源码,不精通Spark性能优化,也许你能找到Spark大数据的工作,但是通常情况下,也只能进入比较缺人的小公司。要进入大公司,找到更好的职业机会,那么就一起在精通了之前的Spark内核源码深度剖析阶段之后,来进入Spark性能优化阶段的学习吧。
    3.性能优化的技术
    
    诊断内存的消耗
    
    内存都花费在哪里了?
    
    1、每个Java对象,都有一个对象头,会占用16个字节,主要是包括了一些对象的元信息,比如指向它的类的指针。如果一个对象本身很小,比如就包括了一个int类型的field,那么它的对象头实际上比对象自己还要大。
    
    2、Java的String对象,会比它内部的原始数据,要多出40个字节。因为它内部使用char数组来保存内部的字符序列的,并且还得保存诸如数组长度之类的信息。而且因为String使用的是UTF-16编码,所以每个字符会占用2个字节。比如,包含10个字符的String,会占用60个字节。
    
    3、Java中的集合类型,比如HashMap和LinkedList,内部使用的是链表数据结构,所以对链表中的每一个数据,都使用了Entry对象来包装。Entry对象不光有对象头,还有指向下一个Entry的指针,通常占用8个字节。
    
    4、元素类型为原始数据类型(比如int)的集合,内部通常会使用原始数据类型的包装类型,比如Integer,来存储元素。
    
    如何判断你的程序消耗了多少内存?
    这里有一个非常简单的办法来判断,你的spark程序消耗了多少内存。
    
    1、首先,自己设置RDD的并行度,有两种方式:要不然,在parallelize()、textFile()等方法中,传入第二个参数,设置RDD的task / partition的数量;要不然,用SparkConf.set()方法,设置一个参数,spark.default.parallelism,可以统一设置这个application所有RDD的partition数量。
    
    2、其次,在程序中将RDD cache到内存中,调用RDD.cache()方法即可。
    
    3、最后,观察Driver的log,你会发现类似于:“INFO BlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB)”的日志信息。这就显示了每个partition占用了多少内存。
    
    4、将这个内存信息乘以partition数量,即可得出RDD的内存占用量。
    
    Spark的性能优化,主要手段包括:
    1、使用高性能序列化类库
    2、优化数据结构
    3、对多次使用的RDD进行持久化 / Checkpoint
    4、使用序列化的持久化级别
    5、Java虚拟机垃圾回收调优
    6、提高并行度
    7、广播共享数据
    8、数据本地化
    9、reduceByKey和groupByKey的合理使用
    10、Shuffle调优(核心中的核心,重中之重)
    
    1、使用高性能序列化类库
    数据序列化概述:
    在任何分布式系统中,序列化都是扮演着一个重要的角色的。如果使用的序列化技术,在执行序列化操作的时候很慢,或者是序列化后的数据还是很大,那么会让分布式应用程序的性能下降很多。所以,进行Spark性能优化的第一步,就是进行序列化的性能优化。
    
    Spark自身默认就会在一些地方对数据进行序列化,比如Shuffle。还有就是,如果我们的算子函数使用到了外部的数据(比如Java内置类型,或者自定义类型),那么也需要让其可序列化。
    
    而Spark自身对于序列化的便捷性和性能进行了一个取舍和权衡。默认,Spark倾向于序列化的便捷性,使用了Java自身提供的序列化机制——基于ObjectInputStream和ObjectOutputStream的序列化机制。因为这种方式是Java原生提供的,很方便使用。
    
    但是问题是,Java序列化机制的性能并不高。序列化的速度相对较慢,而且序列化以后的数据,还是相对来说比较大,还是比较占用内存空间。因此,如果你的Spark应用程序对内存很敏感,那么,实际上默认的Java序列化机制并不是最好的选择。
    
    Spark提供了两种序列化的机制:
    
    Spark实际上提供了两种序列化机制,它只是默认使用了第一种:
    
    1、Java序列化机制:默认情况下,Spark使用Java自身的ObjectInputStream和ObjectOutputStream机制进行对象的序列化。只要你的类实现了Serializable接口,那么都是可以序列化的。而且Java序列化机制是提供了自定义序列化支持的,只要你实现Externalizable接口即可实现自己的更高性能的序列化算法。Java序列化机制的速度比较慢,而且序列化后的数据占用的内存空间比较大。
    
    2、Kryo序列化机制:Spark也支持使用Kryo类库来进行序列化。Kryo序列化机制比Java序列化机制更快,而且序列化后的数据占用的空间更小,通常比Java序列化的数据占用的空间要小10倍。Kryo序列化机制之所以不是默认序列化机制的原因是,有些类型虽然实现了Seriralizable接口,但是它也不一定能够进行序列化;此外,如果你要得到最佳的性能,Kryo还要求你在Spark应用程序中,对所有你需要序列化的类型都进行注册。
    
     如何使用Kryo序列化机制(一)
    如果要使用Kryo序列化机制,首先要用SparkConf设置一个参数,使用new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")即可,即将Spark的序列化器设置为KryoSerializer。这样,Spark在内部的一些操作,比如Shuffle,进行序列化时,就会使用Kryo类库进行高性能、快速、更低内存占用量的序列化了。
    
    使用Kryo时,它要求是需要序列化的类,是要预先进行注册的,以获得最佳性能——如果不注册的话,那么Kryo必须时刻保存类型的全限定名,反而占用不少内存。Spark默认是对Scala中常用的类型自动注册了Kryo的,都在AllScalaRegistry类中。
    
    但是,比如自己的算子中,使用了外部的自定义类型的对象,那么还是需要将其进行注册。
    
    (实际上,下面的写法是错误的,因为counter不是共享的,所以累加的功能是无法实现的)
    val counter = new Counter();
    val numbers = sc.parallelize(Array(1, 2, 3, 4, 5))
    numbers.foreach(num => counter.add(num));
    
    
    如何使用Kryo序列化机制(二)
    
    如果要注册自定义的类型,那么就使用如下的代码,即可:
    
    Scala版本:
    val conf = new SparkConf().setMaster(...).setAppName(...)
    conf.registerKryoClasses(Array(classOf[Counter] ))
    val sc = new SparkContext(conf)
    
    Java版本:
    SparkConf conf = new SparkConf().setMaster(...).setAppName(...)
    conf.registerKryoClasses(Counter.class)
    JavaSparkContext sc = new JavaSparkContext(conf)
    
    
     优化Kryo类库的使用
    
    1、优化缓存大小
    如果注册的要序列化的自定义的类型,本身特别大,比如包含了超过100个field。那么就会导致要序列化的对象过大。此时就需要对Kryo本身进行优化。因为Kryo内部的缓存可能不够存放那么大的class对象。此时就需要调用SparkConf.set()方法,设置spark.kryoserializer.buffer.mb参数的值,将其调大。
    
    默认情况下它的值是2,就是说最大能缓存2M的对象,然后进行序列化。可以在必要时将其调大。比如设置为10。
    
    2、预先注册自定义类型
    虽然不注册自定义类型,Kryo类库也能正常工作,但是那样的话,对于它要序列化的每个对象,都会保存一份它的全限定类名。此时反而会耗费大量内存。因此通常都建议预先注册号要序列化的自定义的类。
    
     在什么场景下使用Kryo序列化类库?
    首先,这里讨论的都是Spark的一些普通的场景,一些特殊的场景,比如RDD的持久化,在后面会讲解。这里先不说。
    
    那么,这里针对的Kryo序列化类库的使用场景,就是算子函数使用到了外部的大数据的情况。比如说吧,我们在外部定义了一个封装了应用所有配置的对象,比如自定义了一个MyConfiguration对象,里面包含了100m的数据。然后,在算子函数里面,使用到了这个外部的大对象。
    
    此时呢,如果默认情况下,让Spark用java序列化机制来序列化这种外部的大对象,那么就会导致,序列化速度缓慢,并且序列化以后的数据还是比较大,比较占用内存空间。
    
    因此,在这种情况下,比较适合,切换到Kryo序列化类库,来对外部的大对象进行序列化操作。一是,序列化速度会变快;二是,会减少序列化后的数据占用的内存空间。
    
    2.优化数据结构
    要减少内存的消耗,除了使用高效的序列化类库以外,还有一个很重要的事情,就是优化数据结构。从而避免Java语法特性中所导致的额外内存的开销,比如基于指针的Java数据结构,以及包装类型。
    
    有一个关键的问题,就是优化什么数据结构?其实主要就是优化你的算子函数,内部使用到的局部数据,或者是算子函数外部的数据。都可以进行数据结构的优化。优化之后,都会减少其对内存的消耗和占用。
    
     如何优化数据结构(一)
    
    1、优先使用数组以及字符串,而不是集合类。也就是说,优先用array,而不是ArrayList、LinkedList、HashMap等集合。
    
    比如,有个List<Integer> list = new ArrayList<Integer>(),将其替换为int[] arr = new int[]。这样的话,array既比List少了额外信息的存储开销,还能使用原始数据类型(int)来存储数据,比List中用Integer这种包装类型存储数据,要节省内存的多。
    
    还比如,通常企业级应用中的做法是,对于HashMap、List这种数据,统一用String拼接成特殊格式的字符串,比如Map<Integer, Person> persons = new HashMap<Integer, Person>()。可以优化为,特殊的字符串格式:id:name,address|id:name,address...。
    
    
    2、避免使用多层嵌套的对象结构。比如说,public class Teacher { private List<Student> students = new ArrayList<Student>() }。就是非常不好的例子。因为Teacher类的内部又嵌套了大量的小Student对象。
    
    比如说,对于上述例子,也完全可以使用特殊的字符串来进行数据的存储。比如,用json字符串来存储数据,就是一个很好的选择。
    
    {"teacherId": 1, "teacherName": "leo", students:[{"studentId": 1, "studentName": "tom"},{"studentId":2, "studentName":"marry"}]}
    
    3、对于有些能够避免的场景,尽量使用int替代String。因为String虽然比ArrayList、HashMap等数据结构高效多了,占用内存量少多了,但是之前分析过,还是有额外信息的消耗。比如之前用String表示id,那么现在完全可以用数字类型的int,来进行替代。
    
    这里提醒,在spark应用中,id就不要用常用的uuid了,因为无法转成int,就用自增的int类型的id即可。(sdfsdfdf-234242342-sdfsfsfdfd)
    
    
    3.对多次使用的RDD进行持久化或Checkpoint
    如果程序中,对某一个RDD,基于它进行了多次transformation或者action操作。那么就非常有必要对其进行持久化操作,以避免对一个RDD反复进行计算。
    
    此外,如果要保证在RDD的持久化数据可能丢失的情况下,还要保证高性能,那么可以对RDD进行Checkpoint操作。
    4.使用序列化的持久化级别
    除了对多次使用的RDD进行持久化操作之外,还可以进一步优化其性能。因为很有可能,RDD的数据是持久化到内存,或者磁盘中的。那么,此时,如果内存大小不是特别充足,完全可以使用序列化的持久化级别,比如MEMORY_ONLY_SER、MEMORY_AND_DISK_SER等。使用RDD.persist(StorageLevel.MEMORY_ONLY_SER)这样的语法即可。
    
    这样的话,将数据序列化之后,再持久化,可以大大减小对内存的消耗。此外,数据量小了之后,如果要写入磁盘,那么磁盘io性能消耗也比较小。
    
    对RDD持久化序列化后,RDD的每个partition的数据,都是序列化为一个巨大的字节数组。这样,对于内存的消耗就小的多了。但是唯一的缺点就是,获取RDD数据时,需要对其进行反序列化,会增大其性能开销。
    
    因此,对于序列化的持久化级别,还可以进一步优化,也就是说,使用Kryo序列化类库,这样,可以获得更快的序列化速度,并且占用更小的内存空间。但是要记住,如果RDD的元素(RDD<T>的泛型类型),是自定义类型的话,在Kryo中提前注册自定义类型。
    
    5.Java虚拟机垃圾回收调优
     Java虚拟机垃圾回收调优的背景
    如果在持久化RDD的时候,持久化了大量的数据,那么Java虚拟机的垃圾回收就可能成为一个性能瓶颈。因为Java虚拟机会定期进行垃圾回收,此时就会追踪所有的java对象,并且在垃圾回收时,找到那些已经不在使用的对象,然后清理旧的对象,来给新的对象腾出内存空间。
    
    垃圾回收的性能开销,是跟内存中的对象的数量,成正比的。所以,对于垃圾回收的性能问题,首先要做的就是,使用更高效的数据结构,比如array和string;其次就是在持久化rdd时,使用序列化的持久化级别,而且用Kryo序列化类库,这样,每个partition就只是一个对象——一个字节数组。
    
    监测垃圾回收
    我们可以对垃圾回收进行监测,包括多久进行一次垃圾回收,以及每次垃圾回收耗费的时间。只要在spark-submit脚本中,增加一个配置即可,--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"。
    
    但是要记住,这里虽然会打印出Java虚拟机的垃圾回收的相关信息,但是是输出到了worker上的日志中,而不是driver的日志中。
    
    但是这种方式也只是一种,其实也完全可以通过SparkUI(4040端口)来观察每个stage的垃圾回收的情况。
    
    优化executor内存比例
    
    对于垃圾回收来说,最重要的就是调节RDD缓存占用的内存空间,与算子执行时创建的对象占用的内存空间的比例。默认情况下,Spark使用每个executor 60%的内存空间来缓存RDD,那么在task执行期间创建的对象,只有40%的内存空间来存放。
    
    在这种情况下,很有可能因为你的内存空间的不足,task创建的对象过大,那么一旦发现40%的内存空间不够用了,就会触发Java虚拟机的垃圾回收操作。因此在极端情况下,垃圾回收操作可能会被频繁触发。
    
    在上述情况下,如果发现垃圾回收频繁发生。那么就需要对那个比例进行调优,使用new SparkConf().set("spark.storage.memoryFraction", "0.5")即可,可以将RDD缓存占用空间的比例降低,从而给更多的空间让task创建的对象进行使用。
    
    因此,对于RDD持久化,完全可以使用Kryo序列化,加上降低其executor内存占比的方式,来减少其内存消耗。给task提供更多的内存,从而避免task的执行频繁触发垃圾回收。
    
    高级垃圾回收调优(一)
    
        Java堆空间被划分成了两块空间,一个是年轻代,一个是老年代。年轻代放的是短时间存活的对象,老年代放的是长时间存活的对象。年轻代又被划分了三块空间,Eden、Survivor1、Survivor2。
    
    首先,Eden区域和Survivor1区域用于存放对象,Survivor2区域备用。创建的对象,首先放入Eden区域和Survivor1区域,如果Eden区域满了,那么就会触发一次Minor GC,进行年轻代的垃圾回收。Eden和Survivor1区域中存活的对象,会被移动到Survivor2区域中。然后Survivor1和Survivor2的角色调换,Survivor1变成了备用。
    
    如果一个对象,在年轻代中,撑过了多次垃圾回收,都没有被回收掉,那么会被认为是长时间存活的,此时就会被移入老年代。此外,如果在将Eden和Survivor1中的存活对象,尝试放入Survivor2中时,发现Survivor2放满了,那么会直接放入老年代。此时就出现了,短时间存活的对象,进入老年代的问题。
    
    如果老年代的空间满了,那么就会触发Full GC,进行老年代的垃圾回收操作。
    
    Spark中,垃圾回收调优的目标就是,只有真正长时间存活的对象,才能进入老年代,短时间存活的对象,只能呆在年轻代。不能因为某个Survivor区域空间不够,在Minor GC时,就进入了老年代。从而造成短时间存活的对象,长期呆在老年代中占据了空间,而且Full GC时要回收大量的短时间存活的对象,导致Full GC速度缓慢。
    
    如果发现,在task执行期间,大量full gc发生了,那么说明,年轻代的Eden区域,给的空间不够大。此时可以执行一些操作来优化垃圾回收行为:
    1、包括降低spark.storage.memoryFraction的比例,给年轻代更多的空间,来存放短时间存活的对象;
    2、给Eden区域分配更大的空间,使用-Xmn即可,通常建议给Eden区域,预计大小的4/33、如果使用的是HDFS文件,那么很好估计Eden区域大小,如果每个executor有4个task,然后每个hdfs压缩块解压缩后大小是3倍,此外每个hdfs块的大小是64M,那么Eden区域的预计大小就是:4 * 3 * 64MB,然后呢,再通过-Xmn参数,将Eden区域大小设置为4 * 3 * 64 * 4/3。
    
    总结:
    其实啊,根据经验来看,对于垃圾回收的调优,尽量就是说,调节executor内存的比例就可以了。因为jvm的调优是非常复杂和敏感的。除非是,真的到了万不得已的地方,然后呢,自己本身又对jvm相关的技术很了解,那么此时进行eden区域的调节,调优,是可以的。
    
    一些高级的参数
    -XX:SurvivorRatio=4:如果值为4,那么就是两个Survivor跟Eden的比例是2:4,也就是说每个Survivor占据的年轻代的比例是1/6,所以,你其实也可以尝试调大Survivor区域的大小。
    -XX:NewRatio=4:调节新生代和老年代的比例
    
    6.提高并行度
    
    实际上Spark集群的资源并不一定会被充分利用到,所以要尽量设置合理的并行度,来充分地利用集群的资源。才能充分提高Spark应用程序的性能。
    
    Spark会自动设置以文件作为输入源的RDD的并行度,依据其大小,比如HDFS,就会给每一个block创建一个partition,也依据这个设置并行度。对于reduceByKey等会发生shuffle的操作,就使用并行度最大的父RDD的并行度即可。
    
    可以手动使用textFile()、parallelize()等方法的第二个参数来设置并行度;也可以使用spark.default.parallelism参数,来设置统一的并行度。Spark官方的推荐是,给集群中的每个cpu core设置2~3个task。
    
    比如说,spark-submit设置了executor数量是10个,每个executor要求分配2个core,那么application总共会有20个core。此时可以设置new SparkConf().set("spark.default.parallelism", "60")来设置合理的并行度,从而充分利用资源。
    
    7.广播共享数据
    如果你的算子函数中,使用到了特别大的数据,那么,这个时候,推荐将该数据进行广播。这样的话,就不至于将一个大数据拷贝到每一个task上去。而是给每个节点拷贝一份,然后节点上的task共享该数据。
    
    这样的话,就可以减少大数据在节点上的内存消耗。并且可以减少数据到节点的网络传输消耗。
    
    
    8.数据本地化
    数据本地化背景
    数据本地化对于Spark Job性能有着巨大的影响。如果数据以及要计算它的代码是在一起的,那么性能当然会非常高。但是,如果数据和计算它的代码是分开的,那么其中之一必须到另外一方的机器上。通常来说,移动代码到其他节点,会比移动数据到代码所在的节点上去,速度要快得多,因为代码比较小。Spark也正是基于这个数据本地化的原则来构建task调度算法的。
    
    数据本地化,指的是,数据离计算它的代码有多近。基于数据距离代码的距离,有几种数据本地化级别:
    1、PROCESS_LOCAL:数据和计算它的代码在同一个JVM进程中。
    2、NODE_LOCAL:数据和计算它的代码在一个节点上,但是不在一个进程中,比如在不同的executor进程中,或者是数据在HDFS文件的block中。
    3、NO_PREF:数据从哪里过来,性能都是一样的。
    4、RACK_LOCAL:数据和计算它的代码在一个机架上。
    5ANY:数据可能在任意地方,比如其他网络环境内,或者其他机架上。
    数据本地化优化
    Spark倾向于使用最好的本地化级别来调度task,但是这是不可能的。如果没有任何未处理的数据在空闲的executor上,那么Spark就会放低本地化级别。这时有两个选择:第一,等待,直到executor上的cpu释放出来,那么就分配task过去;第二,立即在任意一个executor上启动一个task。
    
    Spark默认会等待一会儿,来期望task要处理的数据所在的节点上的executor空闲出一个cpu,从而将task分配过去。只要超过了时间,那么Spark就会将task分配到其他任意一个空闲的executor上。
    
    可以设置参数,spark.locality系列参数,来调节Spark等待task可以进行数据本地化的时间。spark.locality.wait(3000毫秒)、spark.locality.wait.node、spark.locality.wait.process、spark.locality.wait.rack。
    
    9.reduceByKey和groupByKey
    val counts = pairs.reduceByKey(_ + _)
    
    val counts = pairs.groupByKey().map(wordCounts => (wordCounts._1, wordCounts._2.sum))
    
    如果能用reduceByKey,那就用reduceByKey,因为它会在map端,先进行本地combine,可以大大减少要传输到reduce端的数据量,减小网络传输的开销。
    
    只有在reduceByKey处理不了时,才用groupByKey().map()来替代。
    10.shuffle调优
  • 相关阅读:
    【转载】天才与柱子
    Windows Phone 7 隔离存储空间资源管理器
    (收藏)让你平步青云的十个谈话技巧
    (收藏)《博客园精华集》设计模式分册
    (收藏)C#开源资源大汇总
    (收藏)生活物语
    (收藏)C# ORM/持久层框架
    (收藏)《博客园精华集》AJAX与ASP.NET AJAX部分
    小型项目总结之五
    VS 打包完美解决方案
  • 原文地址:https://www.cnblogs.com/yin-fei/p/10778800.html
Copyright © 2011-2022 走看看