zoukankan      html  css  js  c++  java
  • spark内核揭秘-06-TaskSceduler启动源码解析初体验

    TaskScheduler实例对象启动源代码如下所示:


    从上面代码可以看出来,taskScheduler的启动是在SparkContext


    找到TaskSchedulerImpl实现类中的start方法实现:

    1、从上代码看到,先启动CoarseGrainedSchedulerBackend,


    从上面CoarseGrainedSchedulerBackend类的代码,可以看出spark启动了DriverActor,名称为CoarseGrainedScheduler,这是一个akka消息通信类,会先运行preStart()方法


    从上面代码可以看到,初始化了akka客户端监听,还有最重要的是调用了系统的scheduler调度,参数函数是立即执行调度,间隔1000毫秒,运行ReviveOffers方法


    进入makeOffers()方法:


    运行launchTask方法:




    这段代码是spark序列号任务大小超过akkaFrameSize - AkkaUtils.reservedSizeBytes大小,那就报错为”Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                    "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
                    "spark.akka.frameSize or using broadcast variables for large values.
    “ ,此刻会将该任务终止,并将任务从任务列表中移除,这样推荐使用broadcast广播方式

    否则,将获取执行任务数据,并减少空闲cpu数,发送消息执行 LaunchTask(new SerializableBuffer(serializedTask))方法,即CoarsedGrainedExecutorBackend类的LaunchTask方法:


    上面代码 会运行executor 的launchTask方法:


    TaskRunner就是一个多线程:




    代码太多,我就不截图了,其实实际就是根据机器状况,运行task任务

    2、然后我们回到TaskSchedulerImpl实现类中的start方法


    如果isLocal=false and spark.speculation=true,不是local模式,那就要dispatcher分发任务了,默认是100毫秒后立即启动,并间隔100毫秒循环运行,


    CoarseGrainedSchedulerBackend的reviveOffers:



    版权声明:本文为博主原创文章,未经博主允许不得转载。

  • 相关阅读:
    centos crash debug
    go get Unknown SSL protocol error in connection to gopkg.in
    Tensorflow serving with Kubernetes
    Spring 集成 Swagger UI
    Docker Registry V2 Garbage Collection
    Docker Registry V2 with Nginx
    Zabbix磁盘性能监控
    Zabbix CPU utilization监控参数
    Windows挂载Gluster复制卷
    Redis持久化存储(三)
  • 原文地址:https://www.cnblogs.com/stark-summer/p/4829816.html
Copyright © 2011-2022 走看看