zoukankan      html  css  js  c++  java
  • Hadoop源码分析24 JobTracker启动和心跳处理流程

    1.初始化和启动JobTracker

     

    生成一个JobQueueTaskScheduler

     

    taskScheduler=(TaskScheduler)ReflectionUtils.newInstance(schedulerClass,conf);

     

     

    生成一个实现 InterTrackerProtocolJobSubmissionProtocolRefreshUserMappingsProtocolRefreshAuthorizationPolicyProtocolAdminOperationsProtocolServer(JobTracker)

     

    this.interTrackerServer= RPC.getServer(this,addr.getHostName(),   addr.getPort(),handlerCount,  false, conf, secretManager);

     

     

     

    启动HttpServer

     

    infoServer=new HttpServer("job",infoBindAddress, tmpInfoPort,

           tmpInfoPort == 0, conf, aclsManager.getAdminsAcl());

    .....

    infoServer.start();

     

    初始化JobHistory

     

    jobHistoryServer = new JobHistoryServer(conf, aclsManager,infoServer);

    jobHistoryServer.start();

     

    获得本地FS客户端

    this.localFs=FileSystem.getLocal(conf);

     

    获得DFS客户端,即一个ClientProtocol的代理

    fs=getMROwner().doAs(new PrivilegedExceptionActionFileSystem() {

               public FileSystem run() throws IOException{

                 return FileSystem.get(conf);

     }});

    }

     

     

    确认系统目录

     

    发送RPC请求:ClientProtocol.getFileInfo("/tmp/hadoop-admin/mapred/system")

    返回:org.apache.hadoop.hdfs.protocol.HdfsFileStatus@7b84a726,用来对比ownerpermission

     

    发送RPC请求:ClientProtocol.getListing(getListing(/tmp/hadoop-admin/mapred/system,[])

    返回:[org.apache.hadoop.fs.FileStatus@f7ea7ce5] 用以添加recoveryJob

     

    发送RPC请求ClientProtocol.delete(/tmp/hadoop-admin/mapred/system,true)

    返回:true

     

    发送RPC请求ClientProtocol.mkdirs(/tmp/hadoop-admin/mapred/system,rwxr-xr-x)

    返回:true

     

    发送RPC请求ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/system,rwx------)

    返回:true

     

     

    启动 JobTracker

     

    tracker.offerService();

     

     

    发送RPC请求ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/system/jobtracker.info)

    返回:null

     

    发送RPC请求ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/system/jobtracker.info.recover)

    返回:null

     

    发送RPC请求ClientProtocol.create(/tmp/hadoop-admin/mapred/system/jobtracker.info,rwxr-xr-x, DFSClient_168936446, true, true, 3, 67108864)

    返回:输入流

     

    发送RPC请求ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/system/jobtracker.info,rwx------)

    返回:null

     

    发送RPC请求ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/system/jobtracker.info,DFSClient_168936446, null)

    返回:org.apache.hadoop.hdfs.protocol.LocatedBlock@1f8c3fc9

     

    发送RPC请求ClientProtocol.complete(/tmp/hadoop-admin/mapred/system/jobtracker.info,DFSClient_168936446)

    返回:true

     

    // start theinter-tracker server once the jt is ready

    this.interTrackerServer.start();

     

    2.JobTracker处理心跳

     

    接收RPC请求:InterTrackerProtocol.getProtocolVersion(org.apache.hadoop.mapred.InterTrackerProtocol,29) from 10.1.1.102:47248

    发送: 29

     

     

    接收RPC请求:InterTrackerProtocol.getProtocolVersion(org.apache.hadoop.mapred.InterTrackerProtocol,29) from 10.1.1.103:39223

    发送: 29

     

    接收RPC请求:InterTrackerProtocol.getBuildVersion()from 10.1.1.102:47248

    发送: 1.0.0 from 1214675 byhortonfo on Thu Dec 15 16:36:35 UTC 2011

     

     

     

    接收RPC请求:InterTrackerProtocol.getBuildVersion()from 10.1.1.103:39224

    发送: 1.0.0 from 1214675 byhortonfo on Thu Dec 15 16:36:35 UTC 2011

     

    接收RPC请求:InterTrackerProtocol.getSystemDir()from 10.1.1.102:47250

    发送: hdfs://server1:9000/tmp/hadoop-admin/mapred/system

     

    接收RPC请求:InterTrackerProtocol.getSystemDir()from 10.1.1.103:39224

    发送: ://server1:9000/tmp/hadoop-admin/mapred/system

     

     

    接收RPC请求:InterTrackerProtocol.heartbeat(org.apache.hadoop.mapred.TaskTrackerStatus@157052cb,true, true, true, -1) from 10.1.1.102:47252

    发送: HeartbeatResponseactions=[], conf=null, heartbeatInterval=3000, recoveredJobs=[],responseId=0      

     

    接收RPC请求:InterTrackerProtocol.heartbeat(org.apache.hadoop.mapred.TaskTrackerStatus@71999260,true, true, true, -1) from 10.1.1.103:39226

    发送: HeartbeatResponseactions=[], conf=null, heartbeatInterval=3000, recoveredJobs=[],responseId=0      

          

     

     心跳处理流程待续

     

     

  • 相关阅读:
    回调函数案例(二)
    回调函数案例(一)
    liteos学习文档liteos.github.io
    HeapAlloc、GlobalAlloc和new等内存分配有什么区别么?
    C语言中字符数组和字符串指针分析
    socket 连接,使得地址马上可以重用
    shutdown 和closesocket
    IP地址转换成Long型数字的算法
    WSAStartup( )
    关于完成端口IOCP异步接收连接函数AcceptEx注意事项
  • 原文地址:https://www.cnblogs.com/leeeee/p/7276496.html
Copyright © 2011-2022 走看看