zoukankan      html  css  js  c++  java
  • 通过WordCount解析Spark RDD内部源码机制

    一、Spark WordCount动手实践

    我们通过Spark WordCount动手实践,编写单词计数代码;在wordcount.scala的基础上,从数据流动的视角深入分析Spark RDD的数据处理过程。

    首先需要建立一个文本文件helloSpark.txt,helloSpark.txt的文本内容如下。

    Hello Spark Hello Scala
    Hello Hadoop
    Hello Flink
    Spark is Awesome

    然后在Eclipse中编写wordcount.scala的代码如下。

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    
    
    object wordcount {
      def main(args: Array[String]): Unit = {
        
        // 第1步:创建Spark的配置对象SparkConf,设置Spark程序运行时的配置信息,
        val conf = new SparkConf().setAppName("My First Spark APP").setMaster("local")
        
        // 第2步:创建SparkContext对象
        val sc = new SparkContext(conf)
        
        // 第3步:根据具体的数据来源来创建RDD
        val lines = sc.textFile("helloSpark.txt", 1)
        
        // 第4步:对初始的RDD进行Transformation级别的处理,如通过map、filter等
        val words = lines.flatMap{line=>line.split(" ")}
        val pairs = words.map{word=>(word,1)}
        val wordCountsOdered = pairs.reduceByKey(_+_).map(
          pair=>(pair._2,pair._1)    
        ).sortByKey(false).map(pair=>(pair._2,pair._1))
        wordCountsOdered.collect.foreach(wordNumberPair=>println(wordNumberPair._1+" : "+wordNumberPair._2))
        sc.stop()
        
      }
    }

    在Eclipse中运行程序,wordcount.scala的运行结果如下:

    二、解析RDD生成的内部机制

    下面详细解析一下wordcount.scala的运行原理。

    (1)从数据流动视角解密WordCount,使用Spark作单词计数统计,搞清楚数据到底是怎么流动的。

    (2)从RDD依赖关系的视角解密WordCount。Spark中的一切操作都是RDD,后面的RDD对前面的RDD有依赖关系。

    (3)DAG与血统Lineage的思考。

    在wordcount.scala的基础上,我们从数据流动的视角分析数据到底是怎么处理的。下面有一张WordCount数据处理过程图,由于图片较大,为了方便阅读,将原图分成两张图,如下面两张图所示。

    数据在生产环境中默认在HDFS中进行分布式存储,如果在分布式集群中,我们的机器会分成不同的节点对数据进行处理,这里我们在本地测试,重点关注数据是怎么流动的。处理的第一步是获取数据,读取数据会生成HadoopRDD。

    在WordCount.scala中,单击sc.textFile进入Spark框架,SparkContext.scala的textFile的源码如下。

      /**
       * Read a text file from HDFS, a local file system (available on all nodes), or any
       * Hadoop-supported file system URI, and return it as an RDD of Strings.
       */
      def textFile(
          path: String,
          minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
        assertNotStopped()
        hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
          minPartitions).map(pair => pair._2.toString).setName(path)
      }

    下面看一下hadoopFile的源码,HadoopRDD从Hdfs上读取分布式数据,并且以数据分片的方式存在于集群中。所谓的数据分片,就是把我们要处理的数据分成不同的部分,例如,在集群中有4个节点,粗略的划分可以认为将数据分成4个部分,4条语句就分成4个部分。例如,Hello Spark在第一台机器上,Hello Hadoop在第二台机器上,Hello Flink在第三台机器上,Spark is Awesome在第四台机器上。HadoopRDD帮助我们从磁盘上读取数据,计算的时候会分布式地放入内存中,Spark运行在Hadoop上,要借助Hadoop来读取数据。

    Spark的特点包括:分布式、基于内存(部分基于磁盘)、可迭代;默认分片策略Block多大,分片就多大。但这种说法不完全准确,因为分片记录可能跨两个Block,所以一个分片不会严格地等于Block的大小。例如,HDFS的Block大小是128MB的话,分片可能多几个字节或少几个字节。分片不一定小于128MB,因为如果最后一条记录跨两个Block,分片会把最后一条记录放在前一个分片中。这里,HadoopRDD用了4个数据分片,设想为128M左右。

    hadoopFile的源码如下。

      def hadoopFile[K, V](
          path: String,
          inputFormatClass: Class[_ <: InputFormat[K, V]],
          keyClass: Class[K],
          valueClass: Class[V],
          minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
        assertNotStopped()
    
        // This is a hack to enforce loading hdfs-site.xml.
        // See SPARK-11227 for details.
        FileSystem.getLocal(hadoopConfiguration)
    
        // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
        val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
        val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
        new HadoopRDD(
          this,
          confBroadcast,
          Some(setInputPathsFunc),
          inputFormatClass,
          keyClass,
          valueClass,
          minPartitions).setName(path)
      }
    

    SparkContext.scala的textFile源码中,调用hadoopFile方法后进行了map转换操作,map对读取的每一行数据进行转换,读入的数据是一个Tuple,Key值为索引,Value值为每行数据的内容,生成MapPartitionsRDD。这里,map(pair => pair._2.toString)是基于HadoopRDD产生的Partition去掉的行Key产生的Value,第二个元素是读取的每行数据内容。MapPartitionsRDD是Spark框架产生的,运行中可能产生一个RDD,也可能产生两个RDD。例如,textFile中Spark框架就产生了两个RDD,即HadoopRDD和MapPartitionsRDD。下面是map的源码。

      /**
       * Return a new RDD by applying a function to all elements of this RDD.
       */
      def map[U: ClassTag](f: T => U): RDD[U] = withScope {
        val cleanF = sc.clean(f)
        new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
      }

    我们再来看一下WordCount业务代码,对读取的每行数据进行flatMap转换。这里,flatMap对RDD中的每一个Partition的每一行数据内容进行单词切分,如有4个Partition分别进行单词切分,将“Hello Spark”切分成单词“Hello”和“Spark”,对每一个Partition中的每一行进行单词切分并合并成一个大的单词实例的集合。flatMap转换生成的仍然是MapPartitionsRDD:

      /**
       *  Return a new RDD by first applying a function to all elements of this
       *  RDD, and then flattening the results.
       */
      def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
        val cleanF = sc.clean(f)
        new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
      }
    

    继续WordCount业务代码,计数之后进行一个关键的reduceByKey操作,对全局的数据进行计数统计。reduceByKey对相同的Key进行Value的累计(包括Local和Reducer级别,同时Reduce)。reduceByKey在MapPartitionsRDD之后,在Local reduce级别本地进行了统计,这里也是MapPartitionsRDD。例如,在本地将(Hello,1),(Spark,1),(Hello,1),(Scala,1)汇聚成(Hello,2),(Spark,1),(Scala,1)。

    Shuffle之前的Local Reduce操作主要负责本地局部统计,并且把统计以后的结果按照分区策略放到不同的file。举一个简单的例子,如果下一个阶段Stage是3个并行度,每个Partition进行local reduce以后,将自己的数据分成3种类型,最简单的方式是根据HashCode按3取模。

    PairRDDFunctions.scala的reduceByKey的源码如下。

      /**
       * Merge the values for each key using an associative and commutative reduce function. This will
       * also perform the merging locally on each mapper before sending results to a reducer, similarly
       * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
       * parallelism level.
       */
      def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
        reduceByKey(defaultPartitioner(self), func)
      }  

    至此,前面所有的操作都是一个Stage,一个Stage意味着什么:完全基于内存操作。父Stage:Stage内部的操作是基于内存迭代的,也可以进行Cache,这样速度快很多。不同于Hadoop的Map Redcue,Hadoop Map Redcue每次都要经过磁盘。

    reduceByKey在Local reduce本地汇聚以后生成的MapPartitionsRDD仍属于父Stage;然后reduceByKey展开真正的Shuffle操作,Shuffle是Spark甚至整个分布式系统的性能瓶颈,Shuffle产生ShuffleRDD,ShuffledRDD就变成另一个Stage,为什么是变成另外一个Stage?因为要网络传输,网络传输不能在内存中进行迭代。

    从WordCount业务代码pairs.reduceByKey(_+_)中看一下PairRDDFunctions.scala的reduceByKey的源码。

      /**
       * Merge the values for each key using an associative and commutative reduce function. This will
       * also perform the merging locally on each mapper before sending results to a reducer, similarly
       * to a "combiner" in MapReduce.
       */
      def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
        combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
      }

    reduceByKey内部调用了combineByKeyWithClassTag方法。下面看一下PairRDDFunctions. scala的combineByKeyWithClassTag的源码。

      def combineByKeyWithClassTag[C](
          createCombiner: V => C,
          mergeValue: (C, V) => C,
          mergeCombiners: (C, C) => C,
          partitioner: Partitioner,
          mapSideCombine: Boolean = true,
          serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
        require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
        if (keyClass.isArray) {
          if (mapSideCombine) {
            throw new SparkException("Cannot use map-side combining with array keys.")
          }
          if (partitioner.isInstanceOf[HashPartitioner]) {
            throw new SparkException("Default partitioner cannot partition array keys.")
          }
        }
        val aggregator = new Aggregator[K, V, C](
          self.context.clean(createCombiner),
          self.context.clean(mergeValue),
          self.context.clean(mergeCombiners))
        if (self.partitioner == Some(partitioner)) {
          self.mapPartitions(iter => {
            val context = TaskContext.get()
            new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
          }, preservesPartitioning = true)
        } else {
          new ShuffledRDD[K, V, C](self, partitioner)
            .setSerializer(serializer)
            .setAggregator(aggregator)
            .setMapSideCombine(mapSideCombine)
        }
      }

    在combineByKeyWithClassTag方法中就用new()函数创建了ShuffledRDD。

    前面假设有4台机器并行计算,每台机器在自己的内存中进行迭代计算,现在产生Shuffle,数据就要进行分类,MapPartitionsRDD数据根据Hash已经分好类,我们就抓取MapPartitionsRDD中的数据。我们从第一台机器中获取的内容为(Hello,2),从第二台机器中获取的内容为(Hello,1),从第三台机器中获取的内容为(Hello,1),把所有的Hello都抓过来。同样,我们把其他的数据(Hadoop,1),(Flink,1)……都抓过来。

    这就是Shuffle的过程,根据数据的分类拿到自己需要的数据。注意,MapPartitionsRDD属于第一个Stage,是父Stage,内部基于内存进行迭代,不需要操作都要读写磁盘,所以速度非常快;从计算算子的角度讲,reduceByKey发生在哪里?reduceByKey发生的计算过程包括两个RDD:一个是MapPartitionsRDD;一个是ShuffledRDD。ShuffledRDD要产生网络通信。

    reduceByKey之后,我们将结果收集起来,进行全局级别的reduce,产生reduceByKey的最后结果,如将(Hello,2),(Hello,1),(Hello,1)在内部变成(Hello,4),其他数据也类似统计。这里reduceByKey之后,如果通过Collect将数据收集起来,就会产生MapPartitionsRDD。从Collect的角度讲,MapPartitionsRDD的作用是将结果收集起来发送给Driver;从saveAsTextFile输出到Hdfs的角度讲,例如输出(Hello,4),其中Hello是key,4是Value吗?不是!这里(Hello,4)就是value,这就需要设计一个key出来。

    下面是RDD.scala的saveAsTextFile方法。

      /**
       * Save this RDD as a text file, using string representations of elements.
       */
      def saveAsTextFile(path: String): Unit = withScope {
        // https://issues.apache.org/jira/browse/SPARK-2075
        //
        // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
        // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
        // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
        // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
        // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
        //
        // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
        // same bytecodes for `saveAsTextFile`.
        val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
        val textClassTag = implicitly[ClassTag[Text]]
        val r = this.mapPartitions { iter =>
          val text = new Text()
          iter.map { x =>
            text.set(x.toString)
            (NullWritable.get(), text)
          }
        }
        RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
          .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
      }

    RDD.scala的saveAsTextFile方法中的iter.map {x=>text.set(x.toString) (NullWritable.get(), text)},这里,key转换成Null,value就是内容本身(Hello,4)。saveAsHadoopFile中的TextOutputFormat要求输出的是key-value的格式,而我们处理的是内容。回顾一下,之前我们在textFile读入数据的时候,读入split分片将key去掉了,计算的是value。因此,输出时,须将丢失的key重新弄进来,这里key对我们没有意义,但key对Spark框架有意义,只有value对我们有意义。第一次计算的时候我们把key丢弃了,所以最后往HDFS写结果的时候需要生成key,这符合对称法则和能量守恒形式。

    三、WordCount总结:

      第一个Stage有哪些RDD?HadoopRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD。

      第二个Stage有哪些RDD?ShuffledRDD、MapPartitionsRDD。

      

     

     

  • 相关阅读:
    每日日报2020.12.1
    每日日报2020.11.30
    981. Time Based Key-Value Store
    1146. Snapshot Array
    565. Array Nesting
    79. Word Search
    43. Multiply Strings
    Largest value of the expression
    1014. Best Sightseeing Pair
    562. Longest Line of Consecutive One in Matrix
  • 原文地址:https://www.cnblogs.com/xiaoyh/p/13604146.html
Copyright © 2011-2022 走看看