Creating the MapOutputTracker in SparkEnv
def registerOrLookupEndpoint(
    name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
}

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

// Create the MapOutputTracker; the concrete class differs between Driver and Executor
val mapOutputTracker = if (isDriver) {
  // The Driver additionally needs the BroadcastManager
  new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
} else {
  new MapOutputTrackerWorker(conf)
}

// Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
// requires the MapOutputTracker itself
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
  new MapOutputTrackerMasterEndpoint(
    rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
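The register-or-lookup pattern above can be illustrated with a minimal, self-contained sketch. The `Ref` class and the `registry` map below are hypothetical stand-ins for Spark's `RpcEndpointRef` and `RpcEnv`, not real Spark APIs; the point is only that the driver hosts the endpoint while an executor merely constructs a reference to it by name:

```scala
import scala.collection.mutable

// Hypothetical stand-in for RpcEndpointRef; not a real Spark class.
final case class Ref(name: String)

object RegisterOrLookupDemo {
  // Plays the role of the driver-side RpcEnv's endpoint table.
  val registry = mutable.Map.empty[String, Ref]

  // On the driver, register the endpoint; on an executor, just build a reference.
  def registerOrLookup(isDriver: Boolean, name: String): Ref =
    if (isDriver) {
      val ref = Ref(name)
      registry(name) = ref // only the driver actually hosts the endpoint
      ref
    } else {
      Ref(name) // an executor only needs a remote reference by name
    }
}
```

Both sides end up holding an equivalent reference, but only the driver's call mutates the registry, mirroring how `setupEndpoint` runs on the driver and `makeDriverRef` runs on executors.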
MapOutputTracker tracks the output status of map-stage tasks, so that the reduce stage can locate the addresses of the intermediate map outputs. Each map or reduce task has a unique identifier (mapId, reduceId).
MapOutputTracker follows a Master/Slave architecture: the Master (Driver) stores the map-output metadata for every shuffle in the current Application, while the Slaves (Executors) query the Master's map-output status over RPC.
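The Master/Slave split described above can be sketched as follows. This is a simplified in-memory model under stated assumptions: the class names, `MapStatus` fields, and the direct method call standing in for the RPC are all illustrative, not Spark's actual API:

```scala
import scala.collection.concurrent.TrieMap

// Illustrative stand-in for Spark's MapStatus: where one map task wrote its output.
final case class MapStatus(mapId: Int, location: String)

class TrackerMasterSketch {
  // shuffleId -> statuses of all finished map tasks of that shuffle
  private val shuffleStatuses = TrieMap.empty[Int, Seq[MapStatus]]

  def registerMapOutputs(shuffleId: Int, statuses: Seq[MapStatus]): Unit =
    shuffleStatuses(shuffleId) = statuses

  // What a worker's RPC query to the master would return
  def getMapOutputs(shuffleId: Int): Seq[MapStatus] =
    shuffleStatuses.getOrElse(shuffleId, Seq.empty)
}

class TrackerWorkerSketch(master: TrackerMasterSketch) {
  // Workers cache fetched statuses so repeated reduce tasks avoid another round trip
  private val cache = TrieMap.empty[Int, Seq[MapStatus]]

  def locationsFor(shuffleId: Int, reduceId: Int): Seq[String] = {
    val statuses = cache.getOrElseUpdate(shuffleId, master.getMapOutputs(shuffleId))
    // In real Spark this would be narrowed to the blocks belonging to reduceId
    statuses.map(_.location)
  }
}
```

The key design point survives the simplification: the master is the single authoritative store, and workers hold only a cache that is filled on demand.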
The MapOutputTracker implementation differs between the Driver and the Executor
// Create the MapOutputTracker; the concrete class differs between Driver and Executor
val mapOutputTracker = if (isDriver) {
  // The Driver additionally needs the BroadcastManager
  new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
} else {
  new MapOutputTrackerWorker(conf)
}
Both extend the abstract class MapOutputTracker:
/**
 * Class that keeps track of the location of the map output of
 * a stage. This is abstract because different versions of MapOutputTracker
 * (driver and executor) use different HashMap to store its metadata.
 */
private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
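The scaladoc's point, that the abstract base leaves the choice of metadata map to each side, can be sketched with the template pattern below. All names here are illustrative, not Spark's: the driver side owns an authoritative store, while the executor side holds a local cache filled lazily by asking the driver (one RPC in real Spark, modeled here as a plain function):

```scala
import scala.collection.concurrent.TrieMap

abstract class TrackerSketch {
  // Each subclass supplies its own storage for shuffleId -> serialized statuses
  protected def mapStatuses: scala.collection.Map[Int, Array[Byte]]
  def hasOutputs(shuffleId: Int): Boolean = mapStatuses.contains(shuffleId)
}

// Driver side: the authoritative, thread-safe store, written as map stages finish
class MasterSketch extends TrackerSketch {
  protected val mapStatuses = TrieMap.empty[Int, Array[Byte]]
  def register(shuffleId: Int, bytes: Array[Byte]): Unit =
    mapStatuses(shuffleId) = bytes
}

// Executor side: a cache only; on a miss, fetch from the driver and remember it
class WorkerSketch(fetchFromDriver: Int => Option[Array[Byte]]) extends TrackerSketch {
  protected val mapStatuses = TrieMap.empty[Int, Array[Byte]]
  def getOrFetch(shuffleId: Int): Option[Array[Byte]] =
    mapStatuses.get(shuffleId).orElse {
      val fetched = fetchFromDriver(shuffleId)
      fetched.foreach(mapStatuses(shuffleId) = _) // cache the answer locally
      fetched
    }
}
```

Shared behavior such as `hasOutputs` lives in the base class and works against whichever map the subclass provides, which is exactly why MapOutputTracker is abstract.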