zoukankan      html  css  js  c++  java
  • 1.spark的wordcount解析

    一、Eclipse(scala IDE)开发local和cluster

    (一). 配置开发环境

    1. 要在本地安装好java和scala。 
      由于spark1.6需要scala 2.10.X版本的。推荐 2.10.4,java版本最好是1.8。所以提前我们要需要安装好java和scala并在环境变量中配置好。
    2. 下载scala IDE for eclipse安装 连接:http://scala-ide.org/download/sdk.html 
      打开ide新建scala project 
      点击file -> new ->Scala Project ,在弹出的对话框中弹性project name 为“WordCount”,默认点击next,点击finish的。
    3. 修改Scala版本 
      项目创建完成后默认使用的是scala的2.11.7 版本。要手动将版本换成2.10.X。在项目名称右击选择properties,在弹出窗口点击,scala Compiler,在右侧窗口,选中Use Project settings, 将scala Installation 修改为Latest 2.10 bundle(dynamic).点击apply,点击ok。scala版本变成2.10.6。

    4. 找到依赖的spark jar文件并导入到eclipse中。 
      所依赖的jar文件是 
      spark-1.6.0-bin-hadoop2.6libspark-assembly-1.6.0-hadoop2.6.0.jar。 
      在项目名称上右击,选择build path ->configure build path。在弹出框中点击library,点击右侧的addExternalJARs,然后选择 
      park-assembly-1.6.0-hadoop2.6.0.jar点击打开,然后点击ok。

    (二)、spark程序开发步骤

    1. 在src下建立spark程序工程包

    在src上右击new ->package 填入package的name为com.dt.spark。

    2. 创建scala的入口类。

    在包的名字上右击选择new ->scala class 。在弹出框中Name 中,在增加WordCount。点击finish。 
    在方法内部讲关键字class 改成object ,然后创建main方法。

    3. local模式代码方法

      1. import org.apache.spark.SparkConf
      2. import org.apache.spark.rdd.RDD
      3. def main(args: Array[String]): Unit ={
      4. * 集群的master的URL,如果设置为local则在本地运行。
      5. val conf = new SparkConf()
      6. conf.setMaster("local")
      7. /**第2步,创建SparkContext对象,SparkContext是spark程序所有功能的唯一入口,其作用是初始化spark应用程序的
      8. * */
      9. * 数据被RDD划分为一系列的Partitions,分配到每个partition的数据属于一个Task的处理范畴
      10. val lines = sc.textFile("G://datarguru spark//tool//spark-1.4.0-bin-hadoop2.6//README.md", 1) //读取本地文件并设置一个partition
      11. /**第4步,对初始的RDD进行Transformation级别的处理,如map、filter高阶函数编程,进行具体计算
      12. val words = lines.flatMap{ line => line.split(" ")}//对每行字符串进行单词拆分,并把所有拆分结果通过flat合并成一个大的单词集合
      13. (word, 1)} //在单词拆分基础上对每个单词实例计数为1
      14. wordCounts.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))
      15. }
      16. 在运行过程中会出现WARN NativeCodeLoader: Unable to load native-Hadoop library for your platform... using builtin-Java classes where applicable。Java.io.IOException: Could not locate executable nullinwinutils.exe in the Hadoop binaries. 这个错误。但是在local模式下,这个是正常的。因为spark是和hadoop编译在一起的,我们在window 下开发,缺少hadoop的配置。这不是程序错误,也不影响我们的任何功能。

        4.编写Cluster模式代码

        1. import org.apache.spark.SparkContext
        2. def main(args: Array[String]){
        3. * 集群的master的URL,如果设置为local则在本地运行。
        4. val conf = new SparkConf() //创建SparkConf对象
        5. // conf.setMaster("spark://master:7077")
        6. /**第2步,创建SparkContext对象,SparkContext是spark程序所有功能的唯一入口,其作用是初始化spark应用程序的
        7. * */
        8. /**第3步,根据数据源(HDFS,HBase,Local FS)通过SparkContext来创建RDD
        9. * */
        10. val lines = sc.textFile("/library/wordcount/input/Data") //读取HDFS文件并切分成不同的Partions
        11. /**第4步,对初始的RDD进行Transformation级别的处理,如map、filter高阶函数编程,进行具体计算
        12. val words = lines.flatMap { line =>line.split(" ")} //对每一行的字符串进行单词拆分并把所有行的拆分结果通过flat合并成为一个大的单词集合
        13. val pairs = words.map { word => (word, 1) }
        14. wordCounts.collect.foreach(wordNumberPair =>println(wordNumberPair._1 + " : " + wordNumberPair._2))
        15. }
        16. 将程序达成jar 包 
          在项目名称上右击点击export选择java 下的jar file,点击next,选择输出目录,输入文件名,点击next,点击next,然后点击完成。导出jar 包。
          将jar 放到Linux系统某个目录中。执行 
          ./spark-submit --class com.dt.spark.WordCount_Cluster --master spark://worker1:7077 ./wordcount.jar

          也可以将以上命令保存到.sh文件中,直接执行sh文件即可。

          二、使用idea开发spark的Local和Cluster

          (一)、配置开发环境

          1. 要在本地安装好java和scala。

          由于spark1.6需要scala 2.10.X版本的。推荐 2.10.4,java版本最好是1.8。所以提前我们要需要安装好java和scala并在环境变量中配置好

          2. 下载IDEA 社区版本,选择windows 版本并按照配置。

          安装完成以后启动IDEA,并进行配置,默认即可,然后点击ok以后,设置ui风格,然后点击next 会出现插件的选择页面,默认不需求修改,点击next,选择安装scala语言,点击install 按钮(非常重要,以为要开发spark程序所以必须安装),等安装完成以后点击start启动IDEA。

          3. 创建scala项目

          点击 create new project ,然后填写project name为“Wordcount”,选择项目的保存地址project location。 
          然后设置project sdk即java 的安装目录。点击右侧的new 按钮,选择jdk,然后选择java 的安装路径即可。 
          然后选择scalasdk。点击右侧的create ,默认出现时2.10.x 版本的scala,点击ok即可。然后点击finish。

          4. 设置spark的jar 依赖。

          点击file->project structure 来设置工程的libraries。核心是添加spark的jar依赖。选择Libraries ,点击右侧的加号,选择java,选择spark1.6.0 的spark-1.6.0-bin-hadoop2.6libspark-assembly-1.6.0-hadoop2.6.0.jar。点击ok。稍等片刻后然后点击ok(Libraries作用于WordCount),然后点击apply,点击ok。(这一步很重要,如果没有无法编写spark的代码)

          (二)、编写代码

          1. 在src下建立spark程序工程包

          在src上右击new ->package 填入package的name为com.dt.spark。

          2. 创建scala的入口类。

          在包的名字上右击选择new ->scala class 。在弹出框中填写Name ,并制定kind为object ,点击ok。

          3. 编写local代码

          1. import org.apache.spark.SparkConf
          2. import org.apache.spark.rdd.RDD
          3. def main(args: Array[String]): Unit ={
          4. * 集群的master的URL,如果设置为local则在本地运行。
          5. val conf = new SparkConf()
          6. conf.setMaster("local")
          7. /**第2步,创建SparkContext对象,SparkContext是spark程序所有功能的唯一入口,其作用是初始化spark应用程序的
          8. * */
          9. * 数据被RDD划分为一系列的Partitions,分配到每个partition的数据属于一个Task的处理范畴
          10. val lines = sc.textFile("G://datarguru spark//tool//spark-1.4.0-bin-hadoop2.6//README.md", 1) //读取本地文件并设置一个partition
          11. /**第4步,对初始的RDD进行Transformation级别的处理,如map、filter高阶函数编程,进行具体计算
          12. val words = lines.flatMap{ line => line.split(" ")}//对每行字符串进行单词拆分,并把所有拆分结果通过flat合并成一个大的单词集合
          13. (word, 1)} //在单词拆分基础上对每个单词实例计数为1
          14. wordCounts.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))
          15. }
          16. 在代码去右击选择点击run”wordCount”来运行程序。在生成环境下肯定是写自动化shell 脚本自动提交程序的。 
            注意:如果val sc = new SparkContext(conf)报错,并且没有运行结果,需要将scala的module改成scala 2.10版本的。具体操作:File->project structure -> Dependencies ->删除scala 2.11.x的module.-> 左上角的“+” -> scala ->选中scala2.10.4 -> apply

            4. 编写Cluster模式代码

            1. import org.apache.spark.SparkConf
            2. import org.apache.spark.rdd.RDD
            3. def main(args: Array[String]): Unit ={
            4. * 集群的master的URL,如果设置为local则在本地运行。
            5. val conf = new SparkConf()
            6. //conf.setMaster("spark://master:7077")
            7. * 核心组件,包括DAGScheduler,TaskScheduler,SchedulerBackend
            8. val sc = new SparkContext(conf)
            9. /**第3步,根据数据源(HDFS,HBase,Local FS)通过SparkContext来创建RDD
            10. * */
            11. /**第4步,对初始的RDD进行Transformation级别的处理,如map、filter高阶函数编程,进行具体计算
            12. val words = lines.flatMap{ line => line.split(" ")}//对每行字符串进行单词拆分,并把所有拆分结果通过flat合并成一个大的单词集合
            13. (word, 1)} //在单词拆分基础上对每个单词实例计数为1
            14. pairs._2, pairs._1)).sortByKey(false).map(pair=>(pair._1, pair._2))//相同的key,value累加并且排名
            15. println(wordNumberPair._1 + ":" + wordNumberPair._2))
            16. }
            17. 将程序达成jar 包 
              点击file->project structure,在弹出的页面点击Artifacts,点击右侧的“+”,选择jar –> from modules with dependencies,在弹出的页面中,设置好main class 然后点击ok,在弹出页面修改Name(系统生成的name不规范)、导出位置并删除scala和spark的jar(因为集群环境中已经存在)点击ok 。然后在菜单栏中点击build –> Artifacts ,在弹出按钮中,点击bulid,会自动开始打包。

              在spark中执行wordcount方法。 
              将jar 放到linux系统某个目录中。执行

              1. 注意事项: 
                为什么不能再ide开发环境中,直接发布spark程序到spark集群中? 
                1. 开发机器的内存和cores的限制,默认情况情况下,spark程序的dirver在提交spark程序的机器上,如果在idea中提交程序的话,那idea机器就必须非常强大。 
                2. Dirver要指挥workers的运行并频繁的发生同学,如果开发环境和spark集群不在同样一个网络下,就会出现任务丢失,运行缓慢等多种不必要的问题。 
                3. 这是不安全的。

                三、WordCount的java开发版本

                 
              2. 安装jdk并配置环境变量 
                系统变量→新建 JAVA_HOME 变量。 
                变量值填写jdk的安装目录(本人是 E:Javajdk1.7.0) 
                系统变量→寻找 Path 变量→编辑 
                在变量值最后输入 %JAVA_HOME%in;%JAVA_HOME%jrein;(注意原来Path的变量值末尾有没有;号,如果没有,先输入;号再输入上面的代码) 
                系统变量→新建 CLASSPATH 变量值填写 .;%JAVA_HOME%lib;%JAVA_HOME%lib ools.jar(注意最前面有一点)
              3. Maven的安装和配置 
                解压apache-maven-3.1.1-bin.zip,并把解压后的文件夹下的apache-maven-3.1.1文件夹移动到D:Java下,如果没有Java这个文件夹的话,请自行创建 
                新建系统变量 MAVEN_HOME 变量值:D:Javaapache-maven-3.1.1。编辑系统变量 Path 添加变量值: ;%MAVEN_HOME%in。 
                在mave 的目录中修改conf/settings.xml,在localRepository属性后添加D:/repository修改maven下载jar 的位置。
              4. eclipse 中java 和maven 的配置 
                点击 window ->java ->Installed JREs ->add ->standard vm ,点击next ,然后选择jdk 的安装路径点击finish即可。 
                点击window ->Maven ->Installations ->add 在弹出页面选择mave 的安装路径,然后点击finish。然后在列表中选择我们自己刚添加的那个maven信息。 
                然后点击window ->Maven ->User Setings 在右侧的User Settings 点击browse 现在mavenconf目录下的setttings.xml .(主要是修改maven下载依赖包存放的位置)
              (二). 创建maven项目
              1. 创建maven项目 
                点击file ¬->new ->others ->maven project 点击next,选择maven-archetype-quickstart ,点击next,group id 为 com.dt.spark,artifact id 为 sparkApps,然后点击finish。
              2. 修改jdk和pom文件 
                创建maven项目后,默认的jdk是1.5要改成我们前面安装好的jdk1.8。在项目上右击build path ->configure build path 。在弹出页面点击Libraries,选中jre system library 。点击edit,在弹出框选择workspace default jre ,然后点击finish。然后在点击ok。将pom文件修改为如下内容,然后等待eclipse下载好maven依赖的jar包,并编译工程。编译好工程后有个错误提示,在此错误列上,右击选择quick fix ,在弹出页面点击finish即可。
              1. xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              2. 4.0.0</modelVersion>
              3. <groupId>com.dt.spark</groupId>
              4. SparkApps</artifactId>
              5. 0.0.1-SNAPSHOT</version>
              6. jar</packaging>
              7. <name>SparkApps</name>
              8. http://maven.apache.org</url>
              9. <properties>
              10. UTF-8</project.build.sourceEncoding>
              11. <dependencies>
              12. junit</groupId>
              13. junit</artifactId>
              14. 3.8.1</version>
              15. test</scope>
              16. org.apache.spark</groupId>
              17. spark-core_2.10</artifactId>
              18. 1.6.0</version>
              19. org.apache.spark</groupId>
              20. spark-sql_2.10</artifactId>
              21. 1.6.0</version>
              22. org.apache.spark</groupId>
              23. spark-hive_2.10</artifactId>
              24. 1.6.0</version>
              25. org.apache.spark</groupId>
              26. spark-streaming_2.10</artifactId>
              27. 1.6.0</version>
              28. org.apache.hadoop</groupId>
              29. hadoop-client</artifactId>
              30. 2.6.0</version>
              31. org.apache.spark</groupId>
              32. spark-streaming-kafka_2.10</artifactId>
              33. 1.6.0</version>
              34. org.apache.spark</groupId>
              35. spark-graphx_2.10</artifactId>
              36. 1.6.0</version>
              37. <build>
              38. src/main/java</sourceDirectory>
              39. src/main/test</testSourceDirectory>
              40. <plugins>
              41. maven-assembly-plugin</artifactId>
              42. jar-with-dependencies</descriptorRef>
              43. make-assembly</id>
              44. package</phase>
              45. single</goal>
              46. org.codehaus.mojo</groupId>
              47. exec-maven-plugin</artifactId>
              48. 1.3.1</version>
              49. exec</goal>
              50. java</executable>
              51. false</includeProjectDependencies>
              52. compile</classpathScope>
              53. com.dt.spark.SparkApps.WordCount</mainClass>
              54. org.apache.maven.plugins</groupId>
              55. maven-compiler-plugin</artifactId>
              56. 1.6</source>
              57. 1.6</target>
              58. </project>
              1. 创建包路径以及java代码 
                在包路径com.dt.spark.SparkApps上右击 new ->package 在弹出页面name中填写com.dt.spark.SparkApps.cores,点击finish的。 
                在包路径下com.dt.spark.SparkApps.cores上右击 new ->class ,在弹出窗口中name中填写WordCount ,点击finish。然后在WordCount 中编写如下代码。
              (三). local版本
              1. import java.util.Arrays;
              2. import scala.Function;
              3. public static void main(String[] args){
              4. //其底层就是scala的SparkContext
              5. String> lines = sc.textFile("G://datarguru spark//tool//spark-1.4.0-bin-hadoop2.6//README.md");
              6. String> words = lines.flatMap(new FlatMapFunction<String, String>(){
              7. public Iterable<String> call(String line)throws Exception{
              8. });
              9. JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>(){
              10. public Tuple2<String, Integer> call(String word)throws Exception{
              11. String, Integer>(word, 1);
              12. });
              13. JavaPairRDD<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>(){ //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)
              14. public Integer call(Integer v1, Integer v2)throws Exception{
              15. });
              16. wordsCount.foreach(new VoidFunction<Tuple2<String, Integer>>(){
              17. public void call(Tuple2<String, Integer>pair)throws Exception{
              18. });
              19. }

            在代码区右击run as -> java application 。来运行此程序并查看运行结果。

            (四). cluster版本的代码
            1. import java.util.Arrays;
            2. import scala.Function;
            3. public static void main(String[] args){
            4. String> lines = sc.textFile("/library/wordcount/input/Data");
            5. String> words = lines.flatMap(new FlatMapFunction<String, String>(){
            6. public Iterable<String> call(String line)throws Exception{
            7. });
            8. JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>(){
            9. public Tuple2<String, Integer> call(String word)throws Exception{
            10. String, Integer>(word, 1);
            11. });
            12. JavaPairRDD<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>(){
            13. public Integer call(Integer v1, Integer v2)throws Exception{
            14. });
            15. wordsCount.foreach(new VoidFunction<Tuple2<String, Integer>>(){
            16. public void call(Tuple2<String, Integer>pair)throws Exception{
            17. });
            18. }

          四、彻底解析wordcount运行原理

          1. 从数据流动视角解密WordCount

          即用Spark作单词计数统计,数据到底是怎么流动的,参看一图: 
          从数据流动的视角分析数据到底是怎么被处理

          1. word,1)).reduceByKey(_+_).saveAsTextFile(outputPathwordcount)

          简单实验

          (1)在IntelliJ IDEA中编写下面代码:

          1. import org.apache.spark.SparkConf
          2. object WordCount {
          3. valconf = new SparkConf()
          4. conf.setMaster("local")
          5. val lines = sc.textFile("D://tmp//helloSpark.txt", 1)
          6. line.split(" ") }
          7. (word,1) }
          8. wordCounts.foreach(wordNumberPair =>println(wordNumberPair._1 + " : " + wordNumberPair._2))
          9. }
          10. 2)在D盘下地tmp文件夹下新建helloSpark.txt文件,内容如下:
          11. Hello Hadoop
          12. Spark is awesome
          13. Flink : 1
          14. is : 1
          15. awesome : 1
          16. Scala : 1

          Spark有三大特点:

          1. 分布式。无论数据还是计算都是分布式的。默认分片策略:Block多大,分片就多大。但这种说法不完全准确,因为分片切分时有的记录可能跨两个Block,所以一个分片不会严格地等于Block的大小,例如HDFS的Block大小是128MB的话,分片可能多几个字节或少几个字节。一般情况下,分片都不会完全与Block大小相等。 
            分片不一定小于Block大小,因为如果最后一条记录跨两个Block的话,分片会把最后一条记录放在前一个分片中。
          2. 基于内存(部分基于磁盘)
          3. 迭代

          查看在SparkContext.scala中的testFile源码

          1. path: String,
          2. assertNotStopped()
          3. minPartitions).map(pair => pair._2.toString)
          4. 可以看出在进行了hadoopFile之后又进行了map操作。 
            HadoopRDD从HDFS上读取分布式文件,并且以数据分片的方式存在于集群之中。

            RDD.scala中的map源码

            1. * Return a new RDD by applying a function to all elements of this RDD.
            2. def map[U: ClassTag](f: T => U): RDD[U] = withScope {
            3. new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
            4. 读取到的一行数据(key,value的方式),对行的索引位置不感兴趣,只对其value事情兴趣。pair时有个匿名函数,是个tuple,取第二个元素。 
              此处又产生了MapPartitionsRDD。MapPartitionsRDD基于hadoopRDD产生的Parition去掉行的KEY。 
              注:可以看出一个操作可能产生一个RDD也可能产生多个RDD。如sc.textFile就产生了两个RDD:hadoopRDD和MapParititionsRDD。
              下一步:
              1. line.split(" ") }

              对每个Partition中的每行进行单词切分,并合并成一个大的单词实例的集合。 
              FlatMap做的一件事就是对RDD中的每个Partition中的每一行的内容进行单词切分。 
              这边有4个Partition,对单词切分就变成了一个一个单词,

              下面是FlatMap的源码(RDD.scala中)

              1. * Return a new RDD by first applying a function to all elements of this
              2. */
              3. TraversableOnce[U]): RDD[U] = withScope {
              4. new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
              5. 可以看出flatMap又产生了一个MapPartitionsRDD,此时的各个Partition都是拆分后的单词。 
                下一步:
                1. (word,1) }

                将每个单词实例变为形如word=>(word,1) 
                map操作就是把切分后的每个单词计数为1。 
                根据源码可知,map操作又会产生一个MapPartitonsRDD。此时的MapPartitionsRDD是把每个单词变成Array(""Hello",1),("Spark",1)等这样的形式。 
                下一步:

                1. reduceByKey是进行全局单词计数统计,对相同的key的value相加,包括local和reducer同时进行reduce。所以在map之后,本地又进行了一次统计,即local级别的reduce。 
                  shuffle前的Local Reduce操作,主要负责本地局部统计,并且把统计后的结果按照分区策略放到不同的File。 
                  下一Stage就叫Reducer了,下一阶段假设有3个并行度的话,每个Partition进行Local Reduce后都会把数据分成三种类型。最简单的方式就是用HashCode对其取模。 
                  至此都是stage1。 
                  Stage内部完全基于内存迭代,不需要每次操作都有读写磁盘,所以速度非常快。

                  reduceByKey的源码(PairRDDFunctions.scala中):

                  1. V): RDD[(K, V)] = self.withScope {
                  2. v, func, func, partitioner)
                  3. * Merge the values for each key using an associative and commutative reduce function. This will
                  4. * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
                  5. def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
                  6. }
                  7. /**
                  8. * also perform the merging locally on each mapper before sending results to a reducer, similarly
                  9. * parallelism level.
                  10. def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
                  11. }

                  可以看到reduceByKey内部有combineByKeyWithClassTag。combineByKeyWithClassTag的源码如下:

                  1. createCombiner: V => C,
                  2. C,
                  3. C,
                  4. mapSideCombine: Boolean = true,
                  5. require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
                  6. if (mapSideCombine) {
                  7. }
                  8. throw new SparkException("Default partitioner cannot partition array keys.")
                  9. }
                  10. self.context.clean(createCombiner),
                  11. self.context.clean(mergeCombiners))
                  12. self.mapPartitions(iter => {
                  13. new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
                  14. } else {
                  15. .setSerializer(serializer)
                  16. .setMapSideCombine(mapSideCombine)
                  17. }

                  可以看出在combineByKeyWithClassTag内又new 了一个ShuffledRDD。 
                  ReduceByKey有两个作用: 
                  1. 进行Local级别的Reduce,减少网络传输。 
                  2. 把当前阶段的内容放到本地磁盘上供shuffle使用。

                  下一步是shuffledRDD,

                  产生Shuffle数据就需要进行分类,MapPartitionsRDD时其实已经分好类了,最简单的分类策略就是Hash分类。 
                  ShuffledRDD需要从每台机上抓取同一单词。 
                  reduceByKey发生在哪里? 
                  Stage2全部都是reduceByKey

                  最后一步:保存数据到HDFS(MapPartitionsRDD)

                  统计完的结果:(“Hello”,4)只是一个Value,而不是Key:"Hello",value:4。但输出到文件系统时需要KV的格式,现在只有Value,所以需要造个KEY。

                  saveAsTextFile的源码:

                  1. this.map(x => (NullWritable.get())),new Text(x.toStirng))
                  2. }

                  this.map把当前的值(x)变成tuple。tuple的Key是Null,Value是(“Hello”,4)。 
                  为什么要为样?因为saveAsHadoopFile时要求以这样的格式输出。Hadoop需要KV的格式!! 
                  map操作时把key舍去了,输出时就需要通过生成Key。 
                  第一个Stage有哪些RDD?HadoopRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD 
                  第二个Stage有哪些RDD?ShuffledRDD、MapPartitionsRDD

                  只有Collect 或saveAsTextFile会触发作业,其他的时候都没有触发作业(Lazy)

                  2. 从RDD依赖关系的视角解密WordCount。Spark中的一切操作皆RDD,后面的RDD对前面的RDD有依赖关系。

                  3. DAG与Lineage的思考。依赖关系会形成DAG。

  • 相关阅读:
    算法训练 P1103
    算法训练 表达式计算
    算法训练 表达式计算
    基础练习 时间转换
    基础练习 字符串对比
    Codeforces 527D Clique Problem
    Codeforces 527C Glass Carving
    Codeforces 527B Error Correct System
    Codeforces 527A Glass Carving
    Topcoder SRM 655 DIV1 250 CountryGroupHard
  • 原文地址:https://www.cnblogs.com/yejibigdata/p/6513688.html
Copyright © 2011-2022 走看看