zoukankan      html  css  js  c++  java
  • 66、Spark Streaming:数据处理原理剖析与源码分析(block与batch关系透彻解析)

    一、数据处理原理剖析

    image


    每隔我们设置的batch interval 的time,就去找ReceiverTracker,将其中的,从上次划分batch的时间,到目前为止的这个batch interval time间隔内的block封装为一个batch;
    
    其次,会将这个batch中的数据,去创建为一个初始的RDD,一个batch内,在这段时间封装了几个block,就代表这个batch对应的RDD内会有几个partition;
    
    这个batch对应的RDD的partition决定了数据处理阶段的并行度,这个跟调优关系很大,如果想增加数据处理阶段的性能,就考虑增加并行度,那么就考虑缩短block interval;
    
    只有output操作中,使用了ForEachStream,其中定义了generatorJob()方法,在数据处理阶段,才触发针对接收到的一个一个batch的数据,触发小的job,去处理该batch的数据;
    
    最后一步,去找JobScheduler去调度job,job的输入RDD,就是batch对应的RDD;


    二、源码分析

    入口,JobGenerator的generateJobs()方法

    ###org.apache.spark.streaming.scheduler/JobGenerator.scala
    
     /**
        * 定时,调度generateJobs()方法,传入一个time,其实就是一个batch interval内的时间段
        */
      private def generateJobs(time: Time) {
        // Set the SparkEnv in this thread, so that job generation code can access the environment
        // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
        // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
        SparkEnv.set(ssc.env)
        Try {
          // 找到ReceiverTracker,调用其allocateBlocksToBatch方法,将当前时间段内的block分配给一个batch,并为其
          // 创建一个RDD
          jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
          // 调用DSteamGraph的generateJobs()来根据程序定义的DSteam之间的依赖关系和算子,生成job
          graph.generateJobs(time) // generate jobs using allocated block
        } match {
            // 如果成功创建了job
          case Success(jobs) =>
            // 从ReceiverTracker中,获取当前batch interval对应的block数据
            val receivedBlockInfos =
              jobScheduler.receiverTracker.getBlocksOfBatch(time).mapValues { _.toArray }
            // 用jobScheduler提交job,其对应的原始数据,是那批block
            jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfos))
          case Failure(e) =>
            jobScheduler.reportError("Error generating jobs for time " + time, e)
        }
        eventActor ! DoCheckpoint(time)
      }
  • 相关阅读:
    【Android】6.3 ProgressDialog
    【Android】6.2 AlertDialog(警告对话框)
    【Android】6.1 Toast(信息提示框)
    【Android】6.0 第6章 对话框--本章示例主界面
    【Android】5.8 滑动条(SeekBar)
    Storm Trident API
    Storm Trident状态
    Storm Trident详解
    Storm的并行度
    StormUI详解
  • 原文地址:https://www.cnblogs.com/weiyiming007/p/11387814.html
Copyright © 2011-2022 走看看