SPARK核心编程 一、spark基本工作原理与RDD 1.Spark的基本工作原理 1.分布式(RDD的partition) 2.主要是基于内存(少数情况下数基于磁盘) 3.迭代式计算(RDD->RDD->RDD) 客户端(client):我们在本地编写了spark程序,然后必须在某台能够连接spark的机器上提交spark程序 Spark程序被提交到spark集群上进行运算 spark数据的来源可能是hadoop的hdfs或者hive上,将读取到的数据分散到集群的不同的节点的内存中,对节点上的数据进行操作处理,处理后的数据可能会移动到其他节点的内存中,可能是继续由其他的节点进行操作,所有的计算操作都是针对多个节点上的数据,进行并行计算操作的(因为一个RDD是分区的,每个计算都是在不同节点上并行计算)。 Spark处理完的结果存储的位置可能是hadoop的hdfs、hive或者是MySQL等DB亦或是直接返回到客户端(运行spark程序的机器进程) Spark与mapreduce最大的不同就在于,迭代式计算模型: Mapreduce ,分为两个阶段,map和reduce,两个节点完了,就结束了,所以我们在一个job中能做得处理很有限,只能在map和reduce里处理 Spark,计算模型可以分为n个阶段,因为他是内存迭代式的,我们在处理完一个阶段以后,可以继续往下处理很多的阶段,而不只是两个阶段,所以,spark相较于mapreduce来说,计算模型可以提供更强大的功能 2.RDD以及其特点 弹性:内存资源不足的情况下,自动将RDD存储到磁盘上,以及切换机制 分布式:被分区的,分布到不同的节点上进行并行的计算 数据集:数据的集合 1、RDD是Spark提供的核心抽象,全称为Resillient Distributed Dataset,即弹性分布式数据集。 2、RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD中的数据可以被并行操作。(分布式数据集) 3、RDD通常通过Hadoop上的文件,即HDFS文件或者Hive表,来进行创建;有时也可以通过应用程序中的集合的并行化来创建。 4、RDD最重要的特性就是,提供了容错性,可以自动从节点失败中恢复过来。即如果某个节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。这一切对使用者是透明的。 5、RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘。(弹性) 一个RDD在逻辑上,抽象的代表一个HDFS文件。但是他实际上是被分区得。分为多个分区,多个分区散落在spark集群中的不同的节点上。比如RDD有90万数据,分为9个partition,9个分区 现在,节点9出了故障,导致parttion 9的数据丢失,RDD具有很强的容错性,当他发现自己的数据丢失了,那么此时会自动从自己来源的数据进行重计算,重新获取自己这份数据,这一切对用户,都是完全透明的。 RDD的每个partition,在spark节点上存储时,默认都是放在内存中的。但是如果说内存放不下这么多数据时,比如每个节点最多放5万数据,结果你每个partition是10万数据。那么就会把partition中的部分数据写入磁盘上,进行保存。 而上述这一切,对于用户来说,都是完全透明的。也就是说,你不用去管RDD的数据存储在哪里,内存,还是磁盘。只要关注,你针对RDD来进行计算,和处理,等等操作即可。 所以说,RDD的这种自动进行内存和磁盘之间权衡和切换的机制,就是RDD的弹性的特点所在。 3.spark核心编程 Spark核心编程是什么? 第一: 定义初始的RDD,就是说你要定义第一个RDD是从哪里,读取数据,hdfs、linux本地文件、程序中的集合。 第二: 定义对RDD的计算操作,这个在spark中称之为算子,map 、reduce、flatMap、groupByKey等,比mapreduce强大太多 第三: 就是循环往复的过程,计算完成之后,数据就可能到了一批新的节点上,也就是变成了一个新的RDD,然后在再次反复,针对新的RDD定义计算操作 第四; 获取最终的数据,将数据保存起来 每一个节点上的每一批数据,实际上就是一个RDD,一个RDD是分布式的,所以数据都散落在一批节点上了,每个节点都存储了RDD的部分partition 4.spark开发 1、核心开发:离线批处理 / 延迟性的交互式数据处理 2、SQL查询:底层都是RDD和计算操作 3、实时计算:底层都是RDD和计算操作 二、使用java、Scala、spark-shell开发wordCount 1、用Java开发wordcount程序 1.1 配置maven环境 1.2 如何进行本地测试 【注意】 本地测试的作用,先伪造一些数据,测试代码的正确性之后,提交到集群中,执行 防止在集群中出错,同时减少解决问题的时间,因为数据量比实际的要少的多 1.3 如何使用spark-submit提交到spark集群进行执行(spark-submit常用参数说明,spark-submit其实就类似于hadoop的hadoop jar命令) Pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.spark</groupId> <artifactId>SparkTest</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>SparkTest</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.3.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.3.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.10</artifactId> <version>1.3.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.3.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.4.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.10</artifactId> <version>1.3.0</version> </dependency> </dependencies> <build> <sourceDirectory>src/main/java</sourceDirectory> <testSourceDirectory>src/main/test</testSourceDirectory> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass></mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> <version>1.2.1</version> <executions> <execution> <goals> <goal>exec</goal> </goals> </execution> </executions> <configuration> <executable>java</executable> <includeProjectDependencies>true</includeProjectDependencies> <includePluginDependencies>false</includePluginDependencies> <classpathScope>compile</classpathScope> <mainClass>cn.spark.sparktest.App</mainClass> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.6</source> <target>1.6</target> </configuration> </plugin> </plugins> </build> </project> Wordcountlocal package cn.spark.study.core; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; /** * 使用java开发本地测试的wordcount程序 * @author Administrator * */ public class WordCountLocal { public static void main(String[] args) { // 编写Spark应用程序 // 本地执行,是可以执行在eclipse中的main方法中,执行的 // 第一步:创建SparkConf对象,设置Spark应用的配置信息 // 使用setMaster()可以设置Spark应用程序要连接的Spark集群的master节点的url // 但是如果设置为local则代表,在本地运行 SparkConf conf = new SparkConf() .setAppName("WordCountLocal") .setMaster("local"); // 第二步:创建JavaSparkContext对象 // 在Spark中,SparkContext是Spark所有功能的一个入口,你无论是用java、scala,甚至是python编写 // 都必须要有一个SparkContext,它的主要作用,包括初始化Spark应用程序所需的一些核心组件,包括 // 调度器(DAGSchedule、TaskScheduler),还会去到Spark Master节点上进行注册,等等 // 一句话,SparkContext,是Spark应用中,可以说是最最重要的一个对象 // 但是呢,在Spark中,编写不同类型的Spark应用程序,使用的SparkContext是不同的,如果使用scala, // 使用的就是原生的SparkContext对象 // 但是如果使用Java,那么就是JavaSparkContext对象 // 如果是开发Spark SQL程序,那么就是SQLContext、HiveContext // 如果是开发Spark Streaming程序,那么就是它独有的SparkContext // 以此类推 JavaSparkContext sc = new JavaSparkContext(conf); // 第三步:要针对输入源(hdfs文件、本地文件,等等),创建一个初始的RDD // 输入源中的数据会打散,分配到RDD的每个partition中,从而形成一个初始的分布式的数据集 // 我们这里呢,因为是本地测试,所以呢,就是针对本地文件 // SparkContext中,用于根据文件类型的输入源创建RDD的方法,叫做textFile()方法 // 在Java中,创建的普通RDD,都叫做JavaRDD // 在这里呢,RDD中,有元素这种概念,如果是hdfs或者本地文件呢,创建的RDD,每一个元素就相当于 // 是文件里的一行 JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//spark.txt"); // 第四步:对初始RDD进行transformation操作,也就是一些计算操作 // 通常操作会通过创建function,并配合RDD的map、flatMap等算子来执行 // function,通常,如果比较简单,则创建指定Function的匿名内部类 // 但是如果function比较复杂,则会单独创建一个类,作为实现这个function接口的类 // 先将每一行拆分成单个的单词 // FlatMapFunction,有两个泛型参数,分别代表了输入和输出类型 // 我们这里呢,输入肯定是String,因为是一行一行的文本,输出,其实也是String,因为是每一行的文本 // 这里先简要介绍flatMap算子的作用,其实就是,将RDD的一个元素,给拆分成一个或多个元素 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); // 接着,需要将每一个单词,映射为(单词, 1)的这种格式 // 因为只有这样,后面才能根据单词作为key,来进行每个单词的出现次数的累加 // mapToPair,其实就是将每个元素,映射为一个(v1,v2)这样的Tuple2类型的元素 // 如果大家还记得scala里面讲的tuple,那么没错,这里的tuple2就是scala类型,包含了两个值 // mapToPair这个算子,要求的是与PairFunction配合使用,第一个泛型参数代表了输入类型 // 第二个和第三个泛型参数,代表的输出的Tuple2的第一个值和第二个值的类型 // JavaPairRDD的两个泛型参数,分别代表了tuple元素的第一个值和第二个值的类型 JavaPairRDD<String, Integer> pairs = words.mapToPair( new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); // 接着,需要以单词作为key,统计每个单词出现的次数 // 这里要使用reduceByKey这个算子,对每个key对应的value,都进行reduce操作 // 比如JavaPairRDD中有几个元素,分别为(hello, 1) (hello, 1) (hello, 1) (world, 1) // reduce操作,相当于是把第一个值和第二个值进行计算,然后再将结果与第三个值进行计算 // 比如这里的hello,那么就相当于是,首先是1 + 1 = 2,然后再将2 + 1 = 3 // 最后返回的JavaPairRDD中的元素,也是tuple,但是第一个值就是每个key,第二个值就是key的value // reduce之后的结果,相当于就是每个单词出现的次数 JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey( new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); // 到这里为止,我们通过几个Spark算子操作,已经统计出了单词的次数 // 但是,之前我们使用的flatMap、mapToPair、reduceByKey这种操作,都叫做transformation操作 // 一个Spark应用中,光是有transformation操作,是不行的,是不会执行的,必须要有一种叫做action // 接着,最后,可以使用一种叫做action操作的,比如说,foreach,来触发程序的执行 wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Integer> wordCount) throws Exception { System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times."); } }); sc.close(); } } 集群模式 package cn.spark.study.core; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; /** * 将java开发的wordcount程序部署到spark集群上运行 * @author Administrator */ public class WordCountCluster { public static void main(String[] args) { // 如果要在spark集群上运行,需要修改的,只有两个地方 // 第一,将SparkConf的setMaster()方法给删掉,默认它自己会去连接 // 第二,我们针对的不是本地文件了,修改为hadoop hdfs上的真正的存储大数据的文件 // 实际执行步骤: // 1、将spark.txt文件上传到hdfs上去 // 2、使用我们最早在pom.xml里配置的maven插件,对spark工程进行打包 // 3、将打包后的spark工程jar包,上传到机器上执行 // 4、编写spark-submit脚本 // 5、执行spark-submit脚本,提交spark应用到集群执行 SparkConf conf = new SparkConf() .setAppName("WordCountCluster"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> lines = sc.textFile("hdfs://spark1:9000/spark.txt"); JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); JavaPairRDD<String, Integer> pairs = words.mapToPair( new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey( new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Integer> wordCount) throws Exception { System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times."); } }); sc.close(); } } 2、用Scala开发wordcount程序 2.1 下载scala ide for eclipse 2.2 在Java Build Path中,添加spark依赖包(如果与scala ide for eclipse原生的scala版本发生冲突,则移除原生的scala / 重新配置scala compiler) 2.3 用export导出scala spark工程 Scala_wordcount package cn.spark.study.core import org.apache.spark.SparkConf import org.apache.spark.SparkContext object WordCount { def main(args: Array[String]) { val conf = new SparkConf() .setAppName("WordCount"); val sc = new SparkContext(conf) val lines = sc.textFile("hdfs://spark1:9000/spark.txt", 1); val words = lines.flatMap { line => line.split(" ") } val pairs = words.map { word => (word, 1) } val wordCounts = pairs.reduceByKey { _ + _ } wordCounts.foreach(wordCount => println(wordCount._1 + " appeared " + wordCount._2 + " times.")) Sc.stop() } } 3、用spark-shell开发wordcount程序 3.1 常用于简单的测试 --master 不写表示的是local 本地模式 --master spark://hadoop01:7077 表示的是spark自生的集群模式 standalone --master yarn-client --master yarn-cluster 表示spark应用运行在yarn上 三、spark的架构原理 1.原理图 2.详解 组件:master worker executor task driver 1.我们在集群中的一个节点上或者自己提交spark程序的机器上提交一个spark程序后,会启动一个driver进程。 2.Driver 进程启动后会做一些初始化的操作,在这个过程中,就会发送发送请求到master上,进行spark应用程序的注册,说白了就是让master知道,有一个新的spark应用程序要运行。 3.Master是一个进行,主要是负责资源的调度和分配,还有集群的监控等职责。 4.Worker是一个进行,主要是负责两个,一个是用自己的内存存储RDD的某个或者是某些partition,另外一个是启动其他的进程和线程,对RDD上的partition进行处理和计算 5.Worker接收到master的请求之后,会为spark应用起动executor 6.Executor和task其实就是负责执行,对RDD的partition进行并行的计算,也就是执行我们对RDD定义的,比如map flatMap reduce 等算子操作 7. Executor启动之后,会向driver进行反注册,这样driver就知道哪些executor是为他进行服务的了。 8.Driver注册了一些executor之后,就可以开始正式的执行我们的spark应用程序了。首先第一步就是创建RDD,读取数据源的文件 9.HDFS上的文件内容被读取到多个work节点上,形成内存中的分布式的数据集,也就是初始RDD 10.Driver会根据我们对RDD定义的操作,提交一大堆的task去executor上 11.Executor接收到task之后,会启动多个线程来执行task. 12.Task会对RDD的partition数据执行指定的算子操作,形成新的RDD的partition 四、创建RDD 创建的方式 进行Spark核心编程时,首先要做的第一件事,就是创建一个初始的RDD。该RDD中,通常就代表和包含了Spark应用程序的输入源数据。然后在创建了初始的RDD之后,才可以通过Spark Core提供的transformation算子,对该RDD进行转换,来获取其他的RDD。 Spark core提供三种创建RDD的三种方式: 1.使用程序中的集合创建RDD,并行化集合 如果要通过并行化集合来创建RDD,需要针对程序中的集合,调用SparkContext的parallelize()方法。Spark会将集合中的数据拷贝到集群上去,形成一个分布式的数据集合,也就是一个RDD。相当于是,集合中的部分数据会到一个节点上,而另一部分数据会到其他节点上。然后就可以用并行的方式来操作这个分布式数据集合,即RDD。 调用parallelize()时,有一个重要的参数可以指定,就是要将集合切分成多少个partition。Spark会为每一个partition运行一个task来进行处理。Spark官方的建议是,为集群中的每个CPU创建2~4个partition。Spark默认会根据集群的情况来设置partition的数量。但是也可以在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量。比如parallelize(arr, 10) // 案例:1到10累加求和 val arr = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val rdd = sc.parallelize(arr) val sum = rdd.reduce(_ + _) 2.使用本地文件创建RDD Spark是支持使用任何Hadoop支持的存储系统上的文件创建RDD的,比如说HDFS、Cassandra、HBase以及本地文件。通过调用SparkContext的textFile()方法,可以针对本地文件或HDFS文件创建RDD。 有几个事项是需要注意的: 1、如果是针对本地文件的话,如果是在windows上本地测试,windows上有一份文件即可;如果是在spark集群上针对linux本地文件,那么需要将文件拷贝到所有worker节点上。 2、Spark的textFile()方法支持针对目录、压缩文件以及通配符进行RDD创建。 3、Spark默认会为hdfs文件的每一个block创建一个partition,但是也可以通过textFile()的第二个参数手动设置分区数量,只能比block数量多,不能比block数量少。 // 案例:文件字数统计 val rdd = sc.textFile("data.txt") val wordCount = rdd.map(line => line.length).reduce(_ + _) 3.使用HDFS文件创建RDD Spark的textFile()除了可以针对上述几种普通的文件创建RDD之外,还有一些特列的方法来创建RDD: 1、SparkContext.wholeTextFiles()方法,可以针对一个目录中的大量小文件,返回<filename, fileContent>组成的pair,作为一个PairRDD,而不是普通的RDD。普通的textFile()返回的RDD中,每个元素就是文件中的一行文本。 2、SparkContext.sequenceFile[K, V]()方法,可以针对SequenceFile创建RDD,K和V泛型类型就是SequenceFile的key和value的类型。K和V要求必须是Hadoop的序列化类型,比如IntWritable、Text等。 3、SparkContext.hadoopRDD()方法,对于Hadoop的自定义输入类型,可以创建RDD。该方法接收JobConf、InputFormatClass、Key和Value的Class。 4、SparkContext.objectFile()方法,可以针对之前调用RDD.saveAsObjectFile()创建的对象序列化的文件,反序列化文件中的数据,并创建一个RDD。 注意; 1.使用程序中的集合创建RDD,只要用于测试,可以在实际部署到集群运行之前,自己使用集合构造测试数据,来测试后面的spark应用流程 2.使用本地文件创建RDD,主要是用于临时的处理一些存储大量的数据的文件 3.使用HDFS文件创建RDD,应该是最常用的生产环境的处理方式,只要可以针对HDFS上存储的大数据,进行离线的批处理操作 Spark core主要是用来代替mapreduce的批处理操作 五、RDD的操作 RDD的两种操作 Spark支持两种RDD操作:transformation和action。transformation操作会针对已有的RDD创建一个新的RDD;而action则主要是对RDD进行最后的操作,比如遍历、reduce、保存到文件等,并可以返回结果给Driver程序。 例如,map就是一种transformation操作,它用于将已有RDD的每个元素传入一个自定义的函数,并获取一个新的元素,然后将所有的新元素组成一个新的RDD。而reduce就是一种action操作,它用于对RDD中的所有元素进行聚合操作,并获取一个最终的结果,然后返回给Driver程序。 transformation的特点就是lazy特性。lazy特性指的是,如果一个spark应用中只定义了transformation操作,那么即使你执行该应用,这些操作也不会执行。也就是说,transformation是不会触发spark程序的执行的,它们只是记录了对RDD所做的操作,但是不会自发的执行。只有当transformation之后,接着执行了一个action操作,那么所有的transformation才会执行。Spark通过这种lazy特性,来进行底层的spark应用执行的优化,避免产生过多中间结果。 action操作执行,会触发一个spark job的运行,从而触发这个action之前所有的transformation的执行。这是action的特性。 案例一:统计文件的字数 这里通过一个之前学习过的案例,统计文件字数,来讲解transformation和action。 // 这里通过textFile()方法,针对外部文件创建了一个RDD,lines,但是实际上,程序执行到这里为止,spark.txt文件的数据是不会加载到内存中的。lines,只是代表了一个指向spark.txt文件的引用。 val lines = sc.textFile("spark.txt") // 这里对lines RDD进行了map算子,获取了一个转换后的lineLengths RDD。但是这里连数据都没有,当然也不会做任何操作。lineLengths RDD也只是一个概念上的东西而已。 val lineLengths = lines.map(line => line.length) // 之列,执行了一个action操作,reduce。此时就会触发之前所有transformation操作的执行,Spark会将操作拆分成多个task到多个机器上并行执行,每个task会在本地执行map操作,并且进行本地的reduce聚合。最后会进行一个全局的reduce聚合,然后将结果返回给Driver程序。 val totalLength = lineLengths.reduce(_ + _) 案例二:统计文件每行出现的次数 Spark有些特殊的算子,也就是特殊的transformation操作。比如groupByKey、sortByKey、reduceByKey等,其实只是针对特殊的RDD的。即包含key-value对的RDD。而这种RDD中的元素,实际上是scala中的一种类型,即Tuple2,也就是包含两个值的Tuple。 在scala中,需要手动导入Spark的相关隐式转换,import org.apache.spark.SparkContext._。然后,对应包含Tuple2的RDD,会自动隐式转换为PairRDDFunction,并提供reduceByKey等方法。 val lines = sc.textFile("hello.txt") val linePairs = lines.map(line => (line, 1)) val lineCounts = linePairs.reduceByKey(_ + _) lineCounts.foreach(lineCount => println(lineCount._1 + " appears " + llineCount._2 + " times.")) 2.常见的Transformation操作 操作 介绍 map 将RDD中的每个元素传入自定义函数,获取一个新的元素,然后用新的元素组成新的RDD filter 对RDD中每个元素进行判断,如果返回true则保留,返回false则剔除。 flatMap 与map类似,但是对每个元素都可以返回一个或多个新元素。 gropuByKey 根据key进行分组,每个key对应一个Iterable<value> reduceByKey 对每个key对应的value进行reduce操作。 sortByKey 对每个key对应的value进行排序操作。 join 对两个包含<key,value>对的RDD进行join操作,每个key join上的pair,都会传入自定义函数进行处理。 cogroup 同join,但是是每个key对应的Iterable<value>都会传入自定义函数进行处理。 1.Map Java: /** * map算子案例:将集合中每一个元素都乘以2 */ private static void map() { // 创建SparkConf SparkConf conf = new SparkConf() .setAppName("map") .setMaster("local"); // 创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 构造集合 List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5); // 并行化集合,创建初始RDD JavaRDD<Integer> numberRDD = sc.parallelize(numbers); // 使用map算子,将集合中的每个元素都乘以2 // map算子,是对任何类型的RDD,都可以调用的 // 在java中,map算子接收的参数是Function对象 // 创建的Function对象,一定会让你设置第二个泛型参数,这个泛型类型,就是返回的新元素的类型 // 同时call()方法的返回类型,也必须与第二个泛型类型同步 // 在call()方法内部,就可以对原始RDD中的每一个元素进行各种处理和计算,并返回一个新的元素 // 所有新的元素就会组成一个新的RDD JavaRDD<Integer> multipleNumberRDD = numberRDD.map( new Function<Integer, Integer>() { private static final long serialVersionUID = 1L; // 传入call()方法的,就是1,2,3,4,5 // 返回的就是2,4,6,8,10 @Override public Integer call(Integer v1) throws Exception { return v1 * 2; } }); // 打印新的RDD multipleNumberRDD.foreach(new VoidFunction<Integer>() { private static final long serialVersionUID = 1L; @Override public void call(Integer t) throws Exception { System.out.println(t); } }); // 关闭JavaSparkContext sc.close(); } Scala def map() { val conf = new SparkConf() .setAppName("map") .setMaster("local") val sc = new SparkContext(conf) val numbers = Array(1, 2, 3, 4, 5) val numberRDD = sc.parallelize(numbers, 1) val multipleNumberRDD = numberRDD.map { num => num * 2 } multipleNumberRDD.foreach { num => println(num) } } 2.Filter Java /** * filter算子案例:过滤集合中的偶数 */ private static void filter() { // 创建SparkConf SparkConf conf = new SparkConf() .setAppName("filter") .setMaster("local"); // 创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 模拟集合 List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); // 并行化集合,创建初始RDD JavaRDD<Integer> numberRDD = sc.parallelize(numbers); // 对初始RDD执行filter算子,过滤出其中的偶数 // filter算子,传入的也是Function,其他的使用注意点,实际上和map是一样的 // 但是,唯一的不同,就是call()方法的返回类型是Boolean // 每一个初始RDD中的元素,都会传入call()方法,此时你可以执行各种自定义的计算逻辑 // 来判断这个元素是否是你想要的 // 如果你想在新的RDD中保留这个元素,那么就返回true;否则,不想保留这个元素,返回false JavaRDD<Integer> evenNumberRDD = numberRDD.filter( new Function<Integer, Boolean>() { private static final long serialVersionUID = 1L; // 在这里,1到10,都会传入进来 // 但是根据我们的逻辑,只有2,4,6,8,10这几个偶数,会返回true // 所以,只有偶数会保留下来,放在新的RDD中 @Override public Boolean call(Integer v1) throws Exception { return v1 % 2 == 0; } }); // 打印新的RDD evenNumberRDD.foreach(new VoidFunction<Integer>() { private static final long serialVersionUID = 1L; @Override public void call(Integer t) throws Exception { System.out.println(t); } }); // 关闭JavaSparkContext sc.close(); } Scala def filter() { val conf = new SparkConf() .setAppName("filter") .setMaster("local") val sc = new SparkContext(conf) val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val numberRDD = sc.parallelize(numbers, 1) val evenNumberRDD = numberRDD.filter { num => num % 2 == 0 } evenNumberRDD.foreach { num => println(num) } } 3.flatMap java /** * flatMap案例:将文本行拆分为多个单词 */ private static void flatMap() { // 创建SparkConf SparkConf conf = new SparkConf() .setAppName("flatMap") .setMaster("local"); // 创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 构造集合 List<String> lineList = Arrays.asList("hello you", "hello me", "hello world"); // 并行化集合,创建RDD JavaRDD<String> lines = sc.parallelize(lineList); // 对RDD执行flatMap算子,将每一行文本,拆分为多个单词 // flatMap算子,在java中,接收的参数是FlatMapFunction // 我们需要自己定义FlatMapFunction的第二个泛型类型,即,代表了返回的新元素的类型 // call()方法,返回的类型,不是U,而是Iterable<U>,这里的U也与第二个泛型类型相同 // flatMap其实就是,接收原始RDD中的每个元素,并进行各种逻辑的计算和处理,返回可以返回多个元素 // 多个元素,即封装在Iterable集合中,可以使用ArrayList等集合 // 新的RDD中,即封装了所有的新元素;也就是说,新的RDD的大小一定是 >= 原始RDD的大小 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; // 在这里会,比如,传入第一行,hello you // 返回的是一个Iterable<String>(hello, you) @Override public Iterable<String> call(String t) throws Exception { return Arrays.asList(t.split(" ")); } }); // 打印新的RDD words.foreach(new VoidFunction<String>() { private static final long serialVersionUID = 1L; @Override public void call(String t) throws Exception { System.out.println(t); } }); // 关闭JavaSparkContext sc.close(); } Scala def flatMap() { val conf = new SparkConf() .setAppName("flatMap") .setMaster("local") val sc = new SparkContext(conf) val lineArray = Array("hello you", "hello me", "hello world") val lines = sc.parallelize(lineArray, 1) val words = lines.flatMap { line => line.split(" ") } words.foreach { word => println(word) } } 4.groupByKey java /** * groupByKey案例:按照班级对成绩进行分组 */ private static void groupByKey() { // 创建SparkConf SparkConf conf = new SparkConf() .setAppName("groupByKey") .setMaster("local"); // 创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 模拟集合 List<Tuple2<String, Integer>> scoreList = Arrays.asList( new Tuple2<String, Integer>("class1", 80), new Tuple2<String, Integer>("class2", 75), new Tuple2<String, Integer>("class1", 90), new Tuple2<String, Integer>("class2", 65)); // 并行化集合,创建JavaPairRDD JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList); // 针对scores RDD,执行groupByKey算子,对每个班级的成绩进行分组 // groupByKey算子,返回的还是JavaPairRDD // 但是,JavaPairRDD的第一个泛型类型不变,第二个泛型类型变成Iterable这种集合类型 // 也就是说,按照了key进行分组,那么每个key可能都会有多个value,此时多个value聚合成了Iterable // 那么接下来,我们是不是就可以通过groupedScores这种JavaPairRDD,很方便地处理某个分组内的数据 JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey(); // 打印groupedScores RDD groupedScores.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Iterable<Integer>> t) throws Exception { System.out.println("class: " + t._1); Iterator<Integer> ite = t._2.iterator(); while(ite.hasNext()) { System.out.println(ite.next()); } System.out.println("=============================="); } }); // 关闭JavaSparkContext sc.close(); } Scala def groupByKey() { val conf = new SparkConf() .setAppName("groupByKey") .setMaster("local") val sc = new SparkContext(conf) val scoreList = Array(Tuple2("class1", 80), Tuple2("class2", 75), Tuple2("class1", 90), Tuple2("class2", 60)) val scores = sc.parallelize(scoreList, 1) val groupedScores = scores.groupByKey() groupedScores.foreach(score => { println(score._1); score._2.foreach { singleScore => println(singleScore) }; println("=============================") }) } 5.reduceByKey java /** * reduceByKey案例:统计每个班级的总分 */ private static void reduceByKey() { // 创建SparkConf SparkConf conf = new SparkConf() .setAppName("reduceByKey") .setMaster("local"); // 创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 模拟集合 List<Tuple2<String, Integer>> scoreList = Arrays.asList( new Tuple2<String, Integer>("class1", 80), new Tuple2<String, Integer>("class2", 75), new Tuple2<String, Integer>("class1", 90), new Tuple2<String, Integer>("class2", 65)); // 并行化集合,创建JavaPairRDD JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList); // 针对scores RDD,执行reduceByKey算子 // reduceByKey,接收的参数是Function2类型,它有三个泛型参数,实际上代表了三个值 // 第一个泛型类型和第二个泛型类型,代表了原始RDD中的元素的value的类型 // 因此对每个key进行reduce,都会依次将第一个、第二个value传入,将值再与第三个value传入 // 因此此处,会自动定义两个泛型类型,代表call()方法的两个传入参数的类型 // 第三个泛型类型,代表了每次reduce操作返回的值的类型,默认也是与原始RDD的value类型相同的 // reduceByKey算法返回的RDD,还是JavaPairRDD<key, value> JavaPairRDD<String, Integer> totalScores = scores.reduceByKey( new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; // 对每个key,都会将其value,依次传入call方法 // 从而聚合出每个key对应的一个value // 然后,将每个key对应的一个value,组合成一个Tuple2,作为新RDD的元素 @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); // 打印totalScores RDD totalScores.foreach(new VoidFunction<Tuple2<String,Integer>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Integer> t) throws Exception { System.out.println(t._1 + ": " + t._2); } }); // 关闭JavaSparkContext sc.close(); } Scala def reduceByKey() { val conf = new SparkConf() .setAppName("groupByKey") .setMaster("local") val sc = new SparkContext(conf) val scoreList = Array(Tuple2("class1", 80), Tuple2("class2", 75), Tuple2("class1", 90), Tuple2("class2", 60)) val scores = sc.parallelize(scoreList, 1) val totalScores = scores.reduceByKey(_ + _) totalScores.foreach(classScore => println(classScore._1 + ": " + classScore._2)) } 6.sortByKey java /** * sortByKey案例:按照学生分数进行排序 */ private static void sortByKey() { // 创建SparkConf SparkConf conf = new SparkConf() .setAppName("sortByKey") .setMaster("local"); // 创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 模拟集合 List<Tuple2<Integer, String>> scoreList = Arrays.asList( new Tuple2<Integer, String>(65, "leo"), new Tuple2<Integer, String>(50, "tom"), new Tuple2<Integer, String>(100, "marry"), new Tuple2<Integer, String>(80, "jack")); // 并行化集合,创建RDD JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList); // 对scores RDD执行sortByKey算子 // sortByKey其实就是根据key进行排序,可以手动指定升序,或者降序 // 返回的,还是JavaPairRDD,其中的元素内容,都是和原始的RDD一模一样的 // 但是就是RDD中的元素的顺序,不同了 JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(false); // 打印sortedScored RDD sortedScores.foreach(new VoidFunction<Tuple2<Integer,String>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<Integer, String> t) throws Exception { System.out.println(t._1 + ": " + t._2); } }); // 关闭JavaSparkContext sc.close(); } Scala def sortByKey() { val conf = new SparkConf() .setAppName("sortByKey") .setMaster("local") val sc = new SparkContext(conf) val scoreList = Array(Tuple2(65, "leo"), Tuple2(50, "tom"), Tuple2(100, "marry"), Tuple2(85, "jack")) val scores = sc.parallelize(scoreList, 1) val sortedScores = scores.sortByKey(false) sortedScores.foreach(studentScore => println(studentScore._1 + ": " + studentScore._2)) } 7.Join java /** * join案例:打印学生成绩 */ private static void join() { // 创建SparkConf SparkConf conf = new SparkConf() .setAppName("join") .setMaster("local"); // 创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 模拟集合 List<Tuple2<Integer, String>> studentList = Arrays.asList( new Tuple2<Integer, String>(1, "leo"), new Tuple2<Integer, String>(2, "jack"), new Tuple2<Integer, String>(3, "tom")); List<Tuple2<Integer, Integer>> scoreList = Arrays.asList( new Tuple2<Integer, Integer>(1, 100), new Tuple2<Integer, Integer>(2, 90), new Tuple2<Integer, Integer>(3, 60)); // 并行化两个RDD JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList); JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList); // 使用join算子关联两个RDD // join以后,还是会根据key进行join,并返回JavaPairRDD // 但是JavaPairRDD的第一个泛型类型,之前两个JavaPairRDD的key的类型,因为是通过key进行join的 // 第二个泛型类型,是Tuple2<v1, v2>的类型,Tuple2的两个泛型分别为原始RDD的value的类型 // join,就返回的RDD的每一个元素,就是通过key join上的一个pair // 什么意思呢?比如有(1, 1) (1, 2) (1, 3)的一个RDD // 还有一个(1, 4) (2, 1) (2, 2)的一个RDD // join以后,实际上会得到(1 (1, 4)) (1, (2, 4)) (1, (3, 4)) JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = students.join(scores); // 打印studnetScores RDD studentScores.foreach( new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception { System.out.println("student id: " + t._1); System.out.println("student name: " + t._2._1); System.out.println("student score: " + t._2._2); System.out.println("==============================="); } }); // 关闭JavaSparkContext sc.close(); } Scala def join() { val conf = new SparkConf() .setAppName("join") .setMaster("local") val sc = new SparkContext(conf) val studentList = Array( Tuple2(1, "leo"), Tuple2(2, "jack"), Tuple2(3, "tom")); val scoreList = Array( Tuple2(1, 100), Tuple2(2, 90), Tuple2(3, 60)); val students = sc.parallelize(studentList); val scores = sc.parallelize(scoreList); val studentScores = students.join(scores) studentScores.foreach(studentScore => { println("student id: " + studentScore._1); println("student name: " + studentScore._2._1) println("student socre: " + studentScore._2._2) println("=======================================") }) } 8.Cogroup /** * cogroup案例:打印学生成绩 */ private static void cogroup() { // 创建SparkConf SparkConf conf = new SparkConf() .setAppName("cogroup") .setMaster("local"); // 创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 模拟集合 List<Tuple2<Integer, String>> studentList = Arrays.asList( new Tuple2<Integer, String>(1, "leo"), new Tuple2<Integer, String>(2, "jack"), new Tuple2<Integer, String>(3, "tom")); List<Tuple2<Integer, Integer>> scoreList = Arrays.asList( new Tuple2<Integer, Integer>(1, 100), new Tuple2<Integer, Integer>(2, 90), new Tuple2<Integer, Integer>(3, 60), new Tuple2<Integer, Integer>(1, 70), new Tuple2<Integer, Integer>(2, 80), new Tuple2<Integer, Integer>(3, 50)); // 并行化两个RDD JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList); JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList); // cogroup与join不同 // 相当于是,一个key join上的所有value,都给放到一个Iterable里面去了 // cogroup,不太好讲解,希望大家通过动手编写我们的案例,仔细体会其中的奥妙 JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> studentScores = students.cogroup(scores); // 打印studnetScores RDD studentScores.foreach( new VoidFunction<Tuple2<Integer,Tuple2<Iterable<String>,Iterable<Integer>>>>() { private static final long serialVersionUID = 1L; @Override public void call( Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception { System.out.println("student id: " + t._1); System.out.println("student name: " + t._2._1); System.out.println("student score: " + t._2._2); System.out.println("==============================="); } }); // 关闭JavaSparkContext sc.close(); } Scala def join() { val conf = new SparkConf() .setAppName("join") .setMaster("local") val sc = new SparkContext(conf) val studentList = Array( Tuple2(1, "leo"), Tuple2(2, "jack"), Tuple2(3, "tom")); val scoreList = Array( Tuple2(1, 100), Tuple2(2, 90), Tuple2(3, 60)); val students = sc.parallelize(studentList); val scores = sc.parallelize(scoreList); val studentScores = students.join(scores) studentScores.foreach(studentScore => { println("student id: " + studentScore._1); println("student name: " + studentScore._2._1) println("student socre: " + studentScore._2._2) println("=======================================") }) } 3.常见的action操作 操作 介绍 reduce 将RDD中的所有元素进行聚合操作。第一个和第二个元素聚合,值与第三个元素聚合,值与第四个元素聚合,以此类推。 collect 将RDD中所有元素获取到本地客户端。 count 获取RDD元素总数。 take(n) 获取RDD中前n个元素。 saveAsTextFile 将RDD元素保存到文件中,对每个元素调用toString方法 countByKey 对每个key对应的值进行count计数。 foreach 遍历RDD中的每个元素。 1.Reduce Java private static void reduce() { // 创建SparkConf和JavaSparkContext SparkConf conf = new SparkConf() .setAppName("reduce") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加 List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); JavaRDD<Integer> numbers = sc.parallelize(numberList); // 使用reduce操作对集合中的数字进行累加 // reduce操作的原理: // 首先将第一个和第二个元素,传入call()方法,进行计算,会获取一个结果,比如1 + 2 = 3 // 接着将该结果与下一个元素传入call()方法,进行计算,比如3 + 3 = 6 // 以此类推 // 所以reduce操作的本质,就是聚合,将多个元素聚合成一个元素 int sum = numbers.reduce(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); System.out.println(sum); // 关闭JavaSparkContext sc.close(); } Scala def reduce() { val conf = new SparkConf() .setAppName("reduce") .setMaster("local") val sc = new SparkContext(conf) val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val numbers = sc.parallelize(numberArray, 1) val sum = numbers.reduce(_ + _) println(sum) } 2.Collect Java private static void collect() { // 创建SparkConf和JavaSparkContext SparkConf conf = new SparkConf() .setAppName("collect") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加 List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); JavaRDD<Integer> numbers = sc.parallelize(numberList); // 使用map操作将集合中所有数字乘以2 JavaRDD<Integer> doubleNumbers = numbers.map( new Function<Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1) throws Exception { return v1 * 2; } }); // 不用foreach action操作,在远程集群上遍历rdd中的元素 // 而使用collect操作,将分布在远程集群上的doubleNumbers RDD的数据拉取到本地 // 这种方式,一般不建议使用,因为如果rdd中的数据量比较大的话,比如超过1万条 // 那么性能会比较差,因为要从远程走大量的网络传输,将数据获取到本地 // 此外,除了性能差,还可能在rdd中数据量特别大的情况下,发生oom异常,内存溢出 // 因此,通常,还是推荐使用foreach action操作,来对最终的rdd元素进行处理 List<Integer> doubleNumberList = doubleNumbers.collect(); for(Integer num : doubleNumberList) { System.out.println(num); } // 关闭JavaSparkContext sc.close(); } scala def collect() { val conf = new SparkConf() .setAppName("collect") .setMaster("local") val sc = new SparkContext(conf) val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val numbers = sc.parallelize(numberArray, 1) val doubleNumbers = numbers.map { num => num * 2 } val doubleNumberArray = doubleNumbers.collect() for(num <- doubleNumberArray) { println(num) } } 3.Count Java private static void count() { // 创建SparkConf和JavaSparkContext SparkConf conf = new SparkConf() .setAppName("count") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加 List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); JavaRDD<Integer> numbers = sc.parallelize(numberList); // 对rdd使用count操作,统计它有多少个元素 long count = numbers.count(); System.out.println(count); // 关闭JavaSparkContext sc.close(); } scala def count() { val conf = new SparkConf() .setAppName("count") .setMaster("local") val sc = new SparkContext(conf) val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val numbers = sc.parallelize(numberArray, 1) val count = numbers.count() println(count) } 4.Take(n) Java private static void take() { // 创建SparkConf和JavaSparkContext SparkConf conf = new SparkConf() .setAppName("take") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加 List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); JavaRDD<Integer> numbers = sc.parallelize(numberList); // 对rdd使用count操作,统计它有多少个元素 // take操作,与collect类似,也是从远程集群上,获取rdd的数据 // 但是collect是获取rdd的所有数据,take只是获取前n个数据 List<Integer> top3Numbers = numbers.take(3); for(Integer num : top3Numbers) { System.out.println(num); } // 关闭JavaSparkContext sc.close(); } scala def take() { val conf = new SparkConf() .setAppName("take") .setMaster("local") val sc = new SparkContext(conf) val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val numbers = sc.parallelize(numberArray, 1) val top3Numbers = numbers.take(3) for(num <- top3Numbers) { println(num) } } 5.saveAsTextFile Java private static void saveAsTextFile() { // 创建SparkConf和JavaSparkContext SparkConf conf = new SparkConf() .setAppName("saveAsTextFile"); JavaSparkContext sc = new JavaSparkContext(conf); // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加 List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); JavaRDD<Integer> numbers = sc.parallelize(numberList); // 使用map操作将集合中所有数字乘以2 JavaRDD<Integer> doubleNumbers = numbers.map( new Function<Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1) throws Exception { return v1 * 2; } }); // 直接将rdd中的数据,保存在HFDS文件中 // 但是要注意,我们这里只能指定文件夹,也就是目录 // 那么实际上,会保存为目录中的/double_number.txt/part-00000文件 doubleNumbers.saveAsTextFile("hdfs://spark1:9000/double_number.txt"); // 关闭JavaSparkContext sc.close(); } scala def saveAsTextFile() { } 6.countByKey Java private static void countByKey() { // 创建SparkConf SparkConf conf = new SparkConf() .setAppName("countByKey") .setMaster("local"); // 创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 模拟集合 List<Tuple2<String, String>> scoreList = Arrays.asList( new Tuple2<String, String>("class1", "leo"), new Tuple2<String, String>("class2", "jack"), new Tuple2<String, String>("class1", "marry"), new Tuple2<String, String>("class2", "tom"), new Tuple2<String, String>("class2", "david")); // 并行化集合,创建JavaPairRDD JavaPairRDD<String, String> students = sc.parallelizePairs(scoreList); // 对rdd应用countByKey操作,统计每个班级的学生人数,也就是统计每个key对应的元素个数 // 这就是countByKey的作用 // countByKey返回的类型,直接就是Map<String, Object> Map<String, Object> studentCounts = students.countByKey(); for(Map.Entry<String, Object> studentCount : studentCounts.entrySet()) { System.out.println(studentCount.getKey() + ": " + studentCount.getValue()); } // 关闭JavaSparkContext sc.close(); } scala def countByKey() { val conf = new SparkConf() .setAppName("countByKey") .setMaster("local") val sc = new SparkContext(conf) val studentList = Array(Tuple2("class1", "leo"), Tuple2("class2", "jack"), Tuple2("class1", "tom"), Tuple2("class2", "jen"), Tuple2("class2", "marry")) val students = sc.parallelize(studentList, 1) val studentCounts = students.countByKey() println(studentCounts) } } 7.Foreach 六、RDD持久化 1.持久化原理 Spark非常重要的一个功能特性就是可以将RDD持久化在内存中。当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition。这样的话,对于针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD,而不需要反复计算多次该RDD。 巧妙使用RDD持久化,甚至在某些场景下,可以将spark应用程序的性能提升10倍。对于迭代式算法和快速交互式应用来说,RDD持久化,是非常重要的。 要持久化一个RDD,只要调用其cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition。 cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,同时就是调用persist(MEMORY_ONLY),将数据持久化到内存中。如果需要从内存中清楚缓存,那么可以使用unpersist()方法。 Spark自己也会在shuffle操作时,进行数据的持久化,比如写入磁盘,主要是为了在节点失败时,避免需要重新计算整个过程。 实验检测 public class Persist { public static void main(String[] args) { SparkConf conf = new SparkConf() .setAppName("Persist") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); // cache()或者persist()的使用,是有规则的 // 必须在transformation或者textFile等创建了一个RDD之后,直接连续调用cache()或persist()才可以 // 如果你先创建一个RDD,然后单独另起一行执行cache()或persist()方法,是没有用的 // 而且,会报错,大量的文件会丢失 JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//spark.txt") .cache(); long beginTime = System.currentTimeMillis(); long count = lines.count(); System.out.println(count); long endTime = System.currentTimeMillis(); System.out.println("cost " + (endTime - beginTime) + " milliseconds."); beginTime = System.currentTimeMillis(); count = lines.count(); System.out.println(count); endTime = System.currentTimeMillis(); System.out.println("cost " + (endTime - beginTime) + " milliseconds."); sc.close(); } } 2.持久化策略 RDD持久化是可以手动选择不同的策略的。比如可以将RDD持久化在内存中、持久化到磁盘上、使用序列化的方式持久化,多持久化的数据进行多路复用。只要在调用persist()时传入对应的StorageLevel即可。 持久化级别 含义 MEMORY_ONLY 以非序列化的Java对象的方式持久化在JVM内存中。如果内存无法完全存储RDD所有的partition,那么那些没有持久化的partition就会在下一次需要使用它的时候,重新被计算。 MEMORY_AND_DISK 同上,但是当某些partition无法存储在内存中时,会持久化到磁盘中。下次需要使用这些partition时,需要从磁盘上读取。 MEMORY_ONLY_SER 同MEMORY_ONLY,但是会使用Java序列化方式,将Java对象序列化后进行持久化。可以减少内存开销,但是需要进行反序列化,因此会加大CPU开销。 MEMORY_AND_DSK_SER 同MEMORY_AND_DSK。但是使用序列化方式持久化Java对象。 DISK_ONLY 使用非序列化Java对象的方式持久化,完全存储到磁盘上。 MEMORY_ONLY_2 MEMORY_AND_DISK_2 等等 如果是尾部加了2的持久化级别,表示会将持久化数据复用一份,保存到其他节点,从而在数据丢失时,不需要再次计算,只需要使用备份数据即可。 3.如何选择持久化策略 Spark提供的多种持久化级别,主要是为了在CPU和内存消耗之间进行取舍。下面是一些通用的持久化级别的选择建议: 1、优先使用MEMORY_ONLY,如果可以缓存所有数据的话,那么就使用这种策略。因为纯内存速度最快,而且没有序列化,不需要消耗CPU进行反序列化操作。 2、如果MEMORY_ONLY策略,无法存储的下所有数据的话,那么使用MEMORY_ONLY_SER,将数据进行序列化进行存储,纯内存操作还是非常快,只是要消耗CPU进行反序列化。 3、如果需要进行快速的失败恢复,那么就选择带后缀为_2的策略,进行数据的备份,这样在失败时,就不需要重新计算了。 4、能不使用DISK相关的策略,就不用使用,有的时候,从磁盘读取数据,还不如重新计算一次。 七、共享变量 1.共享变量的原理 Spark一个非常重要的特性就是共享变量。 默认情况下,如果在一个算子的函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每个task中。此时每个task只能操作自己的那份变量副本。如果多个task想要共享某个变量,那么这种方式是做不到的。 Spark为此提供了两种共享变量,一种是Broadcast Variable(广播变量),另一种是Accumulator(累加变量)。Broadcast Variable会将使用到的变量,仅仅为每个节点拷贝一份,更大的用处是优化性能,减少网络传输以及内存消耗。Accumulator则可以让多个task共同操作一份变量,主要可以进行累加操作。 2. Broadcast Variable Spark提供的Broadcast Variable,是只读的。并且在每个节点上只会有一份副本,而不会为每个task都拷贝一份副本。因此其最大作用,就是减少变量到各个节点的网络传输消耗,以及在各个节点上的内存消耗。此外,spark自己内部也使用了高效的广播算法来减少网络消耗。 可以通过调用SparkContext的broadcast()方法,来针对某个变量创建广播变量。然后在算子的函数内,使用到广播变量时,每个节点只会拷贝一份副本了。每个节点可以使用广播变量的value()方法获取值。记住,广播变量,是只读的。 val factor = 3 val factorBroadcast = sc.broadcast(factor) val arr = Array(1, 2, 3, 4, 5) val rdd = sc.parallelize(arr) val multipleRdd = rdd.map(num => num * factorBroadcast.value()) multipleRdd.foreach(num => println(num)) 3.Accumulator Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能。但是确给我们提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。 val sumAccumulator = sc.accumulator(0) val arr = Array(1, 2, 3, 4, 5) val rdd = sc.parallelize(arr) rdd.foreach(num => sumAccumulator += num) println(sumAccumulator.value) 八、基于排序机制的wordcount程序 案例需求 1、对文本文件内的每个单词都统计出其出现的次数。 2、按照每个单词出现次数的数量,降序排序。 1.Java代码 import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; /** * 排序的wordcount程序 * @author Administrator * */ public class SortWordCount { public static void main(String[] args) { // 创建SparkConf和JavaSparkContext SparkConf conf = new SparkConf() .setAppName("SortWordCount") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); // 创建lines RDD JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//spark.txt"); // 执行我们之前做过的单词计数 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String t) throws Exception { return Arrays.asList(t.split(" ")); } }); JavaPairRDD<String, Integer> pairs = words.mapToPair( new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String t) throws Exception { return new Tuple2<String, Integer>(t, 1); } }); JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey( new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); // 到这里为止,就得到了每个单词出现的次数 // 但是,问题是,我们的新需求,是要按照每个单词出现次数的顺序,降序排序 // wordCounts RDD内的元素是什么?应该是这种格式的吧:(hello, 3) (you, 2) // 我们需要将RDD转换成(3, hello) (2, you)的这种格式,才能根据单词出现次数进行排序把! // 进行key-value的反转映射 JavaPairRDD<Integer, String> countWords = wordCounts.mapToPair( new PairFunction<Tuple2<String,Integer>, Integer, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception { return new Tuple2<Integer, String>(t._2, t._1); } }); // 按照key进行排序 JavaPairRDD<Integer, String> sortedCountWords = countWords.sortByKey(false); // 再次将value-key进行反转映射 JavaPairRDD<String, Integer> sortedWordCounts = sortedCountWords.mapToPair( new PairFunction<Tuple2<Integer,String>, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception { return new Tuple2<String, Integer>(t._2, t._1); } }); // 到此为止,我们获得了按照单词出现次数排序后的单词计数 // 打印出来 sortedWordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Integer> t) throws Exception { System.out.println(t._1 + " appears " + t._2 + " times."); } }); // 关闭JavaSparkContext sc.close(); } } 2.scala代码 import org.apache.spark.SparkConf import org.apache.spark.SparkContext /** * @author Administrator */ object SortWordCount { def main(args: Array[String]) { val conf = new SparkConf() .setAppName("SortWordCount") .setMaster("local") val sc = new SparkContext(conf) val lines = sc.textFile("C://Users//Administrator//Desktop//spark.txt", 1) val words = lines.flatMap { line => line.split(" ") } val pairs = words.map { word => (word, 1) } val wordCounts = pairs.reduceByKey(_ + _) val countWords = wordCounts.map(wordCount => (wordCount._2, wordCount._1)) val sortedCountWords = countWords.sortByKey(false) val sortedWordCounts = sortedCountWords.map(sortedCountWord => (sortedCountWord._2, sortedCountWord._1)) sortedWordCounts.foreach(sortedWordCount => println( sortedWordCount._1 + " appear " + sortedWordCount._2 + " times.")) } } 九、二次排序 案例需求 1、按照文件中的第一列排序。 2、如果第一列相同,则按照第二列排序。 1.java代码 package cn.spark.study.core; import java.io.Serializable; import scala.math.Ordered; /** * 自定义的二次排序key * @author Administrator * */ public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable { private static final long serialVersionUID = -2366006422945129991L; // 首先在自定义key里面,定义需要进行排序的列 private int first; private int second; public SecondarySortKey(int first, int second) { this.first = first; this.second = second; } @Override public boolean $greater(SecondarySortKey other) { if(this.first > other.getFirst()) { return true; } else if(this.first == other.getFirst() && this.second > other.getSecond()) { return true; } return false; } @Override public boolean $greater$eq(SecondarySortKey other) { if(this.$greater(other)) { return true; } else if(this.first == other.getFirst() && this.second == other.getSecond()) { return true; } return false; } @Override public boolean $less(SecondarySortKey other) { if(this.first < other.getFirst()) { return true; } else if(this.first == other.getFirst() && this.second < other.getSecond()) { return true; } return false; } @Override public boolean $less$eq(SecondarySortKey other) { if(this.$less(other)) { return true; } else if(this.first == other.getFirst() && this.second == other.getSecond()) { return true; } return false; } @Override public int compare(SecondarySortKey other) { if(this.first - other.getFirst() != 0) { return this.first - other.getFirst(); } else { return this.second - other.getSecond(); } } @Override public int compareTo(SecondarySortKey other) { if(this.first - other.getFirst() != 0) { return this.first - other.getFirst(); } else { return this.second - other.getSecond(); } } // 为要进行排序的多个列,提供getter和setter方法,以及hashcode和equals方法 public int getFirst() { return first; } public void setFirst(int first) { this.first = first; } public int getSecond() { return second; } public void setSecond(int second) { this.second = second; } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + first; result = prime * result + second; return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; SecondarySortKey other = (SecondarySortKey) obj; if (first != other.first) return false; if (second != other.second) return false; return true; } } package cn.spark.study.core; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; /** * 二次排序 * 1、实现自定义的key,要实现Ordered接口和Serializable接口,在key中实现自己对多个列的排序算法 * 2、将包含文本的RDD,映射成key为自定义key,value为文本的JavaPairRDD * 3、使用sortByKey算子按照自定义的key进行排序 * 4、再次映射,剔除自定义的key,只保留文本行 * @author Administrator * */ public class SecondarySort { public static void main(String[] args) { SparkConf conf = new SparkConf() .setAppName("SecondarySort") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//sort.txt"); JavaPairRDD<SecondarySortKey, String> pairs = lines.mapToPair( new PairFunction<String, SecondarySortKey, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<SecondarySortKey, String> call(String line) throws Exception { String[] lineSplited = line.split(" "); SecondarySortKey key = new SecondarySortKey( Integer.valueOf(lineSplited[0]), Integer.valueOf(lineSplited[1])); return new Tuple2<SecondarySortKey, String>(key, line); } }); JavaPairRDD<SecondarySortKey, String> sortedPairs = pairs.sortByKey(); JavaRDD<String> sortedLines = sortedPairs.map( new Function<Tuple2<SecondarySortKey,String>, String>() { private static final long serialVersionUID = 1L; @Override public String call(Tuple2<SecondarySortKey, String> v1) throws Exception { return v1._2; } }); sortedLines.foreach(new VoidFunction<String>() { private static final long serialVersionUID = 1L; @Override public void call(String t) throws Exception { System.out.println(t); } }); sc.close(); } } 2.Scala代码 class SecondSortKey(val first: Int, val second: Int) extends Ordered[SecondSortKey] with Serializable { def compare(that: SecondSortKey): Int = { if(this.first - that.first != 0) { this.first - that.first } else { this.second - that.second } } } import org.apache.spark.SparkConf import org.apache.spark.SparkContext /** * @author Administrator */ object SecondSort { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("SecondSort") .setMaster("local") val sc = new SparkContext(conf) val lines = sc.textFile("C://Users//Administrator//Desktop//sort.txt", 1) val pairs = lines.map { line => ( new SecondSortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line)} val sortedPairs = pairs.sortByKey() val sortedLines = sortedPairs.map(sortedPair => sortedPair._2) sortedLines.foreach { sortedLine => println(sortedLine) } } } 十、Topn 案例需求 1、对文本文件内的数字,取最大的前3个。 2、对每个班级内的学生成绩,取出前3名。(分组取topn) 3、课后作用:用Scala来实现分组取topn。 1.java代码 import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; /** * 取最大的前3个数字 * @author Administrator * */ public class Top3 { public static void main(String[] args) { SparkConf conf = new SparkConf() .setAppName("Top3") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//top.txt"); JavaPairRDD<Integer, String> pairs = lines.mapToPair( new PairFunction<String, Integer, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<Integer, String> call(String t) throws Exception { return new Tuple2<Integer, String>(Integer.valueOf(t), t); } }); JavaPairRDD<Integer, String> sortedPairs = pairs.sortByKey(false); JavaRDD<Integer> sortedNumbers = sortedPairs.map( new Function<Tuple2<Integer,String>, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Tuple2<Integer, String> v1) throws Exception { return v1._1; } }); List<Integer> sortedNumberList = sortedNumbers.take(3); for(Integer num : sortedNumberList) { System.out.println(num); } sc.close(); } } 2. import java.util.Arrays; import java.util.Iterator; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; /** * 分组取top3 * @author Administrator * */ public class GroupTop3 { public static void main(String[] args) { SparkConf conf = new SparkConf() .setAppName("Top3") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//score.txt"); JavaPairRDD<String, Integer> pairs = lines.mapToPair( new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String line) throws Exception { String[] lineSplited = line.split(" "); return new Tuple2<String, Integer>(lineSplited[0], Integer.valueOf(lineSplited[1])); } }); JavaPairRDD<String, Iterable<Integer>> groupedPairs = pairs.groupByKey(); JavaPairRDD<String, Iterable<Integer>> top3Score = groupedPairs.mapToPair( new PairFunction<Tuple2<String,Iterable<Integer>>, String, Iterable<Integer>>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Iterable<Integer>> call( Tuple2<String, Iterable<Integer>> classScores) throws Exception { Integer[] top3 = new Integer[3]; String className = classScores._1; Iterator<Integer> scores = classScores._2.iterator(); while(scores.hasNext()) { Integer score = scores.next(); for(int i = 0; i < 3; i++) { if(top3[i] == null) { top3[i] = score; break; } else if(score > top3[i]) { for(int j = 2; j > i; j--) { top3[j] = top3[j - 1]; } top3[i] = score; break; } } } return new Tuple2<String, Iterable<Integer>>(className, Arrays.asList(top3)); } }); top3Score.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Iterable<Integer>> t) throws Exception { System.out.println("class: " + t._1); Iterator<Integer> scoreIterator = t._2.iterator(); while(scoreIterator.hasNext()) { Integer score = scoreIterator.next(); System.out.println(score); } System.out.println("======================================="); } }); sc.close(); } } 2.Scala代码 import org.apache.spark.SparkConf import org.apache.spark.SparkContext /** * @author Administrator */ object Top3 { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("Top3") .setMaster("local") val sc = new SparkContext(conf) val lines = sc.textFile("C://Users//Administrator//Desktop//top.txt", 1) val pairs = lines.map { line => (line.toInt, line) } val sortedPairs = pairs.sortByKey(false) val sortedNumbers = sortedPairs.map(sortedPair => sortedPair._1) val top3Number = sortedNumbers.take(3) for(num <- top3Number) { println(num) } } } 十一、spark内核架构 1.提交spark应用的机器行通过spark-submit(shell)提交自己的spark程序,称之为Application,提交应用程序后,会启动driver进程,spark-submit使用我们之前一直使用的那种提交模式去提交的时候,我们之前的提交模式就做standalone,其实会通过反射的方式。创建和构造一个DriverActor进程出来 2.执行我们的application应用程序,也就是我们自己编写的代码,代码首先是构造sparkConf、在构造sparkContext 3.sparkContext在初始化的时候,做的最重要的两件事就是构造出来DAGScheduler和taskScheduler。 4.taskScheduler(他有自己的后台进行)实际上,是负责通过他对对应的一个后台进程去连接master,向master注册application 5.master接收到application注册的请求之后,会使用自己的资源调度算法,在spark集群的worker节点上,为这个application启动多个executor。Master通知worker启动executor 6.executor启动之后,会自己反向注册到taskscheduler。每执行到一个action,就会创建一个job,job会提交给DAGScheduler,DAGScheduler会将job划分成多个stage,然后每一个stage创建一个taskSet, 7.taskscheduler会吧taskSet里每一个task,提交到executor执行(task分配算法) 8.worker会为application启动executor(进程),executor中有线程池。Executor,每接收到一个task,都会用taskRunner来封装task,然后从线程池里取出一个线程,执行这个task. 9.taskRunner:将我们编写的代码,也就是要执行的算子以及函数,拷贝,反序列化然后进行task 10.task有两种,shuffleMapTask和ResultTask.只有最后一个stage是ResultTask,之前的stage,都是shuffleMapTask. 11.所以。最后整个spark应用程序的执行,就是stage分批次作为taskset提交到executor执行,每一个task针对的RDD的一个partition,执行我们定义的算子和函数,以此类推 ,直到所有的操作执行完为止 十二、宽依赖和窄依赖 1、Application 2、spark-submit 3、Driver 4、SparkContext 5、Master 6、Worker 7、Executor 8、Job 9、DAGScheduler 10、TaskScheduler 11、ShuffleMapTask and ResultTask 1.窄依赖 Narrow Dependency 一个RDD,对他的父RDD,只有简单的一对一的依赖关系,也就是说,RDD的每个partition,仅依赖于父的一个partition。父的RDD和子的RDD的partition之间的对应关系是一对一的。这种情况下,是简单的RDD之间的依赖关系 2.宽依赖 Shuffle Dependecy,本质如其名,本质其实就是shuffle。也就是说。每一个父RDD的partition中的数据,都可能会传输一部分。到下一个RDD的每一个partition中,此时就会出现,父RDD和子RDD的partition之间,具有交互错综负责的关系。那么,这种情况,就叫做两个RDD之间的宽依赖。同时,他们之间的操作就是shuffle。 十三、基于YARN两种提交模式 1、Spark内核架构,其实就是第一种模式,standalone模式,基于Spark自己的Master-Worker集群。 2、第二种,是基于YARN的yarn-cluster模式。 3、第三种,是基于YARN的yarn-client模式。 4、如果,你要切换到第二种和第三种模式,很简单,将我们之前用于提交spark应用程序的spark-submit脚本,加上--master参数,设置为yarn-cluster,或yarn-client,即可。如果你没有设置,那么,就是standalone模式。 1.Yarn-cluster提交模式 Spark-submit提交,发送请求到resourcemanager,请求启动applicationmaster,resourcemanager分配一个containner,在某个nodemanager上启动一个applicationmasstesr,applicationmaster相当于是driver,applicationmaster连接其他的nodemanager来启动Executor,这里的nodemanager相当于是worker,executor启动后,向application反向注册 2.Yarn-clinet提交模式 Spark-submit提交,在本地启动driver进程(主要与yarn-cluster进程不同的地方,yarn-cluster是在nodemanager上启动),发送请求给resourcemanager,请求启动applicationmaster,resourcemanager分配一个container,在某个nodemanager上启动一个applicationmaster,但是这里的applicationmaster 其实只是一个executorLauncher。在nodemanager上启动executorLanucher(applicationmaser) ,resourcemanager分配一批container,然后applicationmaster连接其他的nodemanager,用container的资源,启动executor,executor反向注册到本地的driver上 3.选用原则 1.yarn-clinet用于测试,因为,driver运行在本地客户端,负责调整application,会与yarn集群产生大量的网络通信,从而导致网卡流量激增,可能会被你们公司的SA(运维)警告。好处是直接执行,本地可以看到所有的log。方便进行调试 2.Yarn-cluster用于生产环境,因为driver运行在nodemanager,没有忘啦流量激增的问题。缺点在于,调试不方便,本地应spark-submint提交后,看不大log,只能通过yarn application-logs application-id 这种命令查看,很是麻烦 十四、SparkContext原理剖析与源码分析 1、TaskScheduler 2、DAGScheduler 3、SparkUI 十五、 Spark性能优化 1.概述 由于Spark的计算本质是基于内存的,所以Spark性能程序的性能可能因为集群中的任何因素出现瓶颈:CPU、网络带宽、或者是内存。如果内存能够容纳得下所有的数据,那么网络传输和通信就会导致性能出现瓶颈。但是如果内存比较紧张,不足以放下所有的数据(比如在针对10亿以上的数据量进行计算时),还是需要对内存的使用进行性能优化的,比如说使用一些手段来减少内存的消耗。 Spark性能优化,其实主要就是在于对内存的使用进行调优。因为通常情况下来说,如果你的Spark应用程序计算的数据量比较小,并且你的内存足够使用,那么只要运维可以保障网络通常,一般是不会有大的性能问题的。但是Spark应用程序的性能问题往往出现在针对大数据量(比如10亿级别)进行计算时出现,因此通常来说,Spark性能优化,主要是对内存进行性能优化。当然,除了内存调优之外,还有很多手段可以优化Spark应用程序的性能。 2.spark性能调优的重要性 1.实际上Spark到目前为止,在大数据业界的影响力和覆盖度,还远没有达到Hadoop的水平,——虽然说,我们之前一再强调,Spark Core、Spark SQL、Spark Streaming,可以替代MapReduce、Hive查询引擎、Storm。但是事实就是,Spark还没有达到已经替代了它们的地步。 根据我在研究Spark,并且在一线使用Spark,与大量行业内的大数据相关从业人员沟通的情况来看。Spark最大的优点,其实也是它目前最大的问题——基于内存的计算模型。Spark由于使用了基于内存的计算模型,因此导致了其稳定性,远远不如Hadoop。虽然我也很喜欢和热爱Spark,但是这就是事实,Spark的速度的确达到了hadoop的几倍、几十倍、甚至上百倍(极端情况)。但是基于内存的模型,导致它经常出现各种OOM(内存溢出)、内部异常等问题。 说一个亲身经历的例子,曾经用Spark改写几个复杂的MapReduce程序,虽然MapReduce很慢,但是它很稳定,至少慢慢跑,是可以跑出来数据的。但是用Spark Core很快就改写完了程序,问题是,在整整半个月之内,Spark程序根本跑不起来,因为数据量太大,10亿+。导致它出现了各种各样的问题,包括OOM、文件丢失、task lost、内部异常等等各种问题。最后耗费了大量时间,最一个spark程序进行了大量的性能调优,才最终让它可以跑起来。 的确,用了Spark,比MapReduce的速度快了十倍,但是付出的代价是惨痛的,花了整整一个月的时间做这个事情。 2.因此,当我在公司推广Spark的使用时,很多人都不无担心地说,听说Spark还不够稳定,经常出现问题,比如OOM等,它的稳定性,导致业界的人们不太敢轻易尝试它,在复杂的大数据系统,要求极高稳定性的线程系统中使用。——当然,如果你就是开发一个针对公司内部的,稳定性要求不高的系统,当然不用担心这个问题。 所以,我认为,Spark的基于内存的本质,就导致了上述的问题,导致了它目前还无法完全提到Hadoop中的某些技术。 但是,纵然Spark有各种问题,其优点就是缺点,缺点也是优点——它实在是很快。优秀的Spark应用程序,性能完全可以达到MapReduce、Hive查询引擎的数倍、甚至数十倍。因此,纵使有各种担忧,Spark还是吸引着大量的人们以及公司去探索,和尝试攻克它,使用它,让它为我们所用,用它开放更棒的大数据系统。 因此,正是基于上述背景,Spark工程师的要求是非常高的。比如我们这里,我们正在用Spark开发大型复杂的线上大数据系统,所以针对Spark的招聘,我们是要求Spark工程师必须精通Spark内核源码,能够对程序进行性能优化。——打个广告,实际上,我认为如果能精通本系列课程,那么成为一个行业内优秀的Spark工程师,是一定没有问题的。 3.所以,Spark虽然有它的问题所在,但是它的优势还是让它以极快的速度,极强的劲头在行业内快速发展。行业内各个公司,也大量缺乏着优秀的Spark工程师。而如果是想转型进行Spark开发的朋友,基于上述种种背景,就应该明白了,Spark性能优化,对于你找工作,对于你在实际工作中解决问题的重要性了! 要成为优秀的Spark工程师,顺利实现转型,那么就必须能够彻底精通Spark内核源码,能够基于对Spark内核原理的深度理解,对线上复杂的Spark大数据系统 / 程序出现的报错和故障,进行排查和解决;能够对运行较慢的Spark应用程序,进行精准的性能问题排查,并且对症下游,针对各种性能问题,使用对应的技术手段,进行解决。 只有这样,我认为,你才能够顺利实现转型,出去成功面试Spark工程师,甚至是高级Spark工程师的岗位。才能在实际工作中,真正让Spark发挥出其巨大的威力。而不仅仅是处于对新技术的喜爱,对Spark进行浅尝辄止的学习——那是没有任何用的。 不精通Spark内核源码,不精通Spark性能优化,也许你能找到Spark大数据的工作,但是通常情况下,也只能进入比较缺人的小公司。要进入大公司,找到更好的职业机会,那么就一起在精通了之前的Spark内核源码深度剖析阶段之后,来进入Spark性能优化阶段的学习吧。 3.性能优化的技术 诊断内存的消耗 内存都花费在哪里了? 1、每个Java对象,都有一个对象头,会占用16个字节,主要是包括了一些对象的元信息,比如指向它的类的指针。如果一个对象本身很小,比如就包括了一个int类型的field,那么它的对象头实际上比对象自己还要大。 2、Java的String对象,会比它内部的原始数据,要多出40个字节。因为它内部使用char数组来保存内部的字符序列的,并且还得保存诸如数组长度之类的信息。而且因为String使用的是UTF-16编码,所以每个字符会占用2个字节。比如,包含10个字符的String,会占用60个字节。 3、Java中的集合类型,比如HashMap和LinkedList,内部使用的是链表数据结构,所以对链表中的每一个数据,都使用了Entry对象来包装。Entry对象不光有对象头,还有指向下一个Entry的指针,通常占用8个字节。 4、元素类型为原始数据类型(比如int)的集合,内部通常会使用原始数据类型的包装类型,比如Integer,来存储元素。 如何判断你的程序消耗了多少内存? 这里有一个非常简单的办法来判断,你的spark程序消耗了多少内存。 1、首先,自己设置RDD的并行度,有两种方式:要不然,在parallelize()、textFile()等方法中,传入第二个参数,设置RDD的task / partition的数量;要不然,用SparkConf.set()方法,设置一个参数,spark.default.parallelism,可以统一设置这个application所有RDD的partition数量。 2、其次,在程序中将RDD cache到内存中,调用RDD.cache()方法即可。 3、最后,观察Driver的log,你会发现类似于:“INFO BlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB)”的日志信息。这就显示了每个partition占用了多少内存。 4、将这个内存信息乘以partition数量,即可得出RDD的内存占用量。 Spark的性能优化,主要手段包括: 1、使用高性能序列化类库 2、优化数据结构 3、对多次使用的RDD进行持久化 / Checkpoint 4、使用序列化的持久化级别 5、Java虚拟机垃圾回收调优 6、提高并行度 7、广播共享数据 8、数据本地化 9、reduceByKey和groupByKey的合理使用 10、Shuffle调优(核心中的核心,重中之重) 1、使用高性能序列化类库 数据序列化概述: 在任何分布式系统中,序列化都是扮演着一个重要的角色的。如果使用的序列化技术,在执行序列化操作的时候很慢,或者是序列化后的数据还是很大,那么会让分布式应用程序的性能下降很多。所以,进行Spark性能优化的第一步,就是进行序列化的性能优化。 Spark自身默认就会在一些地方对数据进行序列化,比如Shuffle。还有就是,如果我们的算子函数使用到了外部的数据(比如Java内置类型,或者自定义类型),那么也需要让其可序列化。 而Spark自身对于序列化的便捷性和性能进行了一个取舍和权衡。默认,Spark倾向于序列化的便捷性,使用了Java自身提供的序列化机制——基于ObjectInputStream和ObjectOutputStream的序列化机制。因为这种方式是Java原生提供的,很方便使用。 但是问题是,Java序列化机制的性能并不高。序列化的速度相对较慢,而且序列化以后的数据,还是相对来说比较大,还是比较占用内存空间。因此,如果你的Spark应用程序对内存很敏感,那么,实际上默认的Java序列化机制并不是最好的选择。 Spark提供了两种序列化的机制: Spark实际上提供了两种序列化机制,它只是默认使用了第一种: 1、Java序列化机制:默认情况下,Spark使用Java自身的ObjectInputStream和ObjectOutputStream机制进行对象的序列化。只要你的类实现了Serializable接口,那么都是可以序列化的。而且Java序列化机制是提供了自定义序列化支持的,只要你实现Externalizable接口即可实现自己的更高性能的序列化算法。Java序列化机制的速度比较慢,而且序列化后的数据占用的内存空间比较大。 2、Kryo序列化机制:Spark也支持使用Kryo类库来进行序列化。Kryo序列化机制比Java序列化机制更快,而且序列化后的数据占用的空间更小,通常比Java序列化的数据占用的空间要小10倍。Kryo序列化机制之所以不是默认序列化机制的原因是,有些类型虽然实现了Seriralizable接口,但是它也不一定能够进行序列化;此外,如果你要得到最佳的性能,Kryo还要求你在Spark应用程序中,对所有你需要序列化的类型都进行注册。 如何使用Kryo序列化机制(一) 如果要使用Kryo序列化机制,首先要用SparkConf设置一个参数,使用new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")即可,即将Spark的序列化器设置为KryoSerializer。这样,Spark在内部的一些操作,比如Shuffle,进行序列化时,就会使用Kryo类库进行高性能、快速、更低内存占用量的序列化了。 使用Kryo时,它要求是需要序列化的类,是要预先进行注册的,以获得最佳性能——如果不注册的话,那么Kryo必须时刻保存类型的全限定名,反而占用不少内存。Spark默认是对Scala中常用的类型自动注册了Kryo的,都在AllScalaRegistry类中。 但是,比如自己的算子中,使用了外部的自定义类型的对象,那么还是需要将其进行注册。 (实际上,下面的写法是错误的,因为counter不是共享的,所以累加的功能是无法实现的) val counter = new Counter(); val numbers = sc.parallelize(Array(1, 2, 3, 4, 5)) numbers.foreach(num => counter.add(num)); 如何使用Kryo序列化机制(二) 如果要注册自定义的类型,那么就使用如下的代码,即可: Scala版本: val conf = new SparkConf().setMaster(...).setAppName(...) conf.registerKryoClasses(Array(classOf[Counter] )) val sc = new SparkContext(conf) Java版本: SparkConf conf = new SparkConf().setMaster(...).setAppName(...) conf.registerKryoClasses(Counter.class) JavaSparkContext sc = new JavaSparkContext(conf) 优化Kryo类库的使用 1、优化缓存大小 如果注册的要序列化的自定义的类型,本身特别大,比如包含了超过100个field。那么就会导致要序列化的对象过大。此时就需要对Kryo本身进行优化。因为Kryo内部的缓存可能不够存放那么大的class对象。此时就需要调用SparkConf.set()方法,设置spark.kryoserializer.buffer.mb参数的值,将其调大。 默认情况下它的值是2,就是说最大能缓存2M的对象,然后进行序列化。可以在必要时将其调大。比如设置为10。 2、预先注册自定义类型 虽然不注册自定义类型,Kryo类库也能正常工作,但是那样的话,对于它要序列化的每个对象,都会保存一份它的全限定类名。此时反而会耗费大量内存。因此通常都建议预先注册号要序列化的自定义的类。 在什么场景下使用Kryo序列化类库? 首先,这里讨论的都是Spark的一些普通的场景,一些特殊的场景,比如RDD的持久化,在后面会讲解。这里先不说。 那么,这里针对的Kryo序列化类库的使用场景,就是算子函数使用到了外部的大数据的情况。比如说吧,我们在外部定义了一个封装了应用所有配置的对象,比如自定义了一个MyConfiguration对象,里面包含了100m的数据。然后,在算子函数里面,使用到了这个外部的大对象。 此时呢,如果默认情况下,让Spark用java序列化机制来序列化这种外部的大对象,那么就会导致,序列化速度缓慢,并且序列化以后的数据还是比较大,比较占用内存空间。 因此,在这种情况下,比较适合,切换到Kryo序列化类库,来对外部的大对象进行序列化操作。一是,序列化速度会变快;二是,会减少序列化后的数据占用的内存空间。 2.优化数据结构 要减少内存的消耗,除了使用高效的序列化类库以外,还有一个很重要的事情,就是优化数据结构。从而避免Java语法特性中所导致的额外内存的开销,比如基于指针的Java数据结构,以及包装类型。 有一个关键的问题,就是优化什么数据结构?其实主要就是优化你的算子函数,内部使用到的局部数据,或者是算子函数外部的数据。都可以进行数据结构的优化。优化之后,都会减少其对内存的消耗和占用。 如何优化数据结构(一) 1、优先使用数组以及字符串,而不是集合类。也就是说,优先用array,而不是ArrayList、LinkedList、HashMap等集合。 比如,有个List<Integer> list = new ArrayList<Integer>(),将其替换为int[] arr = new int[]。这样的话,array既比List少了额外信息的存储开销,还能使用原始数据类型(int)来存储数据,比List中用Integer这种包装类型存储数据,要节省内存的多。 还比如,通常企业级应用中的做法是,对于HashMap、List这种数据,统一用String拼接成特殊格式的字符串,比如Map<Integer, Person> persons = new HashMap<Integer, Person>()。可以优化为,特殊的字符串格式:id:name,address|id:name,address...。 2、避免使用多层嵌套的对象结构。比如说,public class Teacher { private List<Student> students = new ArrayList<Student>() }。就是非常不好的例子。因为Teacher类的内部又嵌套了大量的小Student对象。 比如说,对于上述例子,也完全可以使用特殊的字符串来进行数据的存储。比如,用json字符串来存储数据,就是一个很好的选择。 {"teacherId": 1, "teacherName": "leo", students:[{"studentId": 1, "studentName": "tom"},{"studentId":2, "studentName":"marry"}]} 3、对于有些能够避免的场景,尽量使用int替代String。因为String虽然比ArrayList、HashMap等数据结构高效多了,占用内存量少多了,但是之前分析过,还是有额外信息的消耗。比如之前用String表示id,那么现在完全可以用数字类型的int,来进行替代。 这里提醒,在spark应用中,id就不要用常用的uuid了,因为无法转成int,就用自增的int类型的id即可。(sdfsdfdf-234242342-sdfsfsfdfd) 3.对多次使用的RDD进行持久化或Checkpoint 如果程序中,对某一个RDD,基于它进行了多次transformation或者action操作。那么就非常有必要对其进行持久化操作,以避免对一个RDD反复进行计算。 此外,如果要保证在RDD的持久化数据可能丢失的情况下,还要保证高性能,那么可以对RDD进行Checkpoint操作。 4.使用序列化的持久化级别 除了对多次使用的RDD进行持久化操作之外,还可以进一步优化其性能。因为很有可能,RDD的数据是持久化到内存,或者磁盘中的。那么,此时,如果内存大小不是特别充足,完全可以使用序列化的持久化级别,比如MEMORY_ONLY_SER、MEMORY_AND_DISK_SER等。使用RDD.persist(StorageLevel.MEMORY_ONLY_SER)这样的语法即可。 这样的话,将数据序列化之后,再持久化,可以大大减小对内存的消耗。此外,数据量小了之后,如果要写入磁盘,那么磁盘io性能消耗也比较小。 对RDD持久化序列化后,RDD的每个partition的数据,都是序列化为一个巨大的字节数组。这样,对于内存的消耗就小的多了。但是唯一的缺点就是,获取RDD数据时,需要对其进行反序列化,会增大其性能开销。 因此,对于序列化的持久化级别,还可以进一步优化,也就是说,使用Kryo序列化类库,这样,可以获得更快的序列化速度,并且占用更小的内存空间。但是要记住,如果RDD的元素(RDD<T>的泛型类型),是自定义类型的话,在Kryo中提前注册自定义类型。 5.Java虚拟机垃圾回收调优 Java虚拟机垃圾回收调优的背景 如果在持久化RDD的时候,持久化了大量的数据,那么Java虚拟机的垃圾回收就可能成为一个性能瓶颈。因为Java虚拟机会定期进行垃圾回收,此时就会追踪所有的java对象,并且在垃圾回收时,找到那些已经不在使用的对象,然后清理旧的对象,来给新的对象腾出内存空间。 垃圾回收的性能开销,是跟内存中的对象的数量,成正比的。所以,对于垃圾回收的性能问题,首先要做的就是,使用更高效的数据结构,比如array和string;其次就是在持久化rdd时,使用序列化的持久化级别,而且用Kryo序列化类库,这样,每个partition就只是一个对象——一个字节数组。 监测垃圾回收 我们可以对垃圾回收进行监测,包括多久进行一次垃圾回收,以及每次垃圾回收耗费的时间。只要在spark-submit脚本中,增加一个配置即可,--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"。 但是要记住,这里虽然会打印出Java虚拟机的垃圾回收的相关信息,但是是输出到了worker上的日志中,而不是driver的日志中。 但是这种方式也只是一种,其实也完全可以通过SparkUI(4040端口)来观察每个stage的垃圾回收的情况。 优化executor内存比例 对于垃圾回收来说,最重要的就是调节RDD缓存占用的内存空间,与算子执行时创建的对象占用的内存空间的比例。默认情况下,Spark使用每个executor 60%的内存空间来缓存RDD,那么在task执行期间创建的对象,只有40%的内存空间来存放。 在这种情况下,很有可能因为你的内存空间的不足,task创建的对象过大,那么一旦发现40%的内存空间不够用了,就会触发Java虚拟机的垃圾回收操作。因此在极端情况下,垃圾回收操作可能会被频繁触发。 在上述情况下,如果发现垃圾回收频繁发生。那么就需要对那个比例进行调优,使用new SparkConf().set("spark.storage.memoryFraction", "0.5")即可,可以将RDD缓存占用空间的比例降低,从而给更多的空间让task创建的对象进行使用。 因此,对于RDD持久化,完全可以使用Kryo序列化,加上降低其executor内存占比的方式,来减少其内存消耗。给task提供更多的内存,从而避免task的执行频繁触发垃圾回收。 高级垃圾回收调优(一) Java堆空间被划分成了两块空间,一个是年轻代,一个是老年代。年轻代放的是短时间存活的对象,老年代放的是长时间存活的对象。年轻代又被划分了三块空间,Eden、Survivor1、Survivor2。 首先,Eden区域和Survivor1区域用于存放对象,Survivor2区域备用。创建的对象,首先放入Eden区域和Survivor1区域,如果Eden区域满了,那么就会触发一次Minor GC,进行年轻代的垃圾回收。Eden和Survivor1区域中存活的对象,会被移动到Survivor2区域中。然后Survivor1和Survivor2的角色调换,Survivor1变成了备用。 如果一个对象,在年轻代中,撑过了多次垃圾回收,都没有被回收掉,那么会被认为是长时间存活的,此时就会被移入老年代。此外,如果在将Eden和Survivor1中的存活对象,尝试放入Survivor2中时,发现Survivor2放满了,那么会直接放入老年代。此时就出现了,短时间存活的对象,进入老年代的问题。 如果老年代的空间满了,那么就会触发Full GC,进行老年代的垃圾回收操作。 Spark中,垃圾回收调优的目标就是,只有真正长时间存活的对象,才能进入老年代,短时间存活的对象,只能呆在年轻代。不能因为某个Survivor区域空间不够,在Minor GC时,就进入了老年代。从而造成短时间存活的对象,长期呆在老年代中占据了空间,而且Full GC时要回收大量的短时间存活的对象,导致Full GC速度缓慢。 如果发现,在task执行期间,大量full gc发生了,那么说明,年轻代的Eden区域,给的空间不够大。此时可以执行一些操作来优化垃圾回收行为: 1、包括降低spark.storage.memoryFraction的比例,给年轻代更多的空间,来存放短时间存活的对象; 2、给Eden区域分配更大的空间,使用-Xmn即可,通常建议给Eden区域,预计大小的4/3; 3、如果使用的是HDFS文件,那么很好估计Eden区域大小,如果每个executor有4个task,然后每个hdfs压缩块解压缩后大小是3倍,此外每个hdfs块的大小是64M,那么Eden区域的预计大小就是:4 * 3 * 64MB,然后呢,再通过-Xmn参数,将Eden区域大小设置为4 * 3 * 64 * 4/3。 总结: 其实啊,根据经验来看,对于垃圾回收的调优,尽量就是说,调节executor内存的比例就可以了。因为jvm的调优是非常复杂和敏感的。除非是,真的到了万不得已的地方,然后呢,自己本身又对jvm相关的技术很了解,那么此时进行eden区域的调节,调优,是可以的。 一些高级的参数 -XX:SurvivorRatio=4:如果值为4,那么就是两个Survivor跟Eden的比例是2:4,也就是说每个Survivor占据的年轻代的比例是1/6,所以,你其实也可以尝试调大Survivor区域的大小。 -XX:NewRatio=4:调节新生代和老年代的比例 6.提高并行度 实际上Spark集群的资源并不一定会被充分利用到,所以要尽量设置合理的并行度,来充分地利用集群的资源。才能充分提高Spark应用程序的性能。 Spark会自动设置以文件作为输入源的RDD的并行度,依据其大小,比如HDFS,就会给每一个block创建一个partition,也依据这个设置并行度。对于reduceByKey等会发生shuffle的操作,就使用并行度最大的父RDD的并行度即可。 可以手动使用textFile()、parallelize()等方法的第二个参数来设置并行度;也可以使用spark.default.parallelism参数,来设置统一的并行度。Spark官方的推荐是,给集群中的每个cpu core设置2~3个task。 比如说,spark-submit设置了executor数量是10个,每个executor要求分配2个core,那么application总共会有20个core。此时可以设置new SparkConf().set("spark.default.parallelism", "60")来设置合理的并行度,从而充分利用资源。 7.广播共享数据 如果你的算子函数中,使用到了特别大的数据,那么,这个时候,推荐将该数据进行广播。这样的话,就不至于将一个大数据拷贝到每一个task上去。而是给每个节点拷贝一份,然后节点上的task共享该数据。 这样的话,就可以减少大数据在节点上的内存消耗。并且可以减少数据到节点的网络传输消耗。 8.数据本地化 数据本地化背景 数据本地化对于Spark Job性能有着巨大的影响。如果数据以及要计算它的代码是在一起的,那么性能当然会非常高。但是,如果数据和计算它的代码是分开的,那么其中之一必须到另外一方的机器上。通常来说,移动代码到其他节点,会比移动数据到代码所在的节点上去,速度要快得多,因为代码比较小。Spark也正是基于这个数据本地化的原则来构建task调度算法的。 数据本地化,指的是,数据离计算它的代码有多近。基于数据距离代码的距离,有几种数据本地化级别: 1、PROCESS_LOCAL:数据和计算它的代码在同一个JVM进程中。 2、NODE_LOCAL:数据和计算它的代码在一个节点上,但是不在一个进程中,比如在不同的executor进程中,或者是数据在HDFS文件的block中。 3、NO_PREF:数据从哪里过来,性能都是一样的。 4、RACK_LOCAL:数据和计算它的代码在一个机架上。 5、ANY:数据可能在任意地方,比如其他网络环境内,或者其他机架上。 数据本地化优化 Spark倾向于使用最好的本地化级别来调度task,但是这是不可能的。如果没有任何未处理的数据在空闲的executor上,那么Spark就会放低本地化级别。这时有两个选择:第一,等待,直到executor上的cpu释放出来,那么就分配task过去;第二,立即在任意一个executor上启动一个task。 Spark默认会等待一会儿,来期望task要处理的数据所在的节点上的executor空闲出一个cpu,从而将task分配过去。只要超过了时间,那么Spark就会将task分配到其他任意一个空闲的executor上。 可以设置参数,spark.locality系列参数,来调节Spark等待task可以进行数据本地化的时间。spark.locality.wait(3000毫秒)、spark.locality.wait.node、spark.locality.wait.process、spark.locality.wait.rack。 9.reduceByKey和groupByKey val counts = pairs.reduceByKey(_ + _) val counts = pairs.groupByKey().map(wordCounts => (wordCounts._1, wordCounts._2.sum)) 如果能用reduceByKey,那就用reduceByKey,因为它会在map端,先进行本地combine,可以大大减少要传输到reduce端的数据量,减小网络传输的开销。 只有在reduceByKey处理不了时,才用groupByKey().map()来替代。 10.shuffle调优