  • Spark Streaming source code walkthrough: dynamic job generation and deeper thoughts

This section covers two topics:

1. Deeper thinking about how Spark Streaming generates jobs

2. Source code analysis of Spark Streaming job generation

Input DStreams come from many sources: Kafka, sockets, Flume, and so on. The output operation on a DStream is a logical-level action defined by the Spark Streaming framework; underneath, it is translated into a physical-level action, namely an RDD action. In between sit the transformations, i.e. the state transitions that make up the business-processing logic.

Spark Streaming has two kinds of DStreams:

1. Input DStreams backed by external data sources.

2. DStreams derived from other DStreams.
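
To make the logical/physical distinction concrete, here is a minimal sketch of such a pipeline (the socket host and port are placeholders). The output operation print() is only a logical-level action; it becomes a physical RDD action each time the job generated for a batch actually runs.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCountSketch").setMaster("local[2]")
    // batchDuration: every 5 seconds the JobGenerator produces one logical job
    val ssc = new StreamingContext(conf, Seconds(5))

    // Input DStream (a socket source here; Kafka/Flume sources are analogous)
    val lines = ssc.socketTextStream("localhost", 9999)

    // Transformations: a logical-level description of the computation
    val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // Output operation: a logical-level "action"; translated into a physical
    // RDD action only when the generated job for a batch is run
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}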

A key observation: big-data work that is not stream processing usually takes the form of scheduled jobs, triggered every ten minutes or once a day, and such scheduled big-data jobs already feel like stream processing, even if that is not the standard way to put it. Put provocatively, any data that has nothing to do with stream processing is worthless. Even batch processing and data mining are really stream processing, just in a hidden form; in the end all data processing turns into stream processing.

Hence the unified abstraction: everything is handled in a stream-processing style, and all processing will eventually be folded into it. Large enterprise projects typically also have a J2EE back end through which people operate the big-data center.

The Spark Streaming program entry point takes a batchDuration time window; with the five-second window below, the JobGenerator produces one job every five seconds. That job is logical-level: it knows what should be done but has not done it yet; the actual work is performed at the physical level by RDD actions, driven by the RDD dependency graph. A DStream action is likewise logical-level: Spark Streaming derives a logical-level job from the action but does not run it, much like a Runnable whose run() has not yet been invoked. Because nothing physical has been generated yet, the framework still has room to schedule and optimize. If translating the DStream operations into RDD operations ended with a real RDD action, the job would be triggered the moment translation finished; so to complete the translation without triggering a job, the translated action has to be wrapped in a function rather than executed.

    JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
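
The Runnable analogy is literal in the source. Below is an abridged sketch of org.apache.spark.streaming.scheduler.Job (fields and helper methods trimmed): the logical-level job simply wraps the function obtained by translating a DStream output operation, and nothing physical happens until run() is called.

import scala.util.Try

private[streaming] class Job(val time: Time, func: () => _) {
  private var _result: Try[_] = null

  def run() {
    _result = Try(func()) // only here does the underlying RDD action actually fire
  }
}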

The analysis below walks through three classes:

1. JobGenerator: generates jobs at the interval defined by batchDuration and its internal timer;
2. JobScheduler: schedules Spark Streaming jobs according to batchDuration;
3. ReceiverTracker: receives metadata on the driver and starts the data-receiving threads on the executors.

1. The JobGenerator class:

/**
 * This class generates jobs from DStreams as well as drives checkpointing and cleaning
 * up DStream metadata.
 */
private[streaming]
class JobGenerator(jobScheduler: JobScheduler) extends Logging {

As the comment says, JobGenerator produces jobs from the DStreams (the DStreamGraph) as time goes by. Besides the timer-generated jobs, Spark Streaming has jobs generated in other ways, for example from aggregations and stateful operations: a stateful operation is not tied to a single batchDuration but processes data across many of them, and window-style operations likewise go through the JobGenerator. The class also drives the cleanup of metadata, in addition to job generation.
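
The batch trigger itself sits a few lines further down in the same class (abridged from the Spark source): a RecurringTimer fires once per batchDuration and posts a GenerateJobs event into the event loop.

// Fires every batchDuration milliseconds and posts a GenerateJobs(time) event,
// which processEvent (shown below) turns into job generation for that batch
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")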

// eventLoop is created when generator starts.
// This not being null means the scheduler has been started and not stopped
private var eventLoop: EventLoop[JobGeneratorEvent] = null // the message/event loop object

// last batch whose completion, checkpointing and metadata cleanup has been completed
private var lastProcessedBatch: Time = null

/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter // referencing the lazy val forces its initialization
  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") { // anonymous inner subclass
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event) // event-handling logic

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start() // start the event thread that processes queued events
  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
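
The two branches at the end of start() distinguish recovery from a cold start: with checkpoint data present the generator restarts from it, otherwise startFirstTime() (abridged from the same class) starts the DStreamGraph and the timer at an aligned batch boundary.

/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime()) // first batch boundary from now
  graph.start(startTime - graph.batchDuration)   // tell the DStreamGraph where time zero is
  timer.start(startTime.milliseconds)            // begin posting GenerateJobs events
  logInfo("Started JobGenerator at " + startTime)
}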

/**
 * An event loop to receive events from the caller and process all events in the event thread. It
 * will start an exclusive event thread to process all events.
 *
 * Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can
 * handle events in time to avoid the potential OOM.
 */
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]() // the message-queue data structure

  private val stopped = new AtomicBoolean(false) // atomic stop flag

  private val eventThread = new Thread(name) { // the wrapped event thread
    setDaemon(true) // run as a daemon thread

    override def run(): Unit = { // thread body
      try {
        while (!stopped.get) {
          val event = eventQueue.take() // take messages off the queue one by one
          try {
            onReceive(event) // business-specific handling of the message
          } catch {
            case NonFatal(e) => { // on failure, call back into the error handler
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
            }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
  }

  def start(): Unit = { // start the event loop
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start() // launch the thread, which runs the run() method above
  }
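
EventLoop is a small reusable abstraction. A hypothetical toy subclass (the names below are illustrative, not from Spark) shows the contract: implement onReceive and onError, then call post() from any thread and the single daemon event thread does the processing.

sealed trait DemoEvent
case class Tick(n: Int) extends DemoEvent

val loop = new EventLoop[DemoEvent]("demo-loop") {
  override protected def onReceive(event: DemoEvent): Unit = event match {
    case Tick(n) => println(s"tick $n handled on ${Thread.currentThread().getName}")
  }
  override protected def onError(e: Throwable): Unit = logError("demo-loop failed", e)
}

loop.start()                              // onStart(), then the event thread starts draining the queue
(1 to 3).foreach(n => loop.post(Tick(n))) // post() only enqueues; processing is asynchronous
loop.stop()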
    
    
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) { // dispatch on the received message type
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time) // generate the jobs for this batch time
    case ClearMetadata(time) => clearMetadata(time) // batch finished: clear its metadata on the driver
    case DoCheckpoint(time, clearCheckpointDataLater) => // perform checkpointing for this batch
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time) // clear obsolete checkpoint data
  }
}
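
A GenerateJobs(time) event ends up in generateJobs (abridged from the same class), which ties the three classes of this article together: it asks the ReceiverTracker to allocate the received blocks to this batch, asks the DStreamGraph to produce the logical jobs, hands the resulting JobSet to the JobScheduler, and finally posts a DoCheckpoint event.

/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  SparkEnv.set(ssc.env) // make SparkEnv visible to job-generation code in this thread
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to this batch
    graph.generateJobs(time)                                 // generate logical jobs from the DStreamGraph
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) // hand off to the JobScheduler
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) // checkpoint after generation
}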
2. The JobScheduler class:
/**
 * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
 * the jobs and runs them using a thread pool.
 */
private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {

// Use of ConcurrentHashMap.keySet later causes an odd runtime problem due to Java 7/8 diff
// https://gist.github.com/AlainODea/1375759b8720a3f9f094
private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet] // the JobSets generated per batch time
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor") // fixed-size thread pool for running jobs
private val jobGenerator = new JobGenerator(this) // the JobGenerator instance
val clock = jobGenerator.clock // the JobGenerator's clock
val listenerBus = new StreamingListenerBus() // the listener (message) bus

// These two are created only when scheduler starts.
// eventLoop not being null means the scheduler has been started and not stopped
var receiverTracker: ReceiverTracker = null // driver-side tracker of receiver metadata
// A tracker to track all the input stream information as well as processed record number
var inputInfoTracker: InputInfoTracker = null // input-stream info tracker

private var eventLoop: EventLoop[JobSchedulerEvent] = null // the event-loop object

def start(): Unit = synchronized { // main entry point that starts the JobScheduler
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams // the input streams
    rateController <- inputDStream.rateController // rate control for data ingestion
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext) // start the listener bus
  receiverTracker = new ReceiverTracker(ssc) // create the receiver tracker
  inputInfoTracker = new InputInfoTracker(ssc) // create the input-info tracker
  receiverTracker.start() // start the receivers
  jobGenerator.start() // start the job generator
  logInfo("Started JobScheduler")
}
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet) // record the JobSet under its batch time
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) // hand each job to the thread pool
    logInfo("Added jobs for time " + jobSet.time)
  }
}
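
Note that the pool size comes from spark.streaming.concurrentJobs (default 1, as seen above), so by default the jobs of one batch finish before the next batch's jobs start. A hypothetical tuning sketch, only if you deliberately want overlapping batches:

// Hypothetical sketch: let the jobs of two batches run concurrently.
// This trades strict batch ordering for throughput, so use it with care.
val conf = new SparkConf()
  .setAppName("ConcurrentJobsSketch")
  .set("spark.streaming.concurrentJobs", "2")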
private def handleJobStart(job: Job, startTime: Long) {
  val jobSet = jobSets.get(job.time)
  val isFirstJobOfJobSet = !jobSet.hasStarted
  jobSet.handleJobStart(job)
  if (isFirstJobOfJobSet) {
    // "StreamingListenerBatchStarted" should be posted after calling "handleJobStart" to get the
    // correct "jobSet.processingStartTime".
    listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
  }
  job.setStartTime(startTime)
  listenerBus.post(StreamingListenerOutputOperationStarted(job.toOutputOperationInfo))
  logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
}
private class JobHandler(job: Job) extends Runnable with Logging {

  import JobScheduler._

  def run() {
    try {
      val formattedTime = UIUtils.formatBatchTime(
        job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
      val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
      val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

      ssc.sc.setJobDescription(
        s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
      ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
      ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

      // We need to assign `eventLoop` to a temp variable. Otherwise, because
      // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
      // it's possible that when `post` is called, `eventLoop` happens to be null.
      var _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          job.run()
        }
        _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
        }
      } else {
        // JobScheduler has been stopped.
      }
    } finally {
      ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
      ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
    }
  }
}
}
    
    

Sina Weibo: http://weibo.com/ilovepains

WeChat official account: DT_Spark

Blog: http://blog.sina.com.cn/ilovepains

Phone: 18610086859

QQ: 1740415547

Email: 18610086859@vip.126.com

Spark Distribution Notes 6

     