  • [Source] Top N source code

    This article is based on Flink 1.12.0.

    Official Top N documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#top-n

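    ## SQL

    A Flink Top N query looks like the one below. Two forms of rank predicate are supported:
    1. rownum > 3 and rownum < 10 (if only rownum < 10 is written, the lower bound defaults to rownum >= 1, because ranks start at 1)
    2. where rownum < source_table.column (the upper bound is a column value taken from the first input row seen for the partition)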

    insert into user_log_sink(user_id, item_id, category_id,behavior,sales,sort_col,ts)
    SELECT user_id, item_id, category_id,behavior,sales,sort_col,ts
    FROM (
       SELECT user_id, item_id, category_id,behavior,sales, ts, sort_col,
         ROW_NUMBER() OVER (PARTITION BY category_id ORDER BY ts desc) AS rownum
       FROM user_log)
    WHERE rownum < sort_col
    -- WHERE rownum < 5
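    -- Only two forms of Top N predicate are supported:
    --  rownum < 10, or rownum > 3 and rownum < 10
    --  rownum < source_table.column
    -- rownum > 3 on its own is not supported: Rank end is not specified. Currently rank only support TopN, which means the rank end must be specified.
    -- Leaving rownum out of the output enables the no-rank-number optimization: only the current row is emitted, and rank updates for previously emitted rows are no longer sent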
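
    To make the two constant-bound forms concrete, here is a minimal sketch of how strict SQL bounds such as `rownum > 3 and rownum < 10` could be turned into an inclusive rank range. `RankRangeSketch` and its methods are hypothetical names used only for illustration. They mirror the roles that `rankStart`, `rankEnd` and `hasOffset()` play in the operator, and are not Flink's planner code.

    ```java
    /** Hypothetical illustration of an inclusive rank range; not Flink's planner code. */
    final class RankRangeSketch {
        final long rankStart; // inclusive, ranks start at 1
        final long rankEnd;   // inclusive

        RankRangeSketch(long rankStart, long rankEnd) {
            if (rankStart < 1 || rankEnd < rankStart) {
                throw new IllegalArgumentException("invalid rank range");
            }
            this.rankStart = rankStart;
            this.rankEnd = rankEnd;
        }

        /** Models "rownum < upperExclusive"; the lower bound defaults to 1. */
        static RankRangeSketch lessThan(long upperExclusive) {
            return new RankRangeSketch(1, upperExclusive - 1);
        }

        /** Models "rownum > lowerExclusive and rownum < upperExclusive". */
        static RankRangeSketch between(long lowerExclusive, long upperExclusive) {
            return new RankRangeSketch(lowerExclusive + 1, upperExclusive - 1);
        }

        /** True when the range skips the first ranks, as in rownum > 3. */
        boolean hasOffset() {
            return rankStart > 1;
        }

        boolean contains(long rank) {
            return rank >= rankStart && rank <= rankEnd;
        }

        public static void main(String[] args) {
            RankRangeSketch r = between(3, 10);
            // prints 4..9 offset=true
            System.out.println(r.rankStart + ".." + r.rankEnd + " offset=" + r.hasOffset());
        }
    }
    ```

    A predicate with only a lower bound has no finite `rankEnd`, which is why the sketch offers no factory for it.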

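    ## AbstractTopNFunction, the Top N base class

    Depending on the kinds of messages the source produces (Insert, Update, Delete), Top N supports four processing strategies: UndefinedStrategy, AppendFastStrategy, RetractStrategy and UpdateFastStrategy (see RankProcessStrategy in the source). These correspond to three function subclasses: AppendOnlyTopNFunction, UpdatableTopNFunction and RetractableTopNFunction. Only AppendOnlyTopNFunction is discussed here. [Figure: class diagram]

    Source walkthrough: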

    AbstractTopNFunction(
        long minRetentionTime,
        long maxRetentionTime,
        InternalTypeInfo<RowData> inputRowType,
        GeneratedRecordComparator generatedSortKeyComparator,
        RowDataKeySelector sortKeySelector,
        RankType rankType,
        RankRange rankRange,
        boolean generateUpdateBefore,
        boolean outputRankNumber) {
        // call the parent constructor to set the minimum and maximum state retention times
        super(minRetentionTime, maxRetentionTime);
        // TODO support RANK and DENSE_RANK
        // only ROW_NUMBER is supported; any other rank type throws
        switch (rankType) {
            case ROW_NUMBER:
                break;
            case RANK:
                LOG.error(RANK_UNSUPPORTED_MSG);
                throw new UnsupportedOperationException(RANK_UNSUPPORTED_MSG);
            case DENSE_RANK:
                LOG.error(DENSE_RANK_UNSUPPORTED_MSG);
                throw new UnsupportedOperationException(DENSE_RANK_UNSUPPORTED_MSG);
            default:
                LOG.error("Streaming tables do not support {}", rankType.name());
                throw new UnsupportedOperationException(
                    "Streaming tables do not support " + rankType.toString());
        }
        if (rankRange instanceof ConstantRankRange) {
            // both rank start and rank end are constants
            ConstantRankRange constantRankRange = (ConstantRankRange) rankRange;
            isConstantRankEnd = true;
            rankStart = constantRankRange.getRankStart();
            rankEnd = constantRankRange.getRankEnd();
            rankEndIndex = -1;
        } else if (rankRange instanceof VariableRankRange) {
            // rank end is variable (read from an input column); there is no rank start
            VariableRankRange variableRankRange = (VariableRankRange) rankRange;
            rankEndIndex = variableRankRange.getRankEndIndex();
            isConstantRankEnd = false;
            rankStart = -1;
            rankEnd = -1;
        } else {
            // anything else is unsupported
            LOG.error(WITHOUT_RANK_END_UNSUPPORTED_MSG);
            throw new UnsupportedOperationException(WITHOUT_RANK_END_UNSUPPORTED_MSG);
        }
        this.generatedSortKeyComparator = generatedSortKeyComparator;
        this.generateUpdateBefore = generateUpdateBefore;
        this.inputRowType = inputRowType;
        // whether the rank number is output; this drives the no-rank-number output optimization
        this.outputRankNumber = outputRankNumber;
        this.sortKeySelector = sortKeySelector;
    }
    
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        initCleanupTimeState("RankFunctionCleanupTime");
        outputRow = new JoinedRowData();
    
        if (!isConstantRankEnd) {
            // state holding the rank end value; it has no TTL, so once set the rank end never changes
            ValueStateDescriptor<Long> rankStateDesc = new ValueStateDescriptor<>(
                "rankEnd",
                Types.LONG);
            rankEndState = getRuntimeContext().getState(rankStateDesc);
        }
        // compile comparator
        sortKeyComparator = generatedSortKeyComparator.newInstance(getRuntimeContext().getUserCodeClassLoader());
        generatedSortKeyComparator = null;
        invalidCounter = getRuntimeContext().getMetricGroup().counter("topn.invalidTopSize");
    
        // initialize rankEndFetcher
        if (!isConstantRankEnd) {
            // rank end is not a constant but a column value, so build the function that reads it from a row
            LogicalType rankEndIdxType = inputRowType.toRowFieldTypes()[rankEndIndex];
            switch (rankEndIdxType.getTypeRoot()) {
                case BIGINT:
                    rankEndFetcher = (RowData row) -> row.getLong(rankEndIndex);
                    break;
                case INTEGER:
                    rankEndFetcher = (RowData row) -> (long) row.getInt(rankEndIndex);
                    break;
                case SMALLINT:
                    rankEndFetcher = (RowData row) -> (long) row.getShort(rankEndIndex);
                    break;
                default:
                    LOG.error(
                        "variable rank index column must be long, short or int type, while input type is {}",
                        rankEndIdxType.getClass().getName());
                    throw new UnsupportedOperationException(
                        "variable rank index column must be long type, while input type is " +
                            rankEndIdxType.getClass().getName());
            }
        }
    }
    
    /**
     * Initialize rank end.
     *
     * @param row input record
     *
     * @return rank end
     *
     * @throws Exception
     */
    protected long initRankEnd(RowData row) throws Exception {
        if (isConstantRankEnd) {
            // constant rank end: return it directly
            return rankEnd;
        } else {
            // variable rank end: extract it from the input row
            Long rankEndValue = rankEndState.value();
            long curRankEnd = rankEndFetcher.apply(row);
            // state is empty
            if (rankEndValue == null) {
                rankEnd = curRankEnd;
                // store the first row's value as the maximum rownum (rank end)
                rankEndState.update(rankEnd);
                return rankEnd;
            } else {
                rankEnd = rankEndValue;
                if (rankEnd != curRankEnd) {
                    // increment the invalid counter when the current rank end not equal to previous rank end
                    // bump the topn.invalidTopSize metric
                    invalidCounter.inc();
                }
                return rankEnd;
            }
        }
    }
    
    protected boolean checkSortKeyInBufferRange(RowData sortKey, TopNBuffer buffer) {
        // get the comparator
        Comparator<RowData> comparator = buffer.getSortKeyComparator();
        // get the last (worst) entry in the buffer
        Map.Entry<RowData, Collection<RowData>> worstEntry = buffer.lastEntry();
        if (worstEntry == null) {
            // no last entry means the buffer is empty, so the key is in range
            // return true if the buffer is empty.
            return true;
        } else {
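            // sort key of the last (worst) entry
            RowData worstKey = worstEntry.getKey();
            // compare the incoming key with the worst buffered key
            int compare = comparator.compare(sortKey, worstKey);
            // in range if the incoming key sorts before the worst key; otherwise only while the buffer holds fewer rows than the Top N size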
            if (compare < 0) {
                return true;
            } else {
                return buffer.getCurrentTopNum() < getDefaultTopNSize();
            }
        }
    }
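
    The variable rank end handling in initRankEnd can be tried out in isolation. The following is a minimal sketch under simplifying assumptions: a HashMap stands in for Flink's keyed ValueState, a plain counter stands in for the `topn.invalidTopSize` metric, and `VariableRankEndSketch` is a hypothetical class name.

    ```java
    import java.util.HashMap;
    import java.util.Map;

    /** Hypothetical stand-in for the variable rank end logic; a HashMap replaces keyed ValueState. */
    final class VariableRankEndSketch {
        // partition key -> rank end fixed by the first row seen for that key
        private final Map<String, Long> rankEndState = new HashMap<>();
        // plays the role of the topn.invalidTopSize metric
        long invalidCounter = 0;

        /** Returns the rank end for the key, fixing it on the first row. */
        long initRankEnd(String key, long rankEndInRow) {
            Long stored = rankEndState.get(key);
            if (stored == null) {
                // first row for this key: its column value becomes the rank end
                rankEndState.put(key, rankEndInRow);
                return rankEndInRow;
            }
            if (stored != rankEndInRow) {
                // later rows carrying a different value are only counted, never applied
                invalidCounter++;
            }
            return stored;
        }

        public static void main(String[] args) {
            VariableRankEndSketch s = new VariableRankEndSketch();
            System.out.println(s.initRankEnd("category_id_1", 45)); // first row fixes the rank end
            System.out.println(s.initRankEnd("category_id_1", 15)); // still the first value
            System.out.println(s.invalidCounter);
        }
    }
    ```

    The point of the sketch is that the rank end of a partition is decided by its first row; a later row with a different column value changes nothing except the counter.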

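    ## AppendOnlyTopNFunction: Top N for insert-only input

    AppendOnlyTopNFunction is a process function. Records are handled in processElement, which also does some initialization (in a keyed process function, per-key state cannot be read in open(), so that work happens here).

    1. Register the state-cleanup timer; once a key's state outlives the retention time it is deleted, and Top N starts counting from scratch for that key
    2. Initialize the buffer, the on-heap cache object for Top N
    3. Initialize the rank end value (a non-constant rank end has to be extracted from the data)
    4. Check whether the record falls within the Top N range
    5. If it does, put the input record into the buffer and the corresponding state
    6. Depending on whether rownum is output, compute the rank of the input record and emit the results (the rownum > 3 offset is checked in the parent class when emitting, e.g. AbstractTopNFunction.collectInsert)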
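
    Steps 4 to 6 can be modeled outside Flink for a single partition key. The sketch below is a simplified illustration, not the Flink implementation: a TreeMap keyed by a long sort key (smaller sorts first) stands in for TopNBuffer, output messages are plain strings, there is no state, timer or offset handling, and `AppendOnlyTopNSketch` is a hypothetical name. It always emits the retraction, whereas the real operator does so only when update-before messages are requested.

    ```java
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    /** Simplified single-key model of an append-only Top N that outputs rank numbers. */
    final class AppendOnlyTopNSketch {
        private final long rankEnd;
        // sort key -> rows sharing that key; a smaller key ranks first
        private final TreeMap<Long, List<String>> buffer = new TreeMap<>();
        private int size = 0;
        final List<String> out = new ArrayList<>();

        AppendOnlyTopNSketch(long rankEnd) {
            this.rankEnd = rankEnd;
        }

        void processElement(long sortKey, String row) {
            // step 4: in range while the buffer is not full, or if the key beats the worst key
            if (!(size < rankEnd || sortKey < buffer.lastKey())) {
                return;
            }
            // step 5: store the row
            buffer.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(row);
            size++;
            // step 6: walk the sorted buffer and shift every row behind the input down by one rank
            long currentRank = 0;
            boolean found = false;
            String currentRow = null;
            Iterator<Map.Entry<Long, List<String>>> it = buffer.entrySet().iterator();
            while (it.hasNext() && currentRank <= rankEnd) {
                Map.Entry<Long, List<String>> e = it.next();
                if (!found && e.getKey() == sortKey) {
                    currentRank += e.getValue().size();
                    currentRow = row;
                    found = true;
                } else if (found) {
                    for (String prev : e.getValue()) {
                        if (currentRank > rankEnd) {
                            break;
                        }
                        out.add("-U(" + prev + "," + currentRank + ")");
                        out.add("+U(" + currentRow + "," + currentRank + ")");
                        currentRow = prev;
                        currentRank++;
                    }
                } else {
                    currentRank += e.getValue().size();
                }
            }
            if (currentRank <= rankEnd) {
                // the last shifted row still fits into the Top N
                out.add("+I(" + currentRow + "," + currentRank + ")");
            }
            // drop the sort keys that lie beyond rank end
            while (it.hasNext()) {
                size -= it.next().getValue().size();
                it.remove();
            }
        }

        public static void main(String[] args) {
            AppendOnlyTopNSketch topN = new AppendOnlyTopNSketch(2);
            topN.processElement(5, "a");
            topN.processElement(3, "b");
            topN.processElement(4, "c");
            // prints [+I(a,1), -U(a,1), +U(b,1), +I(a,2), -U(a,2), +U(c,2)]
            System.out.println(topN.out);
        }
    }
    ```

    Each newly accepted row triggers one retract/update pair for every row it pushes down, which is the cost that the no-rank-number variant avoids.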

    public void processElement(RowData input, Context context, Collector<RowData> out) throws Exception {
        long currentTime = context.timerService().currentProcessingTime();
        // register state-cleanup timer
        registerProcessingCleanupTimer(context, currentTime);
        // initialize the on-heap Top N buffer (after a failure, rows held in state are loaded back into the buffer)
        initHeapStates();
        // initialize the rank end value
        initRankEnd(input);
    
        // extract the sort key from the input row
        RowData sortKey = sortKeySelector.getKey(input);
        // check whether the sortKey is in the topN range
        if (checkSortKeyInBufferRange(sortKey, buffer)) {
            // insert sort key into buffer
            buffer.put(sortKey, inputRowSer.copy(input));
            // fetch the collection of rows stored under this sort key
            Collection<RowData> inputs = buffer.get(sortKey);
            // update data state
            // copy a new collection to avoid mutating state values, see CopyOnWriteStateMap,
            // otherwise, the result might be corrupt.
            // don't need to perform a deep copy, because RowData elements will not be updated
            // write the rows for this sort key into state
            dataState.put(sortKey, new ArrayList<>(inputs));
            if (outputRankNumber || hasOffset()) {
                // the rank number is output, or an offset (rownum > n) is specified
                // the without-number-algorithm can't handle topN with offset,
                // so use the with-number-algorithm to handle offset
                processElementWithRowNumber(sortKey, input, out);
            } else {
                // the rank number is not output
                processElementWithoutRowNumber(input, out);
            }
        }
    }
    
    // initialize the on-heap Top N buffer (after a failure, rows held in state are loaded back into the buffer)
    private void initHeapStates() throws Exception {
        requestCount += 1;
        // get the current key
        RowData currentKey = (RowData) keyContext.getCurrentKey();
        // look up the buffer cached for this key
        buffer = kvSortedMap.get(currentKey);
        if (buffer == null) {
            // create a TopNBuffer with the sort comparator and an ArrayList supplier for the stored rows
            buffer = new TopNBuffer(sortKeyComparator, ArrayList::new);
            // put the buffer into the LRU map (the author notes it holds 1000 keys by default)
            kvSortedMap.put(currentKey, buffer);
            // restore buffer
            // copy the rows in dataState into the buffer (the buffer lives on the heap and is lost on failure, so it is rebuilt from state on recovery)
            Iterator<Map.Entry<RowData, List<RowData>>> iter = dataState.iterator();
            if (iter != null) {
                while (iter.hasNext()) {
                    Map.Entry<RowData, List<RowData>> entry = iter.next();
                    RowData sortKey = entry.getKey();
                    List<RowData> values = entry.getValue();
                    // the order is preserved
                    buffer.putAll(sortKey, values);
                }
            }
        } else {
            hitCount += 1;
        }
    }
    
    // handle the case where rownum is part of the output
    private void processElementWithRowNumber(RowData sortKey, RowData input, Collector<RowData> out) throws Exception {
        // iterator over the current buffer
        Iterator<Map.Entry<RowData, Collection<RowData>>> iterator = buffer.entrySet().iterator();
        long currentRank = 0L;
        boolean findsSortKey = false;
        RowData currentRow = null;
        // the buffer is sorted, so walking it from the start up to rank end yields the Top N in order
        // once the input's sort key is found, currentRow is the input row and currentRank is its rank
        // each later row (prevRow) is retracted at its old rank, and currentRow is emitted at that rank
        // then prevRow becomes currentRow and the rank advances by one
        // and so on until rank end is reached
        while (iterator.hasNext() && isInRankEnd(currentRank)) {
            Map.Entry<RowData, Collection<RowData>> entry = iterator.next();
            Collection<RowData> records = entry.getValue();
            // meet its own sort key
            if (!findsSortKey && entry.getKey().equals(sortKey)) {
                currentRank += records.size();
                currentRow = input;
                findsSortKey = true;
            } else if (findsSortKey) {
                Iterator<RowData> recordsIter = records.iterator();
                while (recordsIter.hasNext() && isInRankEnd(currentRank)) {
                    RowData prevRow = recordsIter.next();
                    // retract the row that previously held currentRank
                    collectUpdateBefore(out, prevRow, currentRank);
                    // emit the current row with rownum = currentRank
                    collectUpdateAfter(out, currentRow, currentRank);
                    currentRow = prevRow;
                    currentRank += 1;
                }
            } else {
                currentRank += records.size();
            }
        }
        // if the last shifted row (or the input itself, when no row follows it) is still within rank end, emit it as an INSERT at currentRank
        if (isInRankEnd(currentRank)) {
            // there is no enough elements in Top-N, emit INSERT message for the new record.
            collectInsert(out, currentRow, currentRank);
        }
    
        // remove the records associated to the sort key which is out of topN
        // keep iterating: remove the sort keys beyond rank end from the buffer and from dataState
        List<RowData> toDeleteSortKeys = new ArrayList<>();
        while (iterator.hasNext()) {
            Map.Entry<RowData, Collection<RowData>> entry = iterator.next();
            RowData key = entry.getKey();
            dataState.remove(key);
            toDeleteSortKeys.add(key);
        }
        for (RowData toDeleteKey : toDeleteSortKeys) {
            buffer.removeAll(toDeleteKey);
        }
    }
    
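    // handle the case where the row number is not output
    // if the buffer holds more than rank end rows, remove the last element (the one with the largest rownum in the Top N) from the buffer and from state
    // if the last entry's collection was non-empty and the removed element is not the current input, emit a DELETE for that element
    // then emit the current input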
    private void processElementWithoutRowNumber(RowData input, Collector<RowData> out) throws Exception {
        // remove retired element
        if (buffer.getCurrentTopNum() > rankEnd) {
            Map.Entry<RowData, Collection<RowData>> lastEntry = buffer.lastEntry();
            RowData lastKey = lastEntry.getKey();
            Collection<RowData> lastList = lastEntry.getValue();
            RowData lastElement = buffer.lastElement();
            int size = lastList.size();
            // remove last one
            if (size <= 1) {
                buffer.removeAll(lastKey);
                dataState.remove(lastKey);
            } else {
                buffer.removeLast();
                // last element has been removed from lastList, we have to copy a new collection
                // for lastList to avoid mutating state values, see CopyOnWriteStateMap,
                // otherwise, the result might be corrupt.
                // don't need to perform a deep copy, because RowData elements will not be updated
                dataState.put(lastKey, new ArrayList<>(lastList));
            }
            if (size == 0 || input.equals(lastElement)) {
                return;
            } else {
                // lastElement shouldn't be null
                collectDelete(out, lastElement);
            }
        }
        // it first appears in the TopN, send INSERT message
        // emit only the current input
        collectInsert(out, input);
    }
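
    For comparison, the no-rank-number path can be modeled the same way. This is again a simplified single-key sketch rather than the Flink code: a TreeMap replaces TopNBuffer, messages are strings, and `TopNWithoutRankSketch` is a hypothetical name.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.TreeMap;

    /** Simplified single-key model of an append-only Top N that does not output rank numbers. */
    final class TopNWithoutRankSketch {
        private final long rankEnd;
        // sort key -> rows sharing that key; a smaller key ranks first
        private final TreeMap<Long, List<String>> buffer = new TreeMap<>();
        private int size = 0;
        final List<String> out = new ArrayList<>();

        TopNWithoutRankSketch(long rankEnd) {
            this.rankEnd = rankEnd;
        }

        void processElement(long sortKey, String row) {
            // range check: buffer not full yet, or the key beats the current worst key
            if (!(size < rankEnd || sortKey < buffer.lastKey())) {
                return;
            }
            buffer.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(row);
            size++;
            if (size > rankEnd) {
                // retire the worst element
                Long lastKey = buffer.lastKey();
                List<String> lastList = buffer.get(lastKey);
                String lastElement = lastList.remove(lastList.size() - 1);
                if (lastList.isEmpty()) {
                    buffer.remove(lastKey);
                }
                size--;
                if (row.equals(lastElement)) {
                    // the input itself was retired, so nothing changes downstream
                    return;
                }
                out.add("-D(" + lastElement + ")");
            }
            // the input enters the Top N for the first time
            out.add("+I(" + row + ")");
        }

        public static void main(String[] args) {
            TopNWithoutRankSketch topN = new TopNWithoutRankSketch(2);
            topN.processElement(5, "a");
            topN.processElement(3, "b");
            topN.processElement(4, "c");
            // prints [+I(a), +I(b), -D(a), +I(c)]
            System.out.println(topN.out);
        }
    }
    ```

    With the same three inputs this variant emits four messages instead of six, because rows that merely change rank produce no output.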

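    ## CleanupState: clearing the Top N state

    The Top N buffered data is kept in MapState. If a key receives no data for long enough (at least minRetentionTime, at most maxRetentionTime), its state is cleared and Top N is computed from scratch for that key.

    minRetentionTime is the value of the "table.exec.state.ttl" option, and maxRetentionTime is minRetentionTime * 3 / 2. With minRetentionTime = 10h (the value the author cites as the default; check table.exec.state.ttl in your own configuration), maxRetentionTime is 15h.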

    default void registerProcessingCleanupTimer(
            ValueState<Long> cleanupTimeState,
            long currentTime,
            long minRetentionTime,
            long maxRetentionTime,
            TimerService timerService) throws Exception {
    
        // last registered timer
        Long curCleanupTime = cleanupTimeState.value();
    
        // check if a cleanup timer is registered and
        // that the current cleanup timer won't delete state we need to keep
        // register a new cleanup time when none is stored yet, or when current time + minimum retention time > stored cleanup time
        if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
            // we need to register a new (later) timer
            long cleanupTime = currentTime + maxRetentionTime;
            // register timer and remember clean-up time
            timerService.registerProcessingTimeTimer(cleanupTime);
            // delete expired timer: a key that keeps receiving data is never cleaned up;
            // its state expires only after the key has been idle for between minRetentionTime and maxRetentionTime
            if (curCleanupTime != null) {
                timerService.deleteProcessingTimeTimer(curCleanupTime);
            }
            cleanupTimeState.update(cleanupTime);
        }
    }
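
    The timer arithmetic is easier to follow with numbers. The sketch below assumes hours as the time unit and a minimum retention of 10, keeps the cleanup time in a plain field instead of ValueState, and leaves out the actual timer service. `CleanupTimerSketch` is a hypothetical name.

    ```java
    /** Hypothetical model of the cleanup-timer bookkeeping; times are plain numbers (hours here). */
    final class CleanupTimerSketch {
        final long minRetention;
        final long maxRetention;
        // stand-in for cleanupTimeState; null means no timer registered yet
        Long cleanupTime = null;

        CleanupTimerSketch(long minRetention) {
            this.minRetention = minRetention;
            this.maxRetention = minRetention * 3 / 2;
        }

        /** Called for every record; returns true if a new (later) timer was registered. */
        boolean onRecord(long now) {
            if (cleanupTime == null || now + minRetention > cleanupTime) {
                // the existing timer would fire too early, so push the cleanup out
                cleanupTime = now + maxRetention;
                return true;
            }
            return false;
        }

        public static void main(String[] args) {
            CleanupTimerSketch timer = new CleanupTimerSketch(10);
            System.out.println(timer.onRecord(0) + " -> cleanup at " + timer.cleanupTime);
            System.out.println(timer.onRecord(4) + " -> cleanup at " + timer.cleanupTime);
            System.out.println(timer.onRecord(6) + " -> cleanup at " + timer.cleanupTime);
        }
    }
    ```

    A record at hour 4 leaves the timer at hour 15, because 4 + 10 does not exceed 15; if no further record arrives, the state is dropped after 11 idle hours. A record at hour 6 moves the timer to hour 21.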

    ## kafka sink 

    When sinking to upsert-kafka, the serializer checks the RowKind of each output row (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE): DELETE and UPDATE_BEFORE messages are written with a null value.

    null
    {"user_id":"user_id_8160","item_id":"item_id_1","category_id":"category_id_1","behavior":1,"sales":15.0,"sort_col":45,"ts":"2021-03-19 17:15:25"}
    null
    {"user_id":"user_id_8161","item_id":"item_id_1","category_id":"category_id_1","behavior":1,"sales":45.0,"sort_col":15,"ts":"2021-03-19 17:15:26"}
    null
    {"user_id":"user_id_8162","item_id":"item_id_1","category_id":"category_id_1","behavior":1,"sales":5.0,"sort_col":25,"ts":"2021-03-19 17:15:27"}
    null
    {"user_id":"user_id_8163","item_id":"item_id_1","category_id":"category_id_1","behavior":1,"sales":25.0,"sort_col":35,"ts":"2021-03-19 17:15:28"}

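    Note: I first took this for a bug, but it turns out to be by design (why not simply filter these messages out?). The source comment calls the null value a tombstone message.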

    DynamicKafkaSerializationSchema.java

    @Override
    public ProducerRecord<byte[], byte[]> serialize(RowData consumedRow, @Nullable Long timestamp) {
        // shortcut in case no input projection is required
        if (keySerialization == null && !hasMetadata) {
            final byte[] valueSerialized = valueSerialization.serialize(consumedRow);
            return new ProducerRecord<>(
                    topic,
                    extractPartition(consumedRow, null, valueSerialized),
                    null,
                    valueSerialized);
        }
    
        final byte[] keySerialized;
        if (keySerialization == null) {
            keySerialized = null;
        } else {
            final RowData keyRow = createProjectedRow(consumedRow, RowKind.INSERT, keyFieldGetters);
            keySerialized = keySerialization.serialize(keyRow);
        }
    
        final byte[] valueSerialized;
        final RowKind kind = consumedRow.getRowKind();
        final RowData valueRow = createProjectedRow(consumedRow, kind, valueFieldGetters);
        // upsert-kafka cannot modify records that were already sent, so DELETE and UPDATE_BEFORE rows are written with a null value and every other row kind is rewritten to INSERT
        if (upsertMode) {
            if (kind == RowKind.DELETE || kind == RowKind.UPDATE_BEFORE) {
                // transform the message as the tombstone message
                valueSerialized = null;
            } else {
                // make the message to be INSERT to be compliant with the INSERT-ONLY format
                valueRow.setRowKind(RowKind.INSERT);
                valueSerialized = valueSerialization.serialize(valueRow);
            }
        } else {
            valueSerialized = valueSerialization.serialize(valueRow);
        }
    
        return new ProducerRecord<>(
                topic,
                extractPartition(consumedRow, keySerialized, valueSerialized),
                readMetadata(consumedRow, KafkaDynamicSink.WritableMetadata.TIMESTAMP),
                keySerialized,
                valueSerialized,
                readMetadata(consumedRow, KafkaDynamicSink.WritableMetadata.HEADERS));
    }
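
    The value handling in upsert mode boils down to a small mapping from row kind to record value. The sketch below isolates it under simplifying assumptions: the nested `RowKind` enum is a local stand-in for Flink's class, the row is already a JSON string, and `UpsertValueSketch` is a hypothetical name.

    ```java
    import java.nio.charset.StandardCharsets;

    /** Hypothetical illustration of how upsert mode maps a changelog row to a Kafka record value. */
    final class UpsertValueSketch {
        // local stand-in for Flink's RowKind
        enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

        /** Returns the record value; null represents a tombstone. */
        static byte[] toUpsertValue(RowKind kind, String json) {
            if (kind == RowKind.DELETE || kind == RowKind.UPDATE_BEFORE) {
                return null;
            }
            // INSERT and UPDATE_AFTER are both written as plain values
            return json.getBytes(StandardCharsets.UTF_8);
        }

        public static void main(String[] args) {
            String row = "{\"user_id\":\"user_id_8160\"}";
            RowKind[] change = {RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER};
            for (RowKind kind : change) {
                byte[] value = toUpsertValue(kind, row);
                System.out.println(value == null ? "null" : new String(value, StandardCharsets.UTF_8));
            }
        }
    }
    ```

    An UPDATE_BEFORE followed by an UPDATE_AFTER therefore shows up in the topic as a null followed by a JSON record, which matches the alternating pattern in the sample output.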

  • Original article: https://www.cnblogs.com/Springmoon-venn/p/14653331.html