16. Job Trigger Flow: Principles and Source Code Analysis

    I. Using WordCount as an Example

    1. WordCount

    val lines = sc.textFile()
    val words = lines.flatMap(line => line.split(" "))
    val pairs = words.map(word => (word, 1))
    val counts = pairs.reduceByKey(_ + _)
    counts.foreach(count => println(count._1 + ": " + count._2))
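
    For context, here is a self-contained version of the same program (a sketch: the master URL
    and input path are placeholders, not from the original). Note that the first four lines only
    build the RDD lineage lazily; the foreach() at the end is the action that triggers the job.

    import org.apache.spark.{SparkConf, SparkContext}

    object WordCount {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
        val sc = new SparkContext(conf)
        val lines = sc.textFile("path/to/input.txt") // placeholder path
        val words = lines.flatMap(line => line.split(" "))
        val pairs = words.map(word => (word, 1))
        val counts = pairs.reduceByKey(_ + _)
        counts.foreach(count => println(count._1 + ": " + count._2))
        sc.stop()
      }
    }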


    2. Source Code Analysis

    ### org.apache.spark/SparkContext.scala
    ### textFile()
    
      /**
       * First, the hadoopFile() call creates a HadoopRDD whose elements are (key, value) pairs:
       * the key is the offset of each line within the HDFS (or local text) file, and the value is the text line itself.
       * map() is then called on the HadoopRDD, which drops the key and keeps only the value, yielding a MapPartitionsRDD
       * whose elements are simply the lines of text.
       * @param path
       * @param minPartitions
       * @return
       */
      def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
        assertNotStopped()
        hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
          minPartitions).map(pair => pair._2.toString).setName(path)
      }
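
    Incidentally, textFile() can be reproduced by calling hadoopFile() directly and dropping the
    offset key yourself; a sketch (assuming an existing SparkContext named sc and a placeholder
    input path):

      import org.apache.hadoop.io.{LongWritable, Text}
      import org.apache.hadoop.mapred.TextInputFormat

      // HadoopRDD of (byte offset, line) pairs
      val raw = sc.hadoopFile("path/to/input.txt", classOf[TextInputFormat],
        classOf[LongWritable], classOf[Text])
      // MapPartitionsRDD of plain text lines, same as sc.textFile("path/to/input.txt")
      val lines = raw.map(pair => pair._2.toString)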
    
    
    
    
    ### org.apache.spark.rdd/RDD.scala
    
      def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = {
        val cleanF = sc.clean(f)
        new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
      }
    
      
      def map[U: ClassTag](f: T => U): RDD[U] = {
        val cleanF = sc.clean(f)
        new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
      }
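
    Note that constructing a MapPartitionsRDD computes nothing: the (context, pid, iter) function
    is applied per partition only when a task later consumes the iterator. A standalone sketch
    (local Scala collections, not Spark source) of that laziness:

      object PartitionIteratorSketch {
        def main(args: Array[String]): Unit = {
          val iter: Iterator[String] = Iterator("hello spark", "hello world") // one "partition"
          val words = iter.flatMap(line => line.split(" "))  // like iter.flatMap(cleanF)
          val pairs = words.map(word => (word, 1))           // like iter.map(cleanF)
          pairs.foreach(println)                             // only now is any work done
        }
      }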
    
    
    
    
    RDD itself does not define reduceByKey(), so calling reduceByKey() on an RDD triggers a Scala implicit
    conversion: the compiler searches the enclosing scope for an applicable implicit, finds the
    rddToPairRDDFunctions() conversion defined for RDD, and uses it to wrap the RDD in a PairRDDFunctions.
    
      implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
        (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
        new PairRDDFunctions(rdd)
      }
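
    The same trick can be demonstrated outside Spark; a standalone sketch with a hypothetical
    PairFunctions wrapper playing the role of PairRDDFunctions:

      import scala.language.implicitConversions

      object ImplicitConversionSketch {
        class PairFunctions[K, V](self: Seq[(K, V)]) {
          def reduceByKey(func: (V, V) => V): Map[K, V] =
            self.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(func) }
        }

        // The conversion the compiler inserts when Seq lacks the requested method.
        implicit def seqToPairFunctions[K, V](s: Seq[(K, V)]): PairFunctions[K, V] =
          new PairFunctions(s)

        def main(args: Array[String]): Unit = {
          val pairs = Seq(("hello", 1), ("spark", 1), ("hello", 1))
          // Seq has no reduceByKey, so this is rewritten to
          // seqToPairFunctions(pairs).reduceByKey(_ + _), just as RDD becomes PairRDDFunctions.
          println(pairs.reduceByKey(_ + _)) // Map(hello -> 2, spark -> 1)
        }
      }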
    
    
    
    
    
    Next, the reduceByKey() method in PairRDDFunctions is called; as the code shows, it delegates to combineByKey():
    
      def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
        combineByKey[V]((v: V) => v, func, func, partitioner)
      }
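
    So reduceByKey is a special case of combineByKey: the combiner is created from a single value
    by the identity function, and the same func both merges a value into a combiner and merges two
    combiners. A standalone sketch (local collections and a hypothetical helper, not Spark source)
    of those three roles:

      object CombineByKeySketch {
        // The three functions combineByKey takes, specialized the way reduceByKey passes them.
        // mergeCombiners would merge per-partition results; with a single sequence it is unused.
        def combineByKey[K, V, C](data: Seq[(K, V)])(createCombiner: V => C,
            mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): Map[K, C] =
          data.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
            acc.updated(k, acc.get(k) match {
              case Some(c) => mergeValue(c, v)  // key seen before: fold the value in
              case None    => createCombiner(v) // first value for this key
            })
          }

        def main(args: Array[String]): Unit = {
          val pairs = Seq(("hello", 1), ("spark", 1), ("hello", 1))
          val func: (Int, Int) => Int = _ + _
          // reduceByKey(func) == combineByKey(v => v, func, func)
          println(combineByKey(pairs)(v => v, func, func)) // Map(hello -> 2, spark -> 1)
        }
      }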
    
    
    
    
    
    
    ### org.apache.spark.rdd/RDD.scala
    
      def foreach(f: T => Unit) {
        val cleanF = sc.clean(f)
        sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
      }
    
    
    foreach() calls the runJob() method. Tracing runJob() step by step: the first call is SparkContext's two-argument runJob():
    
      def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
        runJob(rdd, func, 0 until rdd.partitions.size, false)
      }
    
    …
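
    The intermediate overloads elided above mainly lift the simple iterator function into the full
    (TaskContext, Iterator[T]) => U form and fill in defaults. A standalone sketch of that
    wrapping pattern (hypothetical names, with an Int standing in for TaskContext):

      object RunJobChainSketch {
        type TaskContext = Int // stand-in for org.apache.spark.TaskContext

        // Full form: the function sees the task context and the partition's iterator.
        def runJobFull[T, U](parts: Seq[Seq[T]], func: (TaskContext, Iterator[T]) => U): Seq[U] =
          parts.zipWithIndex.map { case (part, pid) => func(pid, part.iterator) }

        // Simple form: wrap the iterator-only function and ignore the context,
        // much as SparkContext.runJob(rdd, func) ultimately does.
        def runJob[T, U](parts: Seq[Seq[T]], func: Iterator[T] => U): Seq[U] =
          runJobFull(parts, (context: TaskContext, iter: Iterator[T]) => func(iter))

        def main(args: Array[String]): Unit = {
          val partitions = Seq(Seq(1, 2, 3), Seq(4, 5))
          println(runJob(partitions, (it: Iterator[Int]) => it.sum)) // List(6, 9)
        }
      }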
    
    Finally, the overload at the end of the chain:
      def runJob[T, U: ClassTag](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          allowLocal: Boolean,
          resultHandler: (Int, U) => Unit) {
        if (stopped) {
          throw new IllegalStateException("SparkContext has been shutdown")
        }
        val callSite = getCallSite
        val cleanedFunc = clean(func)
        logInfo("Starting job: " + callSite.shortForm)
        if (conf.getBoolean("spark.logLineage", false)) {
          logInfo("RDD's recursive dependencies:
    " + rdd.toDebugString)
        }
        // Call the runJob() method of the dagScheduler that was created when the SparkContext was initialized
        dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
          resultHandler, localProperties.get)
        progressBar.foreach(_.finishAll())
        rdd.doCheckpoint()
      }
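
    The resultHandler callback is how per-partition results get back to the caller: the simpler
    overloads allocate an array and pass a handler like (index, result) => array(index) = result.
    A standalone sketch of that pattern (hypothetical helper, not Spark source):

      object ResultHandlerSketch {
        // Run func over each "partition" and hand every result to the callback.
        def runJobWithHandler[T, U](parts: Seq[Seq[T]], func: Iterator[T] => U,
            resultHandler: (Int, U) => Unit): Unit =
          parts.zipWithIndex.foreach { case (part, pid) => resultHandler(pid, func(part.iterator)) }

        def main(args: Array[String]): Unit = {
          val parts = Seq(Seq("a", "b"), Seq("c"))
          val results = new Array[Int](parts.size)
          runJobWithHandler(parts, (it: Iterator[String]) => it.size,
            (pid, n) => results(pid) = n) // collect each partition's result by index
          println(results.toList) // List(2, 1)
        }
      }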