  • Hadoop Source Code Analysis 27: How the JobTracker Handles Heartbeats When Idle

    How the JobTracker processes heartbeats when there are no tasks to run

     

    Heartbeat format: {restarted=true, initialContact=true, acceptNewTasks=true, responseId=-1,

    status=TaskTrackerStatus {failures=0, trackerName="tracker_server3:localhost.localdomain/127.0.0.1:57441" (id=2249), taskReports=[], maxReduceTasks=2, lastSeen=0, httpPort=50060, host="server3" (id=2243),

    healthStatus=TaskTrackerStatus$TaskTrackerHealthStatus {lastReported=0, isNodeHealthy=true, healthReport="" (id=2261)},

    resStatus=TaskTrackerStatus$ResourceStatus {availablePhysicalMemory=601034752, availableSpace=32463671296, availableVirtualMemory=2705653760, cpuFrequency=2195079, cpuUsage=-1.0, cumulativeCpuTime=1227000, mapSlotMemorySizeOnTT=-1, numProcessors=1, reduceSlotMemorySizeOnTT=-1, totalPhysicalMemory=1044144128, totalVirtualMemory=3158065152}

    }

    }

    Whether the heartbeat should be accepted is decided by: (inHostsList(status) && !inExcludedHostsList(status))
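
    The acceptance condition above can be sketched as follows. This is a minimal illustration, not the JobTracker's code: HostAdmission is a hypothetical class, hosts are plain strings rather than TaskTrackerStatus objects, and the rule that an empty include list admits every host is an assumption.

```java
import java.util.Set;

// Hypothetical stand-in for the JobTracker's include/exclude host check.
class HostAdmission {
    private final Set<String> includedHosts; // assumed: empty means "allow every host"
    private final Set<String> excludedHosts;

    HostAdmission(Set<String> includedHosts, Set<String> excludedHosts) {
        this.includedHosts = includedHosts;
        this.excludedHosts = excludedHosts;
    }

    boolean inHostsList(String host) {
        return includedHosts.isEmpty() || includedHosts.contains(host);
    }

    boolean inExcludedHostsList(String host) {
        return excludedHosts.contains(host);
    }

    // Same shape as the condition quoted in the text.
    boolean acceptTaskTracker(String host) {
        return inHostsList(host) && !inExcludedHostsList(host);
    }
}
```

    With an empty include list and an exclude list containing only "server9", a heartbeat from server3 would be accepted and one from server9 rejected.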

    Main methods called:

    public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId)


    private synchronized boolean processHeartbeat(TaskTrackerStatus trackerStatus, boolean initialContact, long timeStamp) throws UnknownHostException

     

    private boolean updateTaskTrackerStatus(String trackerName, TaskTrackerStatus status)

     
    private void addNewTracker(TaskTracker taskTracker) throws UnknownHostException
     
    public Node resolveAndAddToTopology(String name) throws UnknownHostException
     
    private Node addHostToNodeMapping(String host, String networkLoc)

    void updateTaskStatuses(TaskTrackerStatus status)
     

    private void updateNodeHealthStatus(TaskTrackerStatus trackerStatus, long timeStamp)

     

    synchronized List<Task> getSetupAndCleanupTasks(TaskTrackerStatus taskTracker) throws IOException

    public synchronized ClusterStatus getClusterStatus(boolean detailed)

    private synchronized List<TaskTrackerAction> getTasksToKill(String taskTracker)

    private List<TaskTrackerAction> getJobsForCleanup(String taskTracker)

    private synchronized List<TaskTrackerAction> getTasksToSave(TaskTrackerStatus tts)

     

    public int getNextHeartbeatInterval()

     

    private void removeMarkedTasks(String taskTracker)

     

    void org.apache.hadoop.mapred.JobTracker.FaultyTrackersInfo.markTrackerHealthy(String hostName)

    boolean org.apache.hadoop.mapred.JobTracker.FaultyTrackersInfo.isBlacklisted(String hostName)

    void org.apache.hadoop.mapred.JobTracker.FaultyTrackersInfo.setNodeHealthStatus(String hostName, boolean isHealthy, String reason, long timeStamp)

    List<String> org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(List<String> names)

    void org.apache.hadoop.net.CachedDNSToSwitchMapping.cacheResolvedHosts(List<String> uncachedHosts, List<String> resolvedHosts)

    List<String> org.apache.hadoop.net.CachedDNSToSwitchMapping.getCachedHosts(List<String> names)

    List<Task> org.apache.hadoop.mapred.JobQueueTaskScheduler.assignTasks(TaskTracker taskTracker) throws IOException
     

     

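    1. First heartbeat after the TaskTracker on Server3 starts

    Remove the host from FaultyTrackersInfo.potentiallyFaultyTrackers and check whether it should also be removed from the graylist and blacklist.

    Look up the previous HeartbeatResponse in trackerToHeartbeatResponseMap; it is null.

    Look up the previous TaskTracker.TaskTrackerStatus in taskTrackers; it is null.

    Update the JobTracker members: totalMaps=0, totalReduces=0, occupiedMapSlots=0, occupiedReduceSlots=0.

    Check FaultyTrackersInfo.potentiallyFaultyTrackers to see whether the tracker is blacklisted, then update totalMapTaskCapacity=2, totalReduceTaskCapacity=2.

    Add the tracker to taskTrackers, whose contents are now: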

    {tracker_server3:localhost.localdomain/127.0.0.1:43336=org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker@4fd86469}

     

    Add the host to uniqueHostsMap, whose contents are now:

    {server3=1}
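
    The two registrations above (taskTrackers and uniqueHostsMap) can be pictured with the sketch below. TrackerRegistry is a hypothetical class: the real maps hold TaskTracker objects, while this version stores only the host name, and the removal logic is an assumption about how a per-host counter would be kept consistent.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical registry; the real maps hold TaskTracker objects.
class TrackerRegistry {
    final Map<String, String> taskTrackers = new HashMap<>();    // trackerName -> host
    final Map<String, Integer> uniqueHostsMap = new HashMap<>(); // host -> number of trackers

    void addNewTracker(String trackerName, String host) {
        taskTrackers.put(trackerName, host);
        uniqueHostsMap.merge(host, 1, Integer::sum);
    }

    void removeTracker(String trackerName) {
        String host = taskTrackers.remove(trackerName);
        if (host == null) {
            return; // unknown tracker, nothing to undo
        }
        // Drop the host entry once its last tracker is gone.
        uniqueHostsMap.computeIfPresent(host, (h, n) -> n > 1 ? n - 1 : null);
    }
}
```

    Counting trackers per host, rather than storing a set of hosts, lets several trackers share one host without the host disappearing when only one of them leaves.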

     

    Add the status to trackerExpiryQueue, whose contents are now:

    [org.apache.hadoop.mapred.TaskTrackerStatus@4e048dc6]

     

    Add the resolved address to dnsToSwitchMapping.cache, whose contents are:

    {10.1.1.103=/default-rack}

     

    Add the node to clusterMap, whose contents are:

    Number of racks: 1

    Expected number of leaves:1

    /default-rack/server3

     

    Add the host to hostnameToNodeMap, whose contents are:

    {server3=/default-rack/server3}

     

    Add the rack to nodesAtMaxLevel, whose contents are:

    [/default-rack]
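
    The topology updates above (the DNS cache, hostnameToNodeMap, and nodesAtMaxLevel) can be sketched roughly as below. TopologySketch is hypothetical: it assumes every address resolves to /default-rack, as in the dumps shown here, and it represents nodes as path strings instead of Node objects.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, string-based picture of the topology bookkeeping.
class TopologySketch {
    static final String DEFAULT_RACK = "/default-rack"; // assumed: no rack mapping configured

    final Map<String, String> dnsCache = new HashMap<>();          // address -> rack
    final Map<String, String> hostnameToNodeMap = new HashMap<>(); // host -> node path
    final Set<String> nodesAtMaxLevel = new TreeSet<>();           // racks seen so far

    // Resolve the rack (using the cache), then record the node under that rack.
    String resolveAndAddToTopology(String host, String address) {
        String rack = dnsCache.computeIfAbsent(address, a -> DEFAULT_RACK);
        String nodePath = rack + "/" + host;
        hostnameToNodeMap.put(host, nodePath);
        nodesAtMaxLevel.add(rack);
        return nodePath;
    }
}
```

    Because both hosts land in the same rack, a second registration adds a new leaf but leaves nodesAtMaxLevel with a single entry.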

     

    Add the tracker to hostnameToTaskTracker, whose contents are:

    {server3=[org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker@4fd86469]}

     

    Check status.getTaskReports(); if it is not empty, update expireLaunchingTasks, trackerToJobsToCleanup, trackerToTasksToCleanup, and taskidToTIPMap.

    Increment responseId by 1, then pull jobs and tasks from jobs, jobQueueJobInProgressListener, trackerToTaskMap, trackerToTasksToCleanup, trackerToJobsToCleanup, and taskidToTIPMap to build the TaskTrackerActions (empty at this point).

    Obtain nextInterval.

    Build the HeartbeatResponse, with contents:

    {actions=[],conf=null,heartbeatInterval=240000,recoveredJobs=[],responseId=0}

     

    Add the response to trackerToHeartbeatResponseMap, whose contents are:

    {tracker_server3:localhost.localdomain/127.0.0.1:43336=org.apache.hadoop.mapred.HeartbeatResponse@25a78661}

    Send the HeartbeatResponse back to the client.
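
    The tail end of the idle path (increment the responseId, collect no actions, record and return the response) can be sketched as follows. Response is a hypothetical, trimmed-down stand-in for HeartbeatResponse, actions are plain strings, and nextInterval is passed in as a parameter instead of being computed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class IdleHeartbeatSketch {
    // Hypothetical, trimmed-down response; the real HeartbeatResponse has more fields.
    static final class Response {
        final short responseId;
        final int heartbeatInterval;
        final List<String> actions;

        Response(short responseId, int heartbeatInterval, List<String> actions) {
            this.responseId = responseId;
            this.heartbeatInterval = heartbeatInterval;
            this.actions = actions;
        }
    }

    final Map<String, Response> trackerToHeartbeatResponseMap = new HashMap<>();

    Response heartbeat(String trackerName, short responseId, int nextInterval) {
        short newResponseId = (short) (responseId + 1);
        // With no jobs there is nothing to launch, kill, or clean up.
        List<String> actions = new ArrayList<>();
        Response response = new Response(newResponseId, nextInterval, actions);
        // Remember the response so the next heartbeat can be compared against it.
        trackerToHeartbeatResponseMap.put(trackerName, response);
        return response;
    }
}
```

    A first heartbeat carrying responseId=-1 therefore yields a response with responseId=0 and an empty action list, matching the dump above.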

     

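    2. First heartbeat after the TaskTracker on Server2 starts

    As before, first remove the host from FaultyTrackersInfo.potentiallyFaultyTrackers and check whether it should also be removed from the graylist and blacklist.

    Look up the previous HeartbeatResponse in trackerToHeartbeatResponseMap; it is null.

    Look up the previous TaskTracker.TaskTrackerStatus in taskTrackers; it is null.

    Update the JobTracker members: totalMaps=0, totalReduces=0, occupiedMapSlots=0, occupiedReduceSlots=0, totalMapTaskCapacity=4, totalReduceTaskCapacity=4.

    Add the tracker to taskTrackers, whose contents are now: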

    {tracker_server2:localhost.localdomain/127.0.0.1:34381=org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker@412eb15f,tracker_server3:localhost.localdomain/127.0.0.1:45605=org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker@2634d0e2}

     

    Add the host to uniqueHostsMap, whose contents are now:

    {server2=1, server3=1}

     

    Add the status to trackerExpiryQueue, whose contents are now:

    [org.apache.hadoop.mapred.TaskTrackerStatus@4444ad54,org.apache.hadoop.mapred.TaskTrackerStatus@2ea31991]

     

    Add the resolved address to dnsToSwitchMapping.cache, whose contents are:

    {10.1.1.102=/default-rack,10.1.1.103=/default-rack}

     

    Add the node to clusterMap, whose contents are:

    Number of racks: 1

    Expected number of leaves:2

    /default-rack/server3

    /default-rack/server2

     

     

    Add the host to hostnameToNodeMap, whose contents are:

    {server2=/default-rack/server2,server3=/default-rack/server3}

     

    Add the rack to nodesAtMaxLevel, whose contents are:

    [/default-rack]

     

    Add the tracker to hostnameToTaskTracker, whose contents are:

    {server2=[org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker@412eb15f],server3=[org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker@2634d0e2]}

     

    Check status.getTaskReports(); if it is not empty, update expireLaunchingTasks, trackerToJobsToCleanup, trackerToTasksToCleanup, and taskidToTIPMap.

    Increment responseId by 1, then pull jobs and tasks from jobs, jobQueueJobInProgressListener, trackerToTaskMap, trackerToTasksToCleanup, trackerToJobsToCleanup, and taskidToTIPMap to build the TaskTrackerActions (empty at this point).

    Obtain nextInterval.

    Build the HeartbeatResponse, with contents:

    {actions=[],conf=null,heartbeatInterval=240000,recoveredJobs=[],responseId=0}

     

    Add the response to trackerToHeartbeatResponseMap, whose contents are:

    {tracker_server2:localhost.localdomain/127.0.0.1:34381=org.apache.hadoop.mapred.HeartbeatResponse@2f4dd8ae,tracker_server3:localhost.localdomain/127.0.0.1:45605=org.apache.hadoop.mapred.HeartbeatResponse@16bd1f19}

    Send the HeartbeatResponse back to the client.

     

    3. Server3 sends another heartbeat

    Check FaultyTrackersInfo.potentiallyFaultyTrackers to see whether the host should be removed from the blacklist.

    Fetch the previous HeartbeatResponse from trackerToHeartbeatResponseMap; it is

    org.apache.hadoop.mapred.HeartbeatResponse@16bd1f19

     

    Check whether the responseId of the previous response matches the responseId received in this heartbeat.
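
    A minimal sketch of that comparison is shown below. ResponseIdCheck and its Outcome values are hypothetical names, and what happens on a mismatch (here, resending the previous response) is an assumption for illustration; the text above only states that the two ids are compared.

```java
import java.util.HashMap;
import java.util.Map;

class ResponseIdCheck {
    enum Outcome { FIRST_CONTACT, PROCESS, RESEND_PREVIOUS }

    // trackerName -> id of the last response sent to that tracker
    final Map<String, Short> lastResponseIdSent = new HashMap<>();

    Outcome classify(String trackerName, short receivedResponseId) {
        Short previous = lastResponseIdSent.get(trackerName);
        if (previous == null) {
            return Outcome.FIRST_CONTACT; // no earlier response on record
        }
        // The tracker echoes the id of the last response it actually received.
        return previous == receivedResponseId
                ? Outcome.PROCESS
                : Outcome.RESEND_PREVIOUS; // assumed handling of a lost response
    }
}
```

    The echo works as an acknowledgement: equal ids mean the tracker saw the last response, so the JobTracker can safely build a new one.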

     

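    Update the JobTracker members totalMaps, totalReduces, occupiedMapSlots, occupiedReduceSlots, totalMapTaskCapacity, and totalReduceTaskCapacity: first fetch the previous TaskTracker.TaskTrackerStatus from taskTrackers and subtract its contribution, then add the contribution of the new TaskTrackerStatus. Along the way, FaultyTrackersInfo.potentiallyFaultyTrackers is consulted to check whether the tracker is blacklisted.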
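
    The subtract-then-add pattern described here can be sketched as follows. ClusterCounters and Status are hypothetical: Status carries only the six numbers involved, and the rule that a blacklisted tracker contributes nothing to the capacity totals is an assumption about what the blacklist check is for.

```java
class ClusterCounters {
    int totalMaps, totalReduces, occupiedMapSlots, occupiedReduceSlots;
    int totalMapTaskCapacity, totalReduceTaskCapacity;

    // Hypothetical snapshot of what one TaskTrackerStatus contributes.
    static final class Status {
        final int maps, reduces, mapSlotsUsed, reduceSlotsUsed, maxMapSlots, maxReduceSlots;

        Status(int maps, int reduces, int mapSlotsUsed, int reduceSlotsUsed,
               int maxMapSlots, int maxReduceSlots) {
            this.maps = maps;
            this.reduces = reduces;
            this.mapSlotsUsed = mapSlotsUsed;
            this.reduceSlotsUsed = reduceSlotsUsed;
            this.maxMapSlots = maxMapSlots;
            this.maxReduceSlots = maxReduceSlots;
        }
    }

    // sign is -1 to undo an old status and +1 to apply a new one.
    private void apply(Status s, int sign, boolean blacklisted) {
        totalMaps += sign * s.maps;
        totalReduces += sign * s.reduces;
        occupiedMapSlots += sign * s.mapSlotsUsed;
        occupiedReduceSlots += sign * s.reduceSlotsUsed;
        if (!blacklisted) { // assumed: blacklisted trackers add no capacity
            totalMapTaskCapacity += sign * s.maxMapSlots;
            totalReduceTaskCapacity += sign * s.maxReduceSlots;
        }
    }

    void updateTaskTrackerStatus(Status previous, Status current, boolean blacklisted) {
        if (previous != null) {
            apply(previous, -1, blacklisted);
        }
        if (current != null) {
            apply(current, +1, blacklisted);
        }
    }
}
```

    Undoing the old status before applying the new one keeps the totals correct no matter how many heartbeats a tracker sends; passing null as the new status removes the tracker's contribution entirely.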

     

    Update taskTrackers, whose contents are now: {tracker_server2:localhost.localdomain/127.0.0.1:52688=org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker@1cdc471a,tracker_server3:localhost.localdomain/127.0.0.1:40286=org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker@665755f5}

     

    Check status.getTaskReports(); if it is not empty, update expireLaunchingTasks, trackerToJobsToCleanup, trackerToTasksToCleanup, and taskidToTIPMap.

    Increment responseId by 1, then pull jobs and tasks from jobs, jobQueueJobInProgressListener, trackerToTaskMap, trackerToTasksToCleanup, trackerToJobsToCleanup, and taskidToTIPMap to build the TaskTrackerActions (empty at this point).

    Obtain nextInterval.

    Build the HeartbeatResponse, with contents:

    {actions=[],conf=null,heartbeatInterval=240000,recoveredJobs=[],responseId=1}

     

    Update trackerToHeartbeatResponseMap, whose contents are:

    {tracker_server2:localhost.localdomain/127.0.0.1:52688=org.apache.hadoop.mapred.HeartbeatResponse@1500df0b,tracker_server3:localhost.localdomain/127.0.0.1:40286=org.apache.hadoop.mapred.HeartbeatResponse@6c3355f2}

    Send the HeartbeatResponse back to the client.

     

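    4. ExpireTrackers removes expired trackers

    Take a TaskTrackerStatus from trackerExpiryQueue and use its lastSeen to decide whether the tracker should be removed or refreshed, then fetch the current TaskTracker.TaskTrackerStatus from taskTrackers and check its lastSeen again.

    If the tracker does not need to be removed, update its entry in trackerExpiryQueue.

    If it does need to be removed, remove it from trackerExpiryQueue and also clear its entries in trackerToJobsToCleanup, trackerToTasksToCleanup, recoveredTrackers, and trackerToTaskMap.

    Subtract its contribution from totalMaps, totalReduces, occupiedMapSlots, occupiedReduceSlots, totalMapTaskCapacity, and totalReduceTaskCapacity.

    Remove it from taskTrackers, uniqueHostsMap, and hostnameToTaskTracker.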
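
    The expiry pass can be sketched roughly as below. ExpirySketch is a hypothetical simplification: trackers are reduced to a name and a lastSeen timestamp, the queue is a TreeSet ordered by lastSeen, the expiry interval is a parameter, and the cleanup of the other maps and counters is left out.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

class ExpirySketch {
    // Hypothetical queue entry: a tracker name plus the lastSeen it was queued with.
    static final class Seen {
        final String tracker;
        final long lastSeen;

        Seen(String tracker, long lastSeen) {
            this.tracker = tracker;
            this.lastSeen = lastSeen;
        }
    }

    final Map<String, Long> taskTrackers = new HashMap<>(); // tracker -> latest lastSeen
    final TreeSet<Seen> trackerExpiryQueue = new TreeSet<>(
            Comparator.comparingLong((Seen s) -> s.lastSeen).thenComparing(s -> s.tracker));

    void heartbeat(String tracker, long now) {
        boolean isNew = taskTrackers.put(tracker, now) == null;
        if (isNew) {
            trackerExpiryQueue.add(new Seen(tracker, now)); // queued once; refreshed lazily
        }
    }

    void expire(long now, long expiryInterval) {
        while (!trackerExpiryQueue.isEmpty()) {
            Seen oldest = trackerExpiryQueue.first();
            if (now - oldest.lastSeen <= expiryInterval) {
                break; // every remaining entry is newer
            }
            trackerExpiryQueue.pollFirst();
            Long latest = taskTrackers.get(oldest.tracker);
            if (latest == null) {
                continue; // already removed elsewhere
            }
            if (now - latest > expiryInterval) {
                taskTrackers.remove(oldest.tracker); // really lost
            } else {
                // Still alive: requeue with the fresher timestamp.
                trackerExpiryQueue.add(new Seen(oldest.tracker, latest));
            }
        }
    }
}
```

    The second lastSeen check is what makes the stale queue entry harmless: the queue may hold an old timestamp, so the decision to drop a tracker is made against the current status in taskTrackers.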

  • Original article: https://www.cnblogs.com/leeeee/p/7276492.html