zoukankan      html  css  js  c++  java
  • Flink

    Job资源分配的过程,

    在submitJob中,会生成ExecutionGraph
    最终调用到,

    executionGraph.scheduleForExecution(scheduler)

    接着,ExecutionGraph

    /**
     * Starts scheduling of the job by calling {@code scheduleAll} on every job vertex
     * that reports itself as an input vertex.
     *
     * NOTE(review): this excerpt is truncated; the remainder of the method (including
     * its closing brace) is not shown here.
     *
     * @param slotProvider provider passed on to each scheduled job vertex
     * @throws JobException declared by the signature; the throwing site is not visible here
     */
    public void scheduleForExecution(SlotProvider slotProvider) throws JobException {
    // simply take the vertices without inputs.
    for (ExecutionJobVertex ejv : this.tasks.values()) {
    if (ejv.getJobVertex().isInputVertex()) {
    // allowQueuedScheduling is read from the enclosing object, not passed in
    ejv.scheduleAll(slotProvider, allowQueuedScheduling);
    }
    }

    然后,ExecutionJobVertex

    /**
     * Calls {@code scheduleForExecution} on every {@link ExecutionVertex} held in
     * {@code taskVertices}, in array order.
     *
     * @param slotProvider provider handed unchanged to each vertex
     * @param queued flag handed unchanged to each vertex
     * @throws NoResourceAvailableException propagated from the per-vertex call
     */
    public void scheduleAll(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
        // read the field once so the whole loop works on the same array
        final ExecutionVertex[] subtasks = this.taskVertices;

        for (int i = 0; i < subtasks.length; i++) {
            subtasks[i].scheduleForExecution(slotProvider, queued);
        }
    }
    再,ExecutionVertex
    /**
     * Forwards the scheduling request to {@code currentExecution}.
     *
     * @param slotProvider provider passed through to the current execution
     * @param queued flag passed through to the current execution
     * @return whatever the current execution's {@code scheduleForExecution} returns
     * @throws NoResourceAvailableException propagated from the delegate call
     */
    public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
        final boolean scheduled = this.currentExecution.scheduleForExecution(slotProvider, queued);
        return scheduled;
    }

    最终,Execution

    /**
     * Moves this execution from CREATED to SCHEDULED and requests a slot for it.
     *
     * <p>If the state transition succeeds, a {@code ScheduledUnit} is built (with the
     * co-location constraint when one is present), a slot is requested from the
     * {@code slotProvider}, and a handler is attached to the resulting future: when a
     * slot arrives it is deployed to; if deployment throws, the slot is released and the
     * execution is marked failed; if no slot arrives the execution is marked failed with
     * the reported throwable.
     *
     * NOTE(review): this excerpt ends after the successful-transition branch; what happens
     * when the transition fails, and the method's closing brace, are not shown.
     *
     * @param slotProvider source of the slot allocation future
     * @param queued passed to {@code allocateSlot}
     * @return {@code true} once the allocation request has been issued
     * @throws NoResourceAvailableException declared by the signature
     */
    public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
    
        final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
        final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();
    
        if (transitionState(CREATED, SCHEDULED)) {
    
            ScheduledUnit toSchedule = locationConstraint == null ?
                new ScheduledUnit(this, sharingGroup) :
                new ScheduledUnit(this, sharingGroup, locationConstraint);
    
            // IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned
            //     in all cases where the deployment failed. we use many try {} finally {} clauses to assure that
            final Future<SimpleSlot> slotAllocationFuture = slotProvider.allocateSlot(toSchedule, queued); // request the slot asynchronously
    
            // IMPORTANT: We have to use the synchronous handle operation (direct executor) here so
            // that we directly deploy the tasks if the slot allocation future is completed. This is
            // necessary for immediate deployment.
            final Future<Void> deploymentFuture = slotAllocationFuture.handle(new BiFunction<SimpleSlot, Throwable, Void>() {
                @Override
                public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
                    if (simpleSlot != null) {
                        try {
                            deployToSlot(simpleSlot); // a slot was obtained: deploy to it
                        } catch (Throwable t) {
                            // deployment failed: give the slot back, then record the failure
                            try {
                                simpleSlot.releaseSlot();
                            } finally {
                                markFailed(t);
                            }
                        }
                    }
                    else {
                        // no slot was delivered: fail with the throwable reported by the future
                        markFailed(throwable);
                    }
                    return null;
                }
            });
            
            return true;
        }

     

    调用到,slotProvider.allocateSlot, slotProvider即Scheduler

    /**
     * Allocates a slot for the given task by delegating to {@code scheduleTask} and
     * normalizing its result into a future.
     *
     * <p>A {@link SimpleSlot} result is wrapped in an already-completed future; a
     * {@link Future} result is returned as-is.
     *
     * @param task the unit to schedule
     * @param allowQueued passed to {@code scheduleTask} as its queue-if-no-resource flag
     * @return a future that yields the slot for the task
     * @throws NoResourceAvailableException propagated from {@code scheduleTask}
     * @throws RuntimeException if {@code scheduleTask} returns neither a slot nor a future
     */
    @Override
    public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued)
            throws NoResourceAvailableException {

        final Object ret = scheduleTask(task, allowQueued);
        if (ret instanceof SimpleSlot) {
            // slot was assigned synchronously: expose it as a completed future
            return FlinkCompletableFuture.completed((SimpleSlot) ret);
        }
        else if (ret instanceof Future) {
            // the request was queued: hand back the pending future.
            // scheduleTask only ever puts slot futures behind the Object return type,
            // so the unchecked cast is confined to this one local.
            @SuppressWarnings("unchecked")
            final Future<SimpleSlot> pending = (Future<SimpleSlot>) ret;
            return pending;
        }
        else {
            // neither a slot nor a future: say what was actually returned
            throw new RuntimeException("scheduleTask returned an unexpected result: "
                    + (ret == null ? "null" : ret.getClass().getName()));
        }
    }

     

    scheduleTask

       /**
         * Returns either a {@link SimpleSlot}, or a {@link Future}.
         *
         * <p>All slot selection runs inside {@code synchronized (globalLock)}. With a slot
         * sharing group, a slot is first requested from the group's assignment; if that
         * slot is missing or NON_LOCAL, a new shared slot is requested and one of the two
         * is kept while the other is released. Without a sharing group, a free slot is
         * requested directly; if none is available and {@code queueIfNoResource} is set,
         * the task is added to {@code taskQueue} and the pending future is returned.
         *
         * NOTE(review): this excerpt shows the {@code try} block without its catch/finally
         * clause and shows no return/throw for the "no slot and not queued" case.
         */
        private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {
    
            final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
            
            final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocations();
            final boolean forceExternalLocation = vertex.isScheduleLocalOnly() &&
                                        preferredLocations != null && preferredLocations.iterator().hasNext(); // true when preferredLocations is non-empty and the vertex may only be scheduled locally
        
            synchronized (globalLock) { // global scheduler lock
                
                SlotSharingGroup sharingUnit = task.getSlotSharingGroup();
                
                if (sharingUnit != null) { // the task has a SlotSharingGroup
    
                    // 1)  === If the task has a slot sharing group, schedule with shared slots ===
                    
                    final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
                    final CoLocationConstraint constraint = task.getLocationConstraint();
                    
                    // get a slot from the group, if the group has one for us (and can fulfill the constraint)
                    final SimpleSlot slotFromGroup;
                    if (constraint == null) {
                        slotFromGroup = assignment.getSlotForTask(vertex); // ask the SlotSharingGroupAssignment for a slot
                    }
                    else {
                        slotFromGroup = assignment.getSlotForTask(vertex, constraint);
                    }
    
                    SimpleSlot newSlot = null;
                    SimpleSlot toUse = null;
    
                    // the following needs to make sure any allocated slot is released in case of an error
                    try {
                        
                        // check whether the slot from the group is already what we want.
                        // any slot that is local, or where the assignment was unconstrained is good!
                        if (slotFromGroup != null && slotFromGroup.getLocality() != Locality.NON_LOCAL) { // the group offered a slot that is not NON_LOCAL
                            updateLocalityCounters(slotFromGroup, vertex);
                            return slotFromGroup; // suitable slot found, return it
                        }
                        
                        
                        // the group did not have a local slot for us. see if we can one (or a better one)
                        // our location preference is either determined by the location constraint, or by the
                        // vertex's preferred locations
                        final Iterable<TaskManagerLocation> locations;
                        final boolean localOnly;
                        if (constraint != null && constraint.isAssigned()) { // an already-assigned co-location constraint exists
                            locations = Collections.singleton(constraint.getLocation());
                            localOnly = true;
                        }
                        else {
                            locations = vertex.getPreferredLocationsBasedOnInputs(); // otherwise prefer the locations derived from the inputs
                            localOnly = forceExternalLocation;
                        }
                        // the group did not have a local slot for us. see if we can one (or a better one)
                        newSlot = getNewSlotForSharingGroup(vertex, locations, assignment, constraint, localOnly); // try to obtain a new slot for the sharing group
    
                        // NOTE(review): getNewSlotForSharingGroup can return null; when slotFromGroup is
                        // non-null and alive, newSlot is dereferenced below without a null check in this excerpt
                        if (slotFromGroup == null || !slotFromGroup.isAlive() || newSlot.getLocality() == Locality.LOCAL) {// use newSlot when it is LOCAL or when there is no usable group slot
                            // if there is no slot from the group, or the new slot is local,
                            // then we use the new slot
                            if (slotFromGroup != null) {
                                slotFromGroup.releaseSlot();
                            }
                            toUse = newSlot; // use the newly allocated slot
                        }
                        else {
                            // both are available and usable. neither is local. in that case, we may
                            // as well use the slot from the sharing group, to minimize the number of
                            // instances that the job occupies
                            newSlot.releaseSlot();
                            toUse = slotFromGroup;
                        }
    
                        // if this is the first slot for the co-location constraint, we lock
                        // the location, because we are going to use that slot
                        if (constraint != null && !constraint.isAssigned()) {
                            constraint.lockLocation();
                        }
                        
                        updateLocalityCounters(toUse, vertex);
                    }
                    
                    return toUse; // return the chosen slot
                }
                else { // no slot sharing: the simpler case
                    
                    // 2) === schedule without hints and sharing ===
                    
                    SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation); // request a free slot directly
                    if (slot != null) {
                        updateLocalityCounters(slot, vertex);
                        return slot; // got one, return it
                    }
                    else {
                        // no resource available now, so queue the request
                        if (queueIfNoResource) { // queuing is allowed
                            CompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
                            this.taskQueue.add(new QueuedTask(task, future)); // enqueue the task and return the future that represents the pending request
                            return future;
                        }
                    }
                }
            }
        }

     

    如果有SlotSharingGroup

    首先试图从SlotSharingGroupAssignment中分配slot

    slotFromGroup = assignment.getSlotForTask(vertex), 参考,Flink – SlotSharingGroup

    如果没有发现local的slot,试图为该vertex创建一个新的slot,

    newSlot = getNewSlotForSharingGroup(vertex, locations, assignment, constraint, localOnly); //试图为SharingGroup申请一个新的slot

        /**
         * Tries to allocate a new shared slot on some instance, registers it with the given
         * sharing-group assignment and returns the sub-slot allocated for the vertex.
         *
         * <p>Keeps trying further instances until a sub-slot is obtained or
         * {@code findInstance} reports that nothing is available.
         *
         * @param vertex the vertex that needs the slot
         * @param requestedLocations location preference forwarded to {@code findInstance}
         * @param groupAssignment assignment the new shared slot is added to
         * @param constraint co-location constraint, or {@code null} to allocate by job vertex id
         * @param localOnly forwarded to {@code findInstance}
         * @return the allocated sub-slot, or {@code null} if no instance is available
         */
        protected SimpleSlot getNewSlotForSharingGroup(ExecutionVertex vertex,
                                                        Iterable<TaskManagerLocation> requestedLocations,
                                                        SlotSharingGroupAssignment groupAssignment,
                                                        CoLocationConstraint constraint,
                                                        boolean localOnly)
        {
            // several rounds may be needed: the set of available instances can contain
            // entries that turn out to have no free slot (or to be dead)
            for (;;) {
                final Pair<Instance, Locality> candidate = findInstance(requestedLocations, localOnly);
                if (candidate == null) {
                    // no instance left to try
                    return null;
                }

                final Instance instance = candidate.getLeft();
                final Locality locality = candidate.getRight();

                try {
                    final JobVertexID groupId = vertex.getJobvertexId();

                    // take a shared slot from the chosen instance
                    final SharedSlot shared = instance.allocateSharedSlot(vertex.getJobId(), groupAssignment);

                    // findInstance removed the instance from the map; put it back if it still has capacity
                    if (instance.hasResourcesAvailable()) {
                        this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
                    }

                    if (shared == null) {
                        // the instance had no free slot after all, try the next one
                        continue;
                    }

                    // register the shared slot with the group and carve out the sub-slot
                    final SimpleSlot subSlot;
                    if (constraint == null) {
                        subSlot = groupAssignment.addSharedSlotAndAllocateSubSlot(shared, locality, groupId);
                    }
                    else {
                        subSlot = groupAssignment.addSharedSlotAndAllocateSubSlot(shared, locality, constraint);
                    }

                    if (subSlot != null) {
                        return subSlot;
                    }

                    // the sub-slot could not be allocated, so give the shared slot back
                    shared.releaseSlot();
                }
                catch (InstanceDiedException e) {
                    // the instance died before the scheduler learned about it; drop it
                    removeInstance(instance);
                }
            }
        }

    findInstance

        /**
         * Picks an instance with available resources and removes it from
         * {@code instancesWithAvailableResources}.
         *
         * <p>Instances waiting in {@code newlyAvailableInstances} are first moved into the
         * map. If locations are requested, the first requested location with a matching
         * instance wins (LOCAL). Otherwise, unless {@code localOnly} is set, the first
         * instance from the map's value iterator is taken (NON_LOCAL when a preference
         * existed, UNCONSTRAINED when none did).
         *
         * @param requestedLocations preferred locations; may be {@code null} or empty
         * @param localOnly if {@code true}, no non-local fallback is made when a preference exists
         * @return the chosen instance with its locality, or {@code null} if none qualifies
         */
        private Pair<Instance, Locality> findInstance(Iterable<TaskManagerLocation> requestedLocations, boolean localOnly) {

            // drain the queue of newly available instances into the lookup map
            for (Instance announced = this.newlyAvailableInstances.poll();
                    announced != null;
                    announced = this.newlyAvailableInstances.poll()) {
                this.instancesWithAvailableResources.put(announced.getTaskManagerID(), announced);
            }

            // nothing to choose from
            if (this.instancesWithAvailableResources.isEmpty()) {
                return null;
            }

            final Iterator<TaskManagerLocation> preferred =
                    requestedLocations == null ? null : requestedLocations.iterator();
            final boolean hasPreference = preferred != null && preferred.hasNext();

            if (hasPreference) {
                // look for an instance at one of the preferred locations, in the order given
                while (preferred.hasNext()) {
                    final TaskManagerLocation location = preferred.next();
                    if (location == null) {
                        continue;
                    }
                    final Instance local = this.instancesWithAvailableResources.remove(location.getResourceID());
                    if (local != null) {
                        return new ImmutablePair<Instance, Locality>(local, Locality.LOCAL);
                    }
                }

                // no preferred location matched; a local-only request ends here
                if (localOnly) {
                    return null;
                }
            }

            // fall back to the first instance the map hands out
            final Locality fallbackLocality = hasPreference ? Locality.NON_LOCAL : Locality.UNCONSTRAINED;
            final Iterator<Instance> remaining = this.instancesWithAvailableResources.values().iterator();
            final Instance first = remaining.next();
            remaining.remove();

            return new ImmutablePair<>(first, fallbackLocality);
        }

    Instance.allocateSharedSlot

        /**
         * Allocates a {@link SharedSlot} on this instance for the given job.
         *
         * @param jobID job the slot is allocated for
         * @param sharingGroupAssignment assignment passed to the new shared slot
         * @return the new shared slot, or {@code null} if {@code availableSlots} is empty
         * @throws InstanceDiedException if this instance is marked dead
         */
        public SharedSlot allocateSharedSlot(JobID jobID, SlotSharingGroupAssignment sharingGroupAssignment)
                throws InstanceDiedException
        {
            synchronized (instanceLock) {
                if (isDead) {
                    throw new InstanceDiedException(this);
                }

                // take the next free slot number, if any
                final Integer slotNumber = availableSlots.poll();
                if (slotNumber == null) {
                    return null;
                }

                final SharedSlot shared = new SharedSlot(
                        jobID, this, location, slotNumber, taskManagerGateway, sharingGroupAssignment);
                // remember the slot as allocated on this instance
                allocatedSlots.add(shared);
                return shared;
            }
        }

    如果新分配的slot是local的,就用newSlot;如果不是并且当前SlotSharingGroup中是有non-local的slot,就用现成的slot,没必要使用新的slot,这时需要把newSlot释放掉

    如果没有SlotSharingGroup

    简单的调用

    SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation);

         /**
          * Obtains a free {@link SimpleSlot} for the vertex from some available instance.
          *
          * <p>Keeps trying further instances until a slot is allocated or
          * {@code findInstance} reports that nothing is available.
          *
          * @param vertex the vertex that needs the slot
          * @param requestedLocations location preference forwarded to {@code findInstance}
          * @param localOnly forwarded to {@code findInstance}
          * @return the allocated slot with its locality set, or {@code null} if no instance is available
          */
         protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex,
                                                Iterable<TaskManagerLocation> requestedLocations,
                                                boolean localOnly) {
            // several rounds may be needed: the set of available instances can contain
            // entries that turn out to have no free slot (or to be dead)
            for (;;) {
                final Pair<Instance, Locality> candidate = findInstance(requestedLocations, localOnly);
                if (candidate == null) {
                    return null;
                }

                final Instance instance = candidate.getLeft();
                final Locality locality = candidate.getRight();

                try {
                    final SimpleSlot allocated = instance.allocateSimpleSlot(vertex.getJobId());

                    // findInstance removed the instance from the map; put it back if it still has capacity
                    if (instance.hasResourcesAvailable()) {
                        this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
                    }

                    if (allocated == null) {
                        // no slot from this instance, try the next one
                        continue;
                    }

                    allocated.setLocality(locality);
                    return allocated;
                }
                catch (InstanceDiedException e) {
                    // the instance died before the scheduler learned about it; drop it
                    removeInstance(instance);
                }
            }
        }

    逻辑和分配SharedSlot基本相同,只是会调用,

        /**
         * Allocates a {@link SimpleSlot} on this instance for the given job.
         *
         * @param jobID job the slot is allocated for; must not be {@code null}
         * @return the new slot, or {@code null} if {@code availableSlots} is empty
         * @throws IllegalArgumentException if {@code jobID} is {@code null}
         * @throws InstanceDiedException if this instance is marked dead
         */
        public SimpleSlot allocateSimpleSlot(JobID jobID) throws InstanceDiedException {
            if (jobID == null) {
                throw new IllegalArgumentException();
            }

            synchronized (instanceLock) {
                if (isDead) {
                    throw new InstanceDiedException(this);
                }

                // take the next free slot number, if any
                final Integer slotNumber = availableSlots.poll();
                if (slotNumber == null) {
                    return null;
                }

                final SimpleSlot allocated = new SimpleSlot(jobID, this, location, slotNumber, taskManagerGateway);
                // remember the slot as allocated on this instance
                allocatedSlots.add(allocated);
                return allocated;
            }
        }

     

    Instance

    Scheduler中的Instance怎么来的?

    Scheduler实现了InstanceListener接口

    newInstanceAvailable
        /**
         * Registers a new instance with the scheduler.
         *
         * <p>Under {@code globalLock}: adds the instance to {@code allInstances}, registers
         * this scheduler as its slot availability listener, indexes it by host name, marks
         * it as having available resources, and calls {@code newSlotAvailable} in a loop
         * bounded by the instance's available-slot count. Any failure after the initial
         * add is logged and the instance is removed again.
         *
         * @param instance the instance that became available
         * @throws IllegalArgumentException if the instance is already contained in {@code allInstances}
         */
        @Override
        public void newInstanceAvailable(Instance instance) {
            
            // synchronize globally for instance changes
            synchronized (this.globalLock) {
                
                // check we do not already use this instance
                if (!this.allInstances.add(instance)) { // add() returns false if the instance is already known
                    throw new IllegalArgumentException("The instance is already contained.");
                }
                
                try {
                    // make sure we get notifications about slots becoming available
                    instance.setSlotAvailabilityListener(this); // register as listener so we are told when a slot becomes available
                    
                    // store the instance in the by-host-lookup
                    String instanceHostName = instance.getTaskManagerLocation().getHostname(); 
                    Set<Instance> instanceSet = allInstancesByHost.get(instanceHostName); // HashMap<String, Set<Instance>> allInstancesByHost
                    if (instanceSet == null) {
                        instanceSet = new HashSet<Instance>();
                        allInstancesByHost.put(instanceHostName, instanceSet);
                    }
                    instanceSet.add(instance);
    
                    // add it to the available resources and let potential waiters know
                    this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance); // Map<ResourceID, Instance> instancesWithAvailableResources
    
                    // add all slots as available
                    for (int i = 0; i < instance.getNumberOfAvailableSlots(); i++) { // triggers newSlotAvailable once per iteration
                        newSlotAvailable(instance);
                    }
                }
                catch (Throwable t) {
                    LOG.error("Scheduler could not add new instance " + instance, t);
                    removeInstance(instance);
                }
            }
        }

     

    newInstanceAvailable,何时被调用,

     

    JobManager

          // Handles a RegisterTaskManager message by registering the task manager with the
          // instanceManager. NOTE(review): this is an excerpt; the definition of
          // taskManagerGateway and the rest of the case body are not shown here.
          case msg @ RegisterTaskManager(
              resourceId,
              connectionInfo,
              hardwareInformation,
              numberOfSlots) =>
              
              
              val instanceID = instanceManager.registerTaskManager(
                taskManagerGateway,
                connectionInfo,
                hardwareInformation,
                numberOfSlots)

     

    InstanceManager

        /**
         * Creates an {@link Instance} for a task manager and announces it to the
         * registered instance listeners.
         *
         * @param taskManagerGateway gateway passed to the new instance
         * @param taskManagerLocation location passed to the new instance
         * @param resources hardware description passed to the new instance
         * @param numberOfSlots slot count passed to the new instance
         * @return the freshly created id of the new instance
         */
        public InstanceID registerTaskManager(
                TaskManagerGateway taskManagerGateway,
                TaskManagerLocation taskManagerLocation,
                HardwareDescription resources,
                int numberOfSlots) {

            synchronized (this.lock) {
                final InstanceID newId = new InstanceID();
                final Instance registered = new Instance(
                    taskManagerGateway, taskManagerLocation, newId, resources, numberOfSlots);

                // tell all listeners (the scheduler, for example) about the new instance
                notifyNewInstance(registered);

                return newId;
            }
        }

     

        /**
         * Informs every registered {@link InstanceListener} about a new instance while
         * holding the monitor of {@code instanceListeners}.
         *
         * @param instance the instance that became available
         */
        private void notifyNewInstance(Instance instance) {
            synchronized (this.instanceListeners) {
                for (InstanceListener registered : this.instanceListeners) {
                    notifySingleListener(registered, instance);
                }
            }
        }

        /**
         * Notifies one listener; a failure is logged so the remaining listeners are still called.
         */
        private void notifySingleListener(InstanceListener listener, Instance instance) {
            try {
                listener.newInstanceAvailable(instance);
            }
            catch (Throwable t) {
                LOG.error("Notification of new instance availability failed.", t);
            }
        }

     

    Scheduler还实现了SlotAvailabilityListener接口

    会调用newSlotAvailable

    逻辑只是check是否有待分配的task,当有新的slot ready的时候,把queuedTask的future complete掉

    /**
     * Records that the given instance has a slot available and schedules
     * {@code handleNewSlot} to run on {@code executionContext}.
     *
     * <p>The instance is added to {@code newlyAvailableInstances} immediately; the
     * actual handling is deferred to avoid the lock-ordering problem described below.
     *
     * @param instance the instance on which a slot became available
     */
    @Override
        public void newSlotAvailable(final Instance instance) {
            
            // WARNING: The asynchrony here is necessary, because  we cannot guarantee the order
            // of lock acquisition (global scheduler, instance) and otherwise lead to potential deadlocks:
            // 
            // -> The scheduler needs to grab them (1) global scheduler lock
            //                                     (2) slot/instance lock
            // -> The slot releasing grabs (1) slot/instance (for releasing) and
            //                             (2) scheduler (to check whether to take a new task item
            // 
            // that leads with a high probability to deadlocks, when scheduling fast
    
            this.newlyAvailableInstances.add(instance);
    
            // run handleNewSlot asynchronously; the returned future is not used here
            Futures.future(new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    handleNewSlot();
                    return null;
                }
            }, executionContext);
        }
        
        /**
         * Processes one entry of {@code newlyAvailableInstances} under {@code globalLock}.
         *
         * <p>If a task is waiting in {@code taskQueue}, a simple slot is allocated from the
         * instance and, on success, the task is removed from the queue and its future is
         * completed with the slot. If no task is waiting, the instance is put into
         * {@code instancesWithAvailableResources}. Returns without doing anything when the
         * queue of new instances is empty or the polled instance has no resources available.
         */
        private void handleNewSlot() {
            
            synchronized (globalLock) {
                Instance instance = this.newlyAvailableInstances.poll();
                if (instance == null || !instance.hasResourcesAvailable()) {
                    // someone else took it
                    return;
                }
                
                QueuedTask queued = taskQueue.peek(); // look at the head of the waiting-task queue without removing it
                
                // the slot was properly released, we can allocate a new one from that instance
                
                if (queued != null) {
                    ScheduledUnit task = queued.getTask();
                    ExecutionVertex vertex = task.getTaskToExecute().getVertex();
                    
                    try {
                        SimpleSlot newSlot = instance.allocateSimpleSlot(vertex.getJobId()); // allocate a SimpleSlot from the instance
                        if (newSlot != null) {
                            
                            // success, remove from the task queue and notify the future
                            taskQueue.poll();
                            if (queued.getFuture() != null) {
                                try {
                                    queued.getFuture().complete(newSlot); // complete the task's future: a slot is there, the waiter can proceed
                                }
                                catch (Throwable t) {
                                    LOG.error("Error calling allocation future for task " + vertex.getSimpleName(), t);
                                    task.getTaskToExecute().fail(t);
                                }
                            }
                        }
                    }
                    catch (InstanceDiedException e) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Instance " + instance + " was marked dead asynchronously.");
                        }
                        
                        removeInstance(instance);
                    }
                }
                else { // no queued task: simply put the instance into instancesWithAvailableResources
                    this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
                }
            }
        }

     

    newSlotAvailable除了在新instance注册时被调用外,还会在Instance.returnAllocatedSlot中被调用,即有人释放AllocatedSlot的时候

  • 相关阅读:
    【队列】队列的分类和实现
    【JSP】EL表达式语言
    【JSP】JSP的介绍和基本原理
    【JSP】JSP Action动作标签
    【Servlet】关于RequestDispatcher的原理
    【JSP】JSP指令
    【JSP】JSP中的Java脚本
    【算法】表达式求值--逆波兰算法介绍
    C语言指针详解
    移动架构-MVVM框架
  • 原文地址:https://www.cnblogs.com/fxjwind/p/6704035.html
Copyright © 2011-2022 走看看