zoukankan      html  css  js  c++  java
  • Flink

    InstanceManager用于管理JobManager申请到的TaskManager和slot资源。

    /**
     * Simple manager that keeps track of which TaskManagers are available and alive.
     */
    public class InstanceManager {
    
        // ------------------------------------------------------------------------
        // Fields
        // ------------------------------------------------------------------------
    
        // Instances are indexed twice: once by InstanceID and once by ResourceID.
        /** Set of hosts known to run a task manager that are thus able to execute tasks (by ID). */
        private final Map<InstanceID, Instance> registeredHostsById;
        /** Set of hosts known to run a task manager that are thus able to execute tasks (by ResourceID). */
        private final Map<ResourceID, Instance> registeredHostsByResource;
    
        /** Set of hosts that were present once and have died. */
        private final Set<ResourceID> deadHosts;
    
        /** Listeners that want to be notified about availability and disappearance of instances. */
        private final List<InstanceListener> instanceListeners = new ArrayList<>(); // who gets told when Instance resources change, e.g. the Scheduler
    
        /** The total number of task slots that the system has. */
        private int totalNumberOfAliveTaskSlots;

     

    关键的操作,

    registerTaskManager

    /**
     * Registers a TaskManager so that its slots become usable for job execution.
     *
     * <p>While holding the manager lock, a new {@link Instance} with a freshly created
     * {@link InstanceID} is built, stored in both lookup maps (by instance ID and by the
     * resource ID of the given location), and the alive-slot counter is increased by
     * {@code numberOfSlots}. An initial heartbeat is reported on the instance and all
     * instance listeners are notified before the lock is released.
     *
     * @param taskManagerGateway gateway to the task manager
     * @param taskManagerLocation location info of the TaskManager
     * @param resources hardware description of the TaskManager
     * @param numberOfSlots number of available slots on the TaskManager
     * @return the InstanceID assigned to the newly registered task manager
     */
    public InstanceID registerTaskManager(
            TaskManagerGateway taskManagerGateway,
            TaskManagerLocation taskManagerLocation,
            HardwareDescription resources,
            int numberOfSlots) {

        synchronized (this.lock) {
            final InstanceID assignedId = new InstanceID();
            final Instance registered = new Instance(
                taskManagerGateway, taskManagerLocation, assignedId, resources, numberOfSlots);

            // Make the instance reachable through both indexes.
            this.registeredHostsById.put(assignedId, registered);
            this.registeredHostsByResource.put(taskManagerLocation.getResourceID(), registered);

            this.totalNumberOfAliveTaskSlots = this.totalNumberOfAliveTaskSlots + numberOfSlots;

            // Record a first heartbeat, then tell the listeners (for example the scheduler).
            registered.reportHeartBeat();
            notifyNewInstance(registered);

            return assignedId;
        }
    }

    其中,notifyNewInstance

    /**
     * Informs every registered {@link InstanceListener} that the given instance is available.
     *
     * <p>The listener list is locked for the duration of the iteration. A failure in one
     * listener is logged and does not prevent the remaining listeners from being called.
     *
     * @param instance the instance that has just become available
     */
    private void notifyNewInstance(Instance instance) {
        synchronized (this.instanceListeners) {
            for (InstanceListener subscriber : this.instanceListeners) {
                try {
                    subscriber.newInstanceAvailable(instance);
                } catch (Throwable failure) {
                    // Deliberately broad: one misbehaving listener must not block the others.
                    LOG.error("Notification of new instance availability failed.", failure);
                }
            }
        }
    }

     

    Instance

    看注释,instance就是一种抽象

    用于描述注册到JobManager,并准备接受work的TaskManager

    /**
     * An instance represents a {@link org.apache.flink.runtime.taskmanager.TaskManager}
     * registered at a JobManager and ready to receive work.
     */
    public class Instance implements SlotOwner {
    
        /** The instance gateway to communicate with the instance */
        private final TaskManagerGateway taskManagerGateway;
    
        /** The instance connection information for the data transfer. */
        private final TaskManagerLocation location;
    
        /** A description of the resources of the task manager */
        private final HardwareDescription resources;
    
        /** The ID identifying the taskManager. */
        private final InstanceID instanceId;
    
        /** The number of task slots available on the node */
        private final int numberOfSlots;
    
        /** A list of available slot positions */
        private final Queue<Integer> availableSlots; // note: holds slot positions, not Slot objects; a slot object is only created when it is allocated
    
        /** Allocated slots on this taskManager */
        private final Set<Slot> allocatedSlots = new HashSet<Slot>();
    
        /** A listener to be notified upon new slot availability */
        private SlotAvailabilityListener slotAvailabilityListener;  // told when a slot becomes available again
    
        /** Time when the last heartbeat has been received from the task manager represented by this instance. */
        private volatile long lastReceivedHeartBeat = System.currentTimeMillis();

    核心的操作,

    申请slot

    /**
     * Hands out a simple slot on this TaskManager instance for the given job.
     *
     * <p>Under the instance lock, the next free slot position is taken from the queue of
     * available positions. If there is none, {@code null} is returned; otherwise a new
     * {@link SimpleSlot} is created for that position, remembered as allocated, and returned.
     *
     * @param jobID The ID of the job that the slot is allocated for.
     *
     * @return A simple slot that represents a task slot on this TaskManager instance, or
     *         {@code null} if no slot position is currently available.
     *
     * @throws InstanceDiedException Declared for the case that the instance is no longer alive
     *                               when the slot is allocated. NOTE(review): the body shown
     *                               here performs no liveness check itself.
     */
    public SimpleSlot allocateSimpleSlot(JobID jobID) throws InstanceDiedException {

        synchronized (instanceLock) {
            // A null result from poll() means every slot position is currently taken.
            final Integer position = availableSlots.poll();
            if (position == null) {
                return null;
            }

            final SimpleSlot allocated =
                new SimpleSlot(jobID, this, location, position, taskManagerGateway);
            allocatedSlots.add(allocated);
            return allocated;
        }
    }

     

    归还slot

    /**
     * Returns a slot that has been allocated from this instance. The slot needs to have been
     * canceled prior to calling this method.
     *
     * <p>The method will transition the slot to the "released" state. If the slot is already in
     * state "released", this method will do nothing and report {@code false}.</p>
     *
     * <p>When the slot is successfully removed from the set of allocated slots, its slot number
     * is put back into the queue of available positions and the slot availability listener, if
     * one is set, is notified.</p>
     *
     * @param slot The slot to return.
     * @return True, if the slot was returned, false if not (the slot could not be marked as
     *         released, or it was not in this instance's set of allocated slots).
     */
    @Override
    public boolean returnAllocatedSlot(Slot slot) {

        if (!slot.markReleased()) {
            // Nothing to do: the slot did not transition to "released" in this call.
            return false;
        }

        LOG.debug("Return allocated slot {}.", slot);
        synchronized (instanceLock) {

            if (!this.allocatedSlots.remove(slot)) {
                // The slot is not tracked as allocated here, so there is no position to give back.
                return false;
            }

            this.availableSlots.add(slot.getSlotNumber());

            if (this.slotAvailabilityListener != null) {
                // Let the listener know that this instance has a free slot again.
                this.slotAvailabilityListener.newSlotAvailable(this);
            }

            return true;
        }
    }
  • 相关阅读:
    Step by step Dynamics CRM 2013安装
    SQL Server 2012 Managed Service Account
    Step by step SQL Server 2012的安装
    Step by step 活动目录中添加一个子域
    Step by step 如何创建一个新森林
    向活动目录中添加一个子域
    活动目录的信任关系
    RAID 概述
    DNS 正向查找与反向查找
    Microsoft Dynamics CRM 2013 and 2011 Update Rollups and Service Packs
  • 原文地址:https://www.cnblogs.com/fxjwind/p/6237155.html
Copyright © 2011-2022 走看看