zoukankan      html  css  js  c++  java
  • Flink

    初始化

    Task

    // Build one SingleInputGate per input gate deployment descriptor of this task.
    List<InputGateDeploymentDescriptor> consumedPartitions = tdd.getInputGates();
    final int numberOfGates = consumedPartitions.size();

    this.inputGates = new SingleInputGate[numberOfGates];
    this.inputGatesById = new HashMap<IntermediateDataSetID, SingleInputGate>();

    for (int gateIndex = 0; gateIndex < numberOfGates; gateIndex++) {
        InputGateDeploymentDescriptor descriptor = consumedPartitions.get(gateIndex);

        SingleInputGate created = SingleInputGate.create(
                taskNameWithSubtaskAndId, jobId, executionId, descriptor, networkEnvironment,
                metricGroup.getIOMetricGroup());

        // Keep the gate reachable both by position and by the ID of the result it consumes.
        this.inputGates[gateIndex] = created;
        this.inputGatesById.put(created.getConsumedResultId(), created);
    }

     

    初始化networkEnvironment

    network.registerTask(this);

     

    NetworkEnvironment

    registerTask
    // Setup the buffer pool for each buffer reader
    final SingleInputGate[] inputGates = task.getAllInputGates();
    
    for (SingleInputGate gate : inputGates) {
        BufferPool bufferPool = null;
    
        try {
            bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), false);
            gate.setBufferPool(bufferPool);
        }

     

    SingleInputGate

    create

    /**
     * Builds a {@link SingleInputGate} from its deployment descriptor and registers one input
     * channel with it for every input channel deployment descriptor.
     *
     * <p>The concrete channel type follows the location reported for the consumed partition:
     * {@link LocalInputChannel} for local, {@link RemoteInputChannel} for remote and
     * {@link UnknownInputChannel} for unknown locations.
     *
     * @param owningTaskName name of the owning task, handed to the gate constructor
     * @param jobId ID of the job, handed to the gate constructor
     * @param executionId ID of the execution attempt, handed to the gate constructor
     * @param igdd descriptor providing the consumed result ID, the consumed subpartition index
     *             (checked to be non-negative) and the input channel descriptors
     * @param networkEnvironment source of the partition manager, connection manager, task event
     *                           dispatcher, partition state checker and request backoff settings
     * @param metrics metric group handed to the gate and to every channel
     * @return the input gate with all of its input channels set
     * @throws IllegalStateException if a partition location is neither local, remote nor unknown
     */
    public static SingleInputGate create(
            String owningTaskName,
            JobID jobId,
            ExecutionAttemptID executionId,
            InputGateDeploymentDescriptor igdd,
            NetworkEnvironment networkEnvironment,
            IOMetricGroup metrics) {

        // Validate the descriptor before anything is constructed.
        final IntermediateDataSetID resultId = checkNotNull(igdd.getConsumedResultId());

        final int subpartitionIndex = igdd.getConsumedSubpartitionIndex();
        checkArgument(subpartitionIndex >= 0);

        final InputChannelDeploymentDescriptor[] channelDescriptors =
                checkNotNull(igdd.getInputChannelDeploymentDescriptors());
        final int numberOfChannels = channelDescriptors.length;

        final SingleInputGate gate = new SingleInputGate(
                owningTaskName, jobId, executionId, resultId, subpartitionIndex,
                numberOfChannels, networkEnvironment.getPartitionStateChecker(), metrics);

        // There is one input channel for each consumed partition.
        final InputChannel[] channels = new InputChannel[numberOfChannels];

        for (int index = 0; index < numberOfChannels; index++) {
            final InputChannelDeploymentDescriptor descriptor = channelDescriptors[index];
            final ResultPartitionID partitionId = descriptor.getConsumedPartitionId();
            final ResultPartitionLocation location = descriptor.getConsumedPartitionLocation();

            final InputChannel channel;

            if (location.isLocal()) {
                channel = new LocalInputChannel(gate, index, partitionId,
                        networkEnvironment.getPartitionManager(),
                        networkEnvironment.getTaskEventDispatcher(),
                        networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
                        metrics);
            } else if (location.isRemote()) {
                channel = new RemoteInputChannel(gate, index, partitionId,
                        location.getConnectionId(),
                        networkEnvironment.getConnectionManager(),
                        networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
                        metrics);
            } else if (location.isUnknown()) {
                channel = new UnknownInputChannel(gate, index, partitionId,
                        networkEnvironment.getPartitionManager(),
                        networkEnvironment.getTaskEventDispatcher(),
                        networkEnvironment.getConnectionManager(),
                        networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
                        metrics);
            } else {
                throw new IllegalStateException("Unexpected partition location.");
            }

            channels[index] = channel;

            // Hand the freshly created channel over to the gate.
            gate.setInputChannel(partitionId.getPartitionId(), channel);
        }

        return gate;
    }

    inputGate的inputChannel,对应于resultPartition的resultSubPartition

     

    ------------------------------------------------------------------------------------------------------

    OneInputStreamTask

    if (numberOfInputs > 0) {
        InputGate[] inputGates = getEnvironment().getAllInputGates();
        inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer,
                getCheckpointBarrierListener(), 
                configuration.getCheckpointMode(),
                getEnvironment().getIOManager(),
                isSerializingTimestamps());

     

    StreamInputProcessor

    // Collapse all input gates into a single gate (see InputGateUtil.createInputGate).
    InputGate inputGate = InputGateUtil.createInputGate(inputGates);

    // Pick the barrier handler that matches the configured checkpointing mode.
    if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
        this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
    }
    else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
        this.barrierHandler = new BarrierTracker(inputGate);
    }
    else {
        // Fail fast instead of leaving barrierHandler unassigned for an unexpected (or null) mode.
        throw new IllegalArgumentException("Unrecognized CheckpointingMode: " + checkpointMode);
    }

    StreamInputProcessor.processInput中

    final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
    if (bufferOrEvent != null) {
        if (bufferOrEvent.isBuffer()) {
            currentChannel = bufferOrEvent.getChannelIndex();
            currentRecordDeserializer = recordDeserializers[currentChannel]; //SpillingAdaptiveSpanningRecordDeserializer
            currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer()); //将buffer set到SpillingAdaptiveSpanningRecordDeserializer
        }
        
    //后续可以从set到SpillingAdaptiveSpanningRecordDeserializer中反序列化出record
    DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
    if (result.isFullRecord()) {
        StreamElement recordOrWatermark = deserializationDelegate.getInstance();
    final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked()

     

    BarrierBuffer

    public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
        while (true) {
            // process buffered BufferOrEvents before grabbing new ones
            BufferOrEvent next;
            if (currentBuffered == null) { //如果currentBuffered为空,说明没有unblock的buffer数据,直接从inputGate读取
                next = inputGate.getNextBufferOrEvent();
            }

     

    InputGateUtil.createInputGate

    /**
     * Wraps the given input gates into a single input gate.
     *
     * <p>A single gate is returned as is; two or more gates are combined into a
     * {@link UnionInputGate}.
     *
     * @param inputGates the gates to expose as one gate; must contain at least one element
     * @return the only gate of the array, or a union over all of them
     * @throws IllegalArgumentException if {@code inputGates} is empty
     */
    public static InputGate createInputGate(InputGate[] inputGates) {

        // An empty array used to fail with an ArrayIndexOutOfBoundsException on inputGates[0].
        if (inputGates.length == 0) {
            throw new IllegalArgumentException(
                    "Cannot create an input gate from an empty array of input gates.");
        }

        if (inputGates.length == 1) {
            return inputGates[0];
        }

        return new UnionInputGate(inputGates);
    }

     

    UnionInputGate

    /**
     * Input gate wrapper to union the input from multiple input gates.
     *
     * <p> Each input gate has input channels attached from which it reads data. At each input gate, the
     * input channels have unique IDs from 0 (inclusive) to the number of input channels (exclusive).
     *
     * <pre>
     * +---+---+      +---+---+---+
     * | 0 | 1 |      | 0 | 1 | 2 |
     * +--------------+--------------+
     * | Input gate 0 | Input gate 1 |
     * +--------------+--------------+
     * </pre>
     *
     * The union input gate maps these IDs from 0 to the *total* number of input channels across all
     * unioned input gates, e.g. the channels of input gate 0 keep their original indexes and the
     * channel indexes of input gate 1 are set off by 2 to 2--4.
     *
     * <pre>
     * +---+---++---+---+---+
     * | 0 | 1 || 2 | 3 | 4 |
     * +--------------------+
     * | Union input gate   |
     * +--------------------+
     * </pre>
     *
     * It is possible to recursively union union input gates.
     */
    public class UnionInputGate implements InputGate {
    
        /** The input gates to union. */
        private final InputGate[] inputGates;
    
        private final Set<InputGate> inputGatesWithRemainingData; //没有结束的inputGate
    
        /** Data availability listener across all unioned input gates. */
        private final InputGateListener inputGateListener;
    
        /** The total number of input channels across all unioned input gates. */
        private final int totalNumberOfInputChannels; //所有的inputGates的所有channels的数目
    
        /**
         * A mapping from input gate to (logical) channel index offset. Valid channel indexes go from 0
         * (inclusive) to the total number of input channels (exclusive).
         */
        private final Map<InputGate, Integer> inputGateToIndexOffsetMap; //每个inputGate的index的base,比如上面的gate1的base就是2
    
        /** Flag indicating whether partitions have been requested. */
        private boolean requestedPartitionsFlag;
    
        /**
         * Creates a union over the given input gates and assigns every gate its channel index
         * offset.
         *
         * NOTE(review): this excerpt is abridged; the declaration of
         * currentNumberOfInputChannels and the initialisation of the maps/sets used below are
         * not shown here.
         *
         * @param inputGates the input gates to union; each element is passed through checkNotNull
         */
        public UnionInputGate(InputGate... inputGates) {

            for (InputGate inputGate : inputGates) {
                // The offset to use for buffer or event instances received from this input gate.
                inputGateToIndexOffsetMap.put(checkNotNull(inputGate), currentNumberOfInputChannels); // the number of channels counted so far is this gate's base index
                inputGatesWithRemainingData.add(inputGate); // membership in inputGatesWithRemainingData marks the gate as not yet finished

                currentNumberOfInputChannels += inputGate.getNumberOfInputChannels(); // accumulate the channel count
            }

            this.totalNumberOfInputChannels = currentNumberOfInputChannels;

            this.inputGateListener = new InputGateListener(inputGates, this); // listener across all unioned gates
        }

    将多个实际的inputGates,合成一个抽象的inputGate;这样做的目的是为了后面处理方便,把多个输入对后面透明化掉

     

    那这样在BarrierBuffer,调用inputGate.getNextBufferOrEvent

    其实就是调用,UnionInputGate.getNextBufferOrEvent

    /**
     * Returns the next buffer or event from one of the unioned input gates, with its channel
     * index shifted by the offset of the gate it came from.
     *
     * @return the next buffer or event, or {@code null} when no input gate has remaining data
     * @throws IllegalStateException if a finished gate is not in the set of remaining gates
     */
    @Override
    public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {

        // Every unioned gate has already finished.
        if (inputGatesWithRemainingData.isEmpty()) {
            return null;
        }

        // Make sure to request the partitions, if they have not been requested before.
        requestPartitions();

        // Pick a gate that announced data, then do the actual read on it.
        final InputGate source = inputGateListener.getNextInputGateToReadFrom();
        final BufferOrEvent next = source.getNextBufferOrEvent();

        // An end-of-partition event on a finished gate means this gate is done.
        final boolean gateFinished = next.isEvent()
                && next.getEvent().getClass() == EndOfPartitionEvent.class
                && source.isFinished();

        if (gateFinished && !inputGatesWithRemainingData.remove(source)) {
            throw new IllegalStateException("Couldn't find input gate in set of remaining " +
                    "input gates.");
        }

        // Translate the gate-local channel index into the index across all unioned gates:
        // base offset of the gate + index within the gate.
        final int offset = inputGateToIndexOffsetMap.get(source);
        next.setChannelIndex(offset + next.getChannelIndex());

        return next;
    }

     

    InputGateListener

    /**
     * Data availability listener at all unioned input gates.
     *
     * <p> The listener registers itself at each input gate and is notified for *each incoming
     * buffer* at one of the unioned input gates.
     */
    private static class InputGateListener implements EventListener<InputGate> {
    
        private final UnionInputGate unionInputGate;
    
        private final BlockingQueue<InputGate> inputGatesWithData = new LinkedBlockingQueue<InputGate>(); //Cache所有有available buffer的inputGate
    
        /**
         * Records that the given input gate has data and forwards the notification, with the
         * union input gate as subject, to every registered listener.
         *
         * NOTE(review): registeredListeners is declared outside this excerpt.
         *
         * @param inputGate the input gate that has data available
         */
        @Override
        public void onEvent(InputGate inputGate) { // triggered from SingleInputGate.onAvailableBuffer
            // This method is called from the input channel thread, which can be either the same
            // thread as the consuming task thread or a different one.
            inputGatesWithData.add(inputGate); // enqueue the gate so that it gets read later

            for (int i = 0; i < registeredListeners.size(); i++) {
                registeredListeners.get(i).onEvent(unionInputGate);
            }
        }
    
        /**
         * Takes the next input gate from the head of the queue of gates with data, waiting until
         * one is available.
         *
         * @return the next input gate to read from
         * @throws InterruptedException if interrupted while waiting on the queue
         */
        InputGate getNextInputGateToReadFrom() throws InterruptedException { // take a gate from the head of the queue
            return inputGatesWithData.take();
        }

     

    先看下requestPartitions,如何request resultpartition的?

    /**
     * Requests the partitions of every unioned input gate. Only the first call does any work;
     * later calls return immediately.
     */
    public void requestPartitions() throws IOException, InterruptedException {
        // Already requested once, nothing left to do.
        if (requestedPartitionsFlag) {
            return;
        }

        for (InputGate gate : inputGates) {
            gate.requestPartitions();
        }

        requestedPartitionsFlag = true;
    }

     

    SingleInputGate.requestPartitions

    /**
     * Requests the consumed subpartition from every input channel of this gate. The requests
     * are issued on the first call only; the check and the flag update happen while holding
     * {@code requestLock}.
     */
    public void requestPartitions() throws IOException, InterruptedException {
        synchronized (requestLock) {
            // The requests only have to be sent once.
            if (requestedPartitionsFlag) {
                return;
            }

            for (InputChannel channel : inputChannels.values()) {
                channel.requestSubpartition(consumedSubpartitionIndex);
            }

            requestedPartitionsFlag = true;
        }
    }

     

    RemoteInputChannel

    /**
     * Creates the partition request client for this channel's connection and requests the
     * given subpartition through it. Does nothing when a client already exists.
     *
     * @param subpartitionIndex index of the subpartition to request
     */
    @Override
    void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
        // A client is already set, so the partition has been requested before.
        if (partitionRequestClient != null) {
            return;
        }

        // Create a client and request the partition
        partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId);
        partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0);
    }

    PartitionRequestClient,先创建,这个负责和resultSubPartition通信

    requestSubpartition

    /**
     * Registers the input channel with the partition request handler and sends a
     * {@code PartitionRequest} for the given subpartition over the TCP channel, either
     * immediately or after {@code delayMs} milliseconds.
     *
     * NOTE(review): in the delayed branch, f[0] is only assigned inside the scheduled task, yet
     * it is returned right after scheduling. The returned future therefore appears to still be
     * null at that point -- confirm and, if so, return a promise that the task completes.
     *
     * @param partitionId ID of the result partition to request
     * @param subpartitionIndex index of the requested subpartition
     * @param inputChannel the remote input channel the request is sent for
     * @param delayMs delay in milliseconds before the request is written; 0 writes immediately
     * @return the future of the write in the immediate case (see the note above for the delayed
     *         case)
     */
    public ChannelFuture requestSubpartition(
            final ResultPartitionID partitionId,
            final int subpartitionIndex,
            final RemoteInputChannel inputChannel,
            int delayMs) throws IOException {

        partitionRequestHandler.addInputChannel(inputChannel); // register the input channel with the partitionRequestHandler

        final PartitionRequest request = new PartitionRequest( // build the request
                partitionId, subpartitionIndex, inputChannel.getInputChannelId());

        if (delayMs == 0) {
            ChannelFuture f = tcpChannel.writeAndFlush(request); // send the request
            f.addListener(listener);
            return f;
        }
        else {
            // Single-element array so that the anonymous Runnable can assign the future.
            final ChannelFuture[] f = new ChannelFuture[1];
            tcpChannel.eventLoop().schedule(new Runnable() {
                @Override
                public void run() {
                    f[0] = tcpChannel.writeAndFlush(request);
                    f[0].addListener(listener);
                }
            }, delayMs, TimeUnit.MILLISECONDS);

            return f[0];
        }
    }

     

    PartitionRequestClientHandler

    /**
     * Decodes an incoming message right away, or appends it to the staged messages when a
     * buffer or event is already staged or earlier messages are still waiting.
     *
     * <p>Any throwable raised on the way is reported through
     * {@code notifyAllChannelsOfErrorAndClose}.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            // Staging is needed as soon as anything is pending ahead of this message.
            final boolean mustStage =
                    bufferListener.hasStagedBufferOrEvent() || !stagedMessages.isEmpty();

            if (mustStage) {
                stagedMessages.add(msg);
            } else {
                // Regular path: nothing is pending, decode directly.
                decodeMsg(msg);
            }
        } catch (Throwable t) {
            notifyAllChannelsOfErrorAndClose(t);
        }
    }

    decodeMsg

    private boolean decodeMsg(Object msg) throws Throwable {
        final Class<?> msgClazz = msg.getClass();
    
        // ---- Buffer --------------------------------------------------------
        if (msgClazz == NettyMessage.BufferResponse.class) {
            NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;
    
            RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId); //获取对应的inputChannel
    
            return decodeBufferOrEvent(inputChannel, bufferOrEvent);
        }

     

    decodeBufferOrEvent

    private boolean decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent) throws Throwable {
        boolean releaseNettyBuffer = true;
    
        try {
            if (bufferOrEvent.isBuffer()) {
                // ---- Buffer ------------------------------------------------
                BufferProvider bufferProvider = inputChannel.getBufferProvider();
    
                while (true) {
                    Buffer buffer = bufferProvider.requestBuffer(); //从channel的bufferProvider中获取buffer
    
                    if (buffer != null) {
                        buffer.setSize(bufferOrEvent.getSize());
                        bufferOrEvent.getNettyBuffer().readBytes(buffer.getNioBuffer()); //将数据写入buffer中
    
                        inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber); //调用inputChannel.onBuffer
    
                        return true;
                    }
                    else if (bufferListener.waitForBuffer(bufferProvider, bufferOrEvent)) {
                        releaseNettyBuffer = false;
    
                        return false;
                    }
                    else if (bufferProvider.isDestroyed()) {
                        return false;
                    }
                }
            }
        }

     

    RemoteInputChannel

    /**
     * Queues a received buffer and signals that a buffer is available, provided the channel
     * has not been released and the sequence number is the expected one.
     *
     * NOTE(review): this excerpt is abridged -- the try block has no catch/finally here and
     * {@code success} is never read in the visible code.
     *
     * @param buffer the received buffer
     * @param sequenceNumber sequence number of the buffer; compared with expectedSequenceNumber
     */
    public void onBuffer(Buffer buffer, int sequenceNumber) {
        boolean success = false;

        try {
            synchronized (receivedBuffers) {
                if (!isReleased.get()) {
                    if (expectedSequenceNumber == sequenceNumber) {
                        receivedBuffers.add(buffer); // put the buffer into receivedBuffers
                        expectedSequenceNumber++;

                        notifyAvailableBuffer(); // signal that a buffer is available

                        success = true;
                    }
                }
            }
        }
    }

    notifyAvailableBuffer

    /**
     * Tells the owning input gate that this channel has a buffer available.
     */
    protected void notifyAvailableBuffer() {
        inputGate.onAvailableBuffer(this);
    }

     

    SingleInputGate

    /**
     * Marks the given channel as having data to read and, if a listener is registered,
     * notifies it that this input gate has data.
     *
     * @param channel the input channel that has a buffer available
     */
    public void onAvailableBuffer(InputChannel channel) {
        // Queue the channel: it has data that needs to be read.
        inputChannelsWithData.add(channel);

        // Read the field once so that the null check and the call use the same reference.
        final EventListener<InputGate> current = registeredListener;
        if (current == null) {
            return;
        }

        // E.g. the listener of a union input gate: this gate has data to read.
        current.onEvent(this);
    }

     

    ---------------------------------------------------

    SingleInputGate.getNextBufferOrEvent

    /**
     * Returns the next buffer or event from one of this gate's input channels, waiting until a
     * channel with data shows up.
     *
     * <p>On an {@code EndOfPartitionEvent} the channel is recorded as ended, told that its
     * subpartition has been consumed, and has its resources released. Once every channel has
     * delivered that event, {@code hasReceivedAllEndOfPartitionEvents} is set.
     *
     * @return the next buffer or event together with the index of the channel it came from
     */
    @Override
    public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {

        requestPartitions();

        // Keep polling (2 seconds per attempt) until a channel with data is handed out.
        InputChannel channel;
        do {
            channel = inputChannelsWithData.poll(2, TimeUnit.SECONDS);
        } while (channel == null);

        final Buffer buffer = channel.getNextBuffer();

        // Plain data buffer: hand it out unchanged.
        if (buffer.isBuffer()) {
            return new BufferOrEvent(buffer, channel.getChannelIndex());
        }

        // Otherwise the buffer carries a serialized event.
        final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());

        if (event.getClass() == EndOfPartitionEvent.class) {
            channelsWithEndOfPartitionEvents.set(channel.getChannelIndex());

            final boolean allChannelsEnded =
                    channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels;
            if (allChannelsEnded) {
                hasReceivedAllEndOfPartitionEvents = true;
            }

            channel.notifySubpartitionConsumed();
            channel.releaseAllResources();
        }

        return new BufferOrEvent(event, channel.getChannelIndex());
    }

     

    RemoteInputChannel

    /**
     * Removes and returns the next received buffer, adding its size to the {@code numBytesIn}
     * counter.
     *
     * @return the next received buffer, never {@code null}
     * @throws IOException if the channel is queried although no buffer has been received
     */
    Buffer getNextBuffer() throws IOException {
        synchronized (receivedBuffers) {
            final Buffer buffer = receivedBuffers.poll();

            // poll() yields null on an empty queue; report that explicitly instead of
            // failing with a NullPointerException on buffer.getSize() below.
            if (buffer == null) {
                throw new IOException("Queried input channel for data although none is available.");
            }

            numBytesIn.inc(buffer.getSize());
            return buffer;
        }
    }

     

     

     

     

     

     

  • 相关阅读:
    Python学习日记(一)——初识Python
    读《乌合之众》
    用WPF做了几个小游戏
    《HeadFirst设计模式》读后感——对学习设计模式的一些想法
    设计模式C#实现(九)——工厂方法模式和简单工厂
    设计模式C#实现(八)——原型模式
    设计模式C#实现(七)——生成器模式
    设计模式C#实现(六)——单例模式
    《小强升职记》读后感和思维导图
    《魔鬼搭讪学》《魔鬼约会学》读后感
  • 原文地址:https://www.cnblogs.com/fxjwind/p/7641347.html
Copyright © 2011-2022 走看看