  • Flink – Stream Task Execution Process

    Task.run

    if (invokable instanceof StatefulTask) {
        StatefulTask op = (StatefulTask) invokable;
        op.setInitialState(taskStateHandles);
    }
    // run the invokable
    invokable.invoke();
     

    Here, invokable is a StreamTask.

    StreamTask.invoke

    public final void invoke() throws Exception {
        run();
    }
     

    StreamTask is an abstract base class; OneInputStreamTask is one concrete subclass, and it implements run() as follows:

    protected void run() throws Exception {
        // cache processor reference on the stack, to make the code more JIT friendly
        final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

        while (running && inputProcessor.processInput()) {
            // all the work happens in the "processInput" method
        }
    }
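The loop shape above can be tried in isolation. The following is a minimal sketch, not Flink code: `ToyInputProcessor` and `runLoop` are hypothetical names, and the processor is assumed to handle at most one record per `processInput()` call and to return false once its input is exhausted.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class RunLoopSketch {

    /** Hypothetical stand-in for StreamInputProcessor: handles one record per call. */
    static final class ToyInputProcessor {
        private final Queue<String> input;
        private int processed = 0;

        ToyInputProcessor(List<String> records) {
            this.input = new ArrayDeque<>(records);
        }

        /** Returns true if a record was processed, false once the input is exhausted. */
        boolean processInput() {
            String record = input.poll();
            if (record == null) {
                return false;
            }
            processed++; // the real processor would hand the record to the operator here
            return true;
        }

        int processedCount() {
            return processed;
        }
    }

    /** Mirrors the shape of run(): keep calling processInput() until it reports no more input. */
    static int runLoop(List<String> records) {
        final ToyInputProcessor inputProcessor = new ToyInputProcessor(records);
        boolean running = true; // assumption: in the real task this is a flag that cancellation flips
        while (running && inputProcessor.processInput()) {
            // all the work happens inside processInput()
        }
        return inputProcessor.processedCount();
    }

    public static void main(String[] args) {
        System.out.println(runLoop(Arrays.asList("a", "b", "c")));
    }
}
```

The task body stays trivial because the processor owns both reading and dispatching; the loop only decides when to stop.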

     

    StreamInputProcessor.processInput

    StreamRecord<IN> record = recordOrMark.asRecord();
    synchronized (lock) {
        numRecordsIn.inc();
        streamOperator.setKeyContextElement1(record);
        streamOperator.processElement(record);
    }

    Note that before processElement is called, the key context is set for the record:

    streamOperator.setKeyContextElement1(record);
        @SuppressWarnings({"unchecked", "rawtypes"})
        public void setKeyContextElement1(StreamRecord record) throws Exception {
            setKeyContextElement(record, stateKeySelector1);
        }
    
        private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector) throws Exception {
            if (selector != null) {
                Object key = selector.getKey(record.getValue()); // derive the key via the KeySelector
                setCurrentKey(key);
            }
        }
    
        @SuppressWarnings({"unchecked", "rawtypes"})
        public void setCurrentKey(Object key) {
            if (keyedStateBackend != null) {
                try {
                    // need to work around type restrictions
                    @SuppressWarnings("unchecked,rawtypes")
                    AbstractKeyedStateBackend rawBackend = (AbstractKeyedStateBackend) keyedStateBackend;
    
                    rawBackend.setCurrentKey(key); // delegate to the state backend's setCurrentKey
                } catch (Exception e) {
                    throw new RuntimeException("Exception occurred while setting the current key context.", e);
                }
            }
        }
        
        @SuppressWarnings({"unchecked", "rawtypes"})
        public Object getCurrentKey() {
            if (keyedStateBackend != null) {
                return keyedStateBackend.getCurrentKey(); // read the key back from the state backend
            } else {
                throw new UnsupportedOperationException("Key can only be retrieven on KeyedStream.");
            }
        }

    The key is stored in the state backend, which also derives the key group from it:

    AbstractKeyedStateBackend
        public void setCurrentKey(K newKey) {
            this.currentKey = newKey;
            this.currentKeyGroup = KeyGroupRangeAssignment.assignToKeyGroup(newKey, numberOfKeyGroups);
        }
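To see why the key must be set before processElement, here is a minimal sketch of a keyed backend whose state access takes no key argument and relies entirely on the current key. `ToyKeyedBackend` and `countLast` are hypothetical names. The key-group formula is also an assumption: it uses a plain `hashCode` modulo, whereas Flink's `KeyGroupRangeAssignment` scrambles the hash code further before taking the modulo.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyContextSketch {

    /** Hypothetical toy backend: one counter per key, scoped by the current key. */
    static final class ToyKeyedBackend<K> {
        private final int numberOfKeyGroups;
        private final Map<K, Integer> counts = new HashMap<>();
        private K currentKey;
        private int currentKeyGroup;

        ToyKeyedBackend(int numberOfKeyGroups) {
            this.numberOfKeyGroups = numberOfKeyGroups;
        }

        void setCurrentKey(K newKey) {
            this.currentKey = newKey;
            // Assumption: simplified assignment, not Flink's actual hash function.
            this.currentKeyGroup = Math.floorMod(newKey.hashCode(), numberOfKeyGroups);
        }

        int getCurrentKeyGroup() {
            return currentKeyGroup;
        }

        /** State access takes no key argument; it always uses the current key. */
        int incrementAndGet() {
            return counts.merge(currentKey, 1, Integer::sum);
        }
    }

    /** Processes the words in order and returns the counter value seen by the last one. */
    static int countLast(String... words) {
        ToyKeyedBackend<String> backend = new ToyKeyedBackend<>(128);
        int last = 0;
        for (String word : words) {
            backend.setCurrentKey(word);      // the role of setKeyContextElement1
            last = backend.incrementAndGet(); // the role of processElement touching keyed state
        }
        return last;
    }

    public static void main(String[] args) {
        System.out.println(countLast("a", "b", "a"));
    }
}
```

If the `setCurrentKey` call were skipped, every record would read and update the state of whichever key happened to be set last, which is why the processor sets the key context inside the same lock, immediately before processElement.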

     

     

    OneInputStreamOperator.processElement

    As an example, StreamSink implements the OneInputStreamOperator interface:

    public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>>
            implements OneInputStreamOperator<IN, Object> {
    
        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            userFunction.invoke(element.getValue());
        }
    }

     

    This ultimately calls the invoke method of the user's SinkFunction.
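The delegation from operator to user function can be sketched without Flink on the classpath. `ToySinkFunction`, `ToyStreamSink`, and `collect` below are hypothetical stand-ins for illustration, not Flink's classes, and the element is passed as a bare value rather than wrapped in a StreamRecord.

```java
import java.util.ArrayList;
import java.util.List;

public class SinkSketch {

    /** Hypothetical stand-in for SinkFunction: the user-supplied callback. */
    interface ToySinkFunction<IN> {
        void invoke(IN value);
    }

    /** Hypothetical stand-in for StreamSink: forwards each element to the user function. */
    static final class ToyStreamSink<IN> {
        private final ToySinkFunction<IN> userFunction;

        ToyStreamSink(ToySinkFunction<IN> userFunction) {
            this.userFunction = userFunction;
        }

        void processElement(IN value) {
            userFunction.invoke(value);
        }
    }

    /** Feeds the values through a sink whose user function appends them to a list. */
    static List<String> collect(String... values) {
        List<String> out = new ArrayList<>();
        ToyStreamSink<String> sink = new ToyStreamSink<>(out::add);
        for (String value : values) {
            sink.processElement(value);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collect("x", "y"));
    }
}
```

The operator adds no logic of its own here; it only adapts the runtime's per-record call into the user function's signature.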

  • Original article: https://www.cnblogs.com/fxjwind/p/7128199.html