zoukankan      html  css  js  c++  java
  • spark_源码跟踪

    spark源码:
    版本:2.3.4: https://github.com/apache/spark/tree/v2.3.4

    RPC:
    1.解析:远程进程调用

    2.:传输类型: 1.同一进程 2. 不同的进程 同一主机  3.不同的进程、不同的主机(最复杂)
         实例A ---------> 实例B
    3.传输方式:   实例A(有信箱,inbox,用来区别实例中的那个方法)  ->  分发A(用来区别发给哪个实例)  ->  队列A  ->  传输层A   ->  传输层B  ->  分发B  ->  队列B  ->  实例B(有信箱,inbox)

    4. 传输层最典型的是Netty, Netty底层调用了BIO或者NIO

    脚本运行:

    start-all.sh 

    1. start-master.sh
      1. org.apache.spark.deploy.master.Master
      2. object Master
      3. main
      4. 1.rpcEnv.awaitTermination()一直等待,不退出,让rpc一直通信  2.startRpcEnvAndEndpoint()
      5. RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
      6. create
      7. new NettyRpcEnvFactory().create(config)
      8.  看new出的 NettyRpcEnv类   private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)(分发器)  和 nettyEnv.startServer(config.bindAddress, actualPort)(传输层)
        重点
        1. 传输层
          1. @volatile private var server: TransportServer = _ server = transportContext.createServer(bindAddress, port, bootstraps)(传输服务器)
          2. init(hostToBind, portToBind);
          3. 进入netty,java写的,有boss、有work, 2.NettyUtils.createEventLoop 创建线程,需要不断地处理任务,就得创建一个死循环卡主线程不退出, 3.context.initializePipeline(ch, rpcHandler);
          4. createChannelHandler(channel, channelRpcHandler);
          5. new TransportChannelHandler()
          6. channelRead()
          7. requestHandler.handle((RequestMessage) request);
          8. processOneWayMessage((OneWayMessage) request);
          9. rpcHandler.receive(reverseClient, req.body().nioByteBuffer()); 看receive方法实现类NettyRpcEnv
          10. dispatcher.postRemoteMessage(messageToDispatch, callback) 最终扔到了分发器中
          11. postMessage(message.receiver.name, rpcMessage, (e) => callback.onFailure(e))
          12. 1.data.inbox.post(message) 消息放进数据中的信箱 2. receivers.offer(data) 接收数据

        2. 2.分发器
          1. private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
          2. class Dispatcher 的receive方法
          3. 1.private val receivers = new LinkedBlockingQueue[EndpointData](队列)   2.ThreadPoolExecutor(pool.execute(new MessageLoop))循环处理MessageLoop   3. MessageLoop -> 1.val data = receivers.take()接收信息 2.receivers.offer(PoisonPill) 对垃圾信息投毒 3. data.inbox.process(Dispatcher.this) 装进信箱
    2. start-slaves.sh  
      ->org.apache.spark.deploy.worker.Worker

    结论: 可以看出 实例new出了一个dispatcher 和传输层的dispatcher 最终都会发送信息

    1. dispatcher 是new出来的,开始的时候会注册好服务地址、创建队列、创建死循环线程不断地把postMessage中消息放进队列中,只是初始化(new)的时候都是空的

    2. TransportServer 将数据不断地放进去postMessage

    Master工作原理

      -> netty 对外通信接收消息 -> postMessage -> threadpool  ->  receivers(receive or receiveAndReply)  线程池循环接收消息

    start-slaves.sh  启动的时候需要给定 ip地址 端口号
    ->org.apache.spark.deploy.worker.Worker

     -> main

    -> 1.rpcEnv.awaitTermination()不退出进程 2.startRpcEnvAndEndpoint()
    -> 1. RpcEnv.create(systemName, host, port, conf, securityMgr) 走rpcEnv的思路 2.setupEndpoint()方法内的new Worker()
    -> onStart()
    -> registerWithMaster()
    -> tryRegisterAllMasters()
    -> sendRegisterMessageToMaster(masterEndpoint)
    -> send()

     到达 master的receive()

    -> receive()
    -> RegisterWorker()
    -> val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,workerRef, workerWebUiUrl)获取到worker发送的信息
    -> 1.registerWorker(worker) 2.workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))

    master又发给worker 到达worker的receive

    ->receive()
    -> case msg: RegisterWorkerResponse => handleRegisterResponse(msg)获得注册的响应
    -> self.send(SendHeartbeat)给master发送心跳

      

    ------------------------------------------------------------------------------- 以上是资源层 :  master -> worker  ------------------------------------------------------------------------------------------------------------------

     ----------------------------------    计算层: client-> driver -> executor    ---------------------------------

    spark-submit 跑一个自己写的程序

    -> org.apache.spark.deploy.SparkSubmit
    -> main
    -> case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    -> doRunMain()
    -> runMain(args, uninitLog)
    -> prepareSubmitEnvironment()获取childMainClass 下一个需要运行类 -》 childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS -》 classOf[ClientApp].getName() -> new ClientEndpoint()
    -> onStart()
    -> 1. val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" 2. val command = new Command(mainClass,Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,sys.env, classPathEntries, libraryPathEntries, javaOpts) 一个是程序的类,一个是我们的类
    -> asyncSendToMasterAndForwardReply[SubmitDriverResponse](RequestSubmitDriver(driverDescription)) 我们的数据放到了driver中,driver发送给master
    -> ask
    -> 到达Mater的receiveAndReply()
    -> 最终在worker上启动一个新进程用来跑DriverWrapper
    -> 执行我们写的代码
    -> sparkContext ->textFile ->hadoopRDD -> flatMap -> MappartitionsRDD -> runJob

    sparkContext

    -> sparkEnv

    -> TaskSchedulerImpl -> initialize -> start
    -> standaloneSchedulerVackend 父类 CoarseGrainedSchedulerBackend -> start 

    -> DAGScheduler 

    start 

    stage

    相干计算  -->  需要shuffle 
    不相干计算  --> 组成了一个stage,在一台可以完成 ,pipeline :迭代器嵌套模式  ,窄依赖
    
    stage与stage之间是需要shuffle的, stage划分是根据中间是否shuffle
    
    task任务
    一个stage的任务task的数量是多少,是由stage里最后一个RDD的分区的数量决定的


    ResultStage:最后一个stafe

    ShuffleMapStage: 一个job可以有多个stage, 除了最后一个stage,其他的都是
    ShuffleMapStage


    stage与stage之间是需要shuffle的,数据经过stage是一条一条流过的,前一个stage结束,存储IO文件,下一个stage开始从存储的IO文件开始解析

    问题: spark的task是怎么来的,怎么分给executor的

  • 相关阅读:
    WCF中NetTCp配置
    生产者消费者模式
    MVC 引擎优化
    Wcf
    MongoDB运用
    Sock基础
    WebService
    线程
    委托
    特性
  • 原文地址:https://www.cnblogs.com/bigdata-familyMeals/p/14405439.html
Copyright © 2011-2022 走看看