1. JobInProgress中的容器
1.1 maps、reduces、cleanup、setup
TaskInProgress maps[]= new TaskInProgress[0];
TaskInProgress reduces[]= new TaskInProgress[0];
TaskInProgress cleanup[]= new TaskInProgress[0];
TaskInProgress setup[]= new TaskInProgress[0];
在方法 initTasks() 中被初始化:
maps= new TaskInProgress[numMapTasks];
for(int i=0; i< numMapTasks;++i) {
maps[i]= new TaskInProgress(jobId,jobFile,
splits[i],
jobtracker,conf,this,i, numSlotsPerMap);
}
this.reduces=new TaskInProgress[numReduceTasks];
for(int i = 0;i <numReduceTasks;i++) {
reduces[i]= new TaskInProgress(jobId,jobFile,
numMapTasks,i,
jobtracker,conf,this,numSlotsPerReduce);
nonRunningReduces.add(reduces[i]);
}
// create two cleanup tips, one map and one reduce.
cleanup= new TaskInProgress[2];
// cleanup map tip. This map doesn't use any splits. Just assign an empty
// split.
TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
cleanup[0]= new TaskInProgress(jobId,jobFile,emptySplit,
jobtracker,conf,this,numMapTasks,1);
cleanup[0].setJobCleanupTask();
// cleanup reduce tip.
cleanup[1]= new TaskInProgress(jobId,jobFile,numMapTasks,
numReduceTasks,jobtracker,conf,this,1);
cleanup[1].setJobCleanupTask();
// create two setup tips, one map and one reduce.
setup=new TaskInProgress[2];
// setup map tip. This map doesn't use any split. Just assign an empty
// split.
setup[0]= new TaskInProgress(jobId,jobFile,emptySplit,
jobtracker,conf,this,numMapTasks+ 1,1);
setup[0].setJobSetupTask();
// setup reduce tip.
setup[1]= new TaskInProgress(jobId,jobFile,numMapTasks,
numReduceTasks+ 1,jobtracker,conf,this,1);
setup[1].setJobSetupTask();
1.2 nonRunningMapCache
// NetworkTopology Node to the set of TIPs
Map<Node,List<TaskInProgress>> nonRunningMapCache;
在方法 initTasks()中被初始化:
nonRunningMapCache = createCache(splits, maxLevel);
而createCache为
private Map<Node,List<TaskInProgress>> createCache(
TaskSplitMetaInfo[] splits, int maxLevel)
throws UnknownHostException{
....
}
即根据 TaskSplitMetaInfo 创建 Node 到 TaskInProgress 列表的映射。
1.3 runningMapCache
// Map of NetworkTopology Node to set of running TIPs
Map<Node,Set<TaskInProgress>> runningMapCache = new IdentityHashMap<Node,Set<TaskInProgress>>();
在 scheduleMap(TaskInProgress tip) 中被填充数据:
for(String host: splitLocations) {
Node node = jobtracker.getNode(host);
for(int j = 0;j <maxLevel;++j) {
Set<TaskInProgress>hostMaps= runningMapCache.get(node);
if(hostMaps== null){
// create a cache if needed
hostMaps = new LinkedHashSet<TaskInProgress>();
runningMapCache.put(node,hostMaps);
}
hostMaps.add(tip);
node = node.getParent();
}
}
1.4 nonLocalMaps
// A list of non-local, non-running maps
final List<TaskInProgress> nonLocalMaps = new LinkedList<TaskInProgress>();
在 createCache(TaskSplitMetaInfo[] splits, int maxLevel) 中被填充数据:
for(int i = 0;i <splits.length;i++) {
String[] splitLocations = splits[i].getLocations();
if(splitLocations ==null||splitLocations.length== 0){
nonLocalMaps.add(maps[i]);
continue;
}
......
}
1.5 failedMaps
// Set of failed, non-running maps sorted by #failures
final SortedSet<TaskInProgress> failedMaps = new TreeSet<TaskInProgress>(failComparator);
在 failedTask(TaskInProgress tip, TaskAttemptID taskid, TaskStatus status, TaskTracker taskTracker, boolean wasRunning, boolean wasComplete, boolean wasAttemptRunning) 中被填充:
if (!isComplete){
retireMap(tip);
failMap(tip);
}
其中
private synchronized void failMap(TaskInProgress tip) {
......
failedMaps.add(tip);
}
1.6 nonLocalRunningMaps
// A set of non-local running maps
Set<TaskInProgress> nonLocalRunningMaps =new LinkedHashSet<TaskInProgress>();
在 scheduleMap(TaskInProgress tip) 中被填充:
String[] splitLocations = tip.getSplitLocations();
// Add the TIP to the list of non-local running TIPs
if(splitLocations ==null||splitLocations.length== 0){
nonLocalRunningMaps.add(tip);
return;
}
1.6 nonRunningReduces
// A list of non-running reduce TIPs
Set<TaskInProgress> nonRunningReduces= new TreeSet<TaskInProgress>(failComparator);
在 initTasks() 中被初始化:
this.reduces=new TaskInProgress[numReduceTasks];
for(int i = 0;i <numReduceTasks;i++) {
reduces[i]= new TaskInProgress(jobId,jobFile,
numMapTasks,i,
jobtracker,conf,this,numSlotsPerReduce);
nonRunningReduces.add(reduces[i]);
}
在 failedTask(TaskInProgress tip, TaskAttemptID taskid, TaskStatus status, TaskTracker taskTracker, boolean wasRunning, boolean wasComplete, boolean wasAttemptRunning) 中被填充:
if(!isComplete){
retireReduce(tip);
failReduce(tip);
}
private synchronized void failReduce(TaskInProgress tip) {
......
nonRunningReduces.add(tip);
}
1.7 runningReduces
// A set of running reduce TIPs
Set<TaskInProgress> runningReduces = new LinkedHashSet<TaskInProgress>();
调用如下:
protected synchronized void scheduleReduce(TaskInProgress tip) {
。。。
runningReduces.add(tip);
}
private synchronized void retireReduce(TaskInProgress tip) {
......
runningReduces.remove(tip);
}
1.8 mapCleanupTasks、reduceCleanupTasks
// A list of cleanup tasks for the map task attempts, to be launched
List<TaskAttemptID> mapCleanupTasks=new LinkedList<TaskAttemptID>();
// A list of cleanup tasks for the reduce task attempts, to be launched
List<TaskAttemptID> reduceCleanupTasks =new LinkedList<TaskAttemptID>();
在 updateTaskStatus(TaskInProgress tip, TaskStatus status) 中填充:
if(state ==TaskStatus.State.FAILED_UNCLEAN||
state == TaskStatus.State.KILLED_UNCLEAN){
tip.incompleteSubTask(taskid, this.status);
if(tip.isMapTask()){
mapCleanupTasks.add(taskid);
} else{
reduceCleanupTasks.add(taskid);
}
jobtracker.removeTaskEntry(taskid);
}
1.9 taskCompletionEvents
List<TaskCompletionEvent> taskCompletionEvents=
new ArrayList<TaskCompletionEvent>(numMapTasks+numReduceTasks+10);
在updateTaskStatus(TaskInProgress tip, TaskStatus status)中填充
taskEvent= new TaskCompletionEvent(taskCompletionEventTracker,
taskid,
tip.idWithinJob(),
status.getIsMap()&&
!tip.isJobCleanupTask() &&
!tip.isJobSetupTask(),
taskCompletionStatus,
httpTaskLogLocation
);
this.taskCompletionEvents.add(taskEvent);
taskCompletionEventTracker++;
1.10 trackerToFailuresMap
// Map of trackerHostName -> no. of task failures
Map<String,Integer> trackerToFailuresMap= new TreeMap<String,Integer>();
在 addTrackerTaskFailure 中调用:
synchronized void addTrackerTaskFailure(String trackerName, TaskTracker taskTracker) {
if(flakyTaskTrackers< (clusterSize*CLUSTER_BLACKLIST_PERCENT)){
String trackerHostName =convertTrackerNameToHostName(trackerName);
Integer trackerFailures = trackerToFailuresMap.get(trackerHostName);
if(trackerFailures== null){
trackerFailures = 0;
}
trackerToFailuresMap.put(trackerHostName,++trackerFailures);
......
1.11 firstTaskLaunchTimes
// First *task launch time
final Map<TaskType,Long> firstTaskLaunchTimes=
new EnumMap<TaskType,Long>(TaskType.class);
在 setFirstTaskLaunchTime 中调用:
void setFirstTaskLaunchTime(TaskInProgress tip) {
TaskType key = tip.getFirstTaskType();
synchronized(firstTaskLaunchTimes){
// Could be optimized to do only one lookup with a little more code
if(!firstTaskLaunchTimes.containsKey(key)){
firstTaskLaunchTimes.put(key,tip.getExecStartTime());
}
}
}
1.12 mapTaskIdToFetchFailuresMap
// Map of mapTaskId -> no. of fetch failures
private Map<TaskAttemptID,Integer> mapTaskIdToFetchFailuresMap=
new TreeMap<TaskAttemptID,Integer>();
在 fetchFailureNotification(TaskInProgress tip, TaskAttemptID mapTaskId, String mapTrackerName, TaskAttemptID reduceTaskId, String reduceTrackerName) 中调用:
Integer fetchFailures = mapTaskIdToFetchFailuresMap.get(mapTaskId);
fetchFailures= (fetchFailures == null)? 1 : (fetchFailures+1);
mapTaskIdToFetchFailuresMap.put(mapTaskId,fetchFailures);
1.13 trackersReservedForMaps、trackersReservedForReduces
private Map<TaskTracker,FallowSlotInfo>trackersReservedForMaps=
new HashMap<TaskTracker,FallowSlotInfo>();
private Map<TaskTracker,FallowSlotInfo>trackersReservedForReduces=
new HashMap<TaskTracker,FallowSlotInfo>();
在 reserveTaskTracker(TaskTracker taskTracker, TaskType type, int numSlots) 中调用:
Map<TaskTracker,FallowSlotInfo> map =(type ==TaskType.MAP) ? trackersReservedForMaps : trackersReservedForReduces;
map.put(taskTracker,info);