  • How is the watermark value determined when an operator has multiple input streams? And how is a watermark derived across multiple Kafka partitions?

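    1, Two input streams
    Class AbstractStreamOperator, package org.apache.flink.streaming.api.operators.
    Each input remembers its own latest watermark. The operator forwards the minimum of the two, and only when that minimum has advanced.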

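    // input1Watermark, input2Watermark and combinedWatermark are fields of the operator.
    public void processWatermark1(Watermark mark) throws Exception {
        // remember the latest watermark seen on input 1
        input1Watermark = mark.getTimestamp();
        // the operator's event time is bounded by the slower of the two inputs
        long newMin = Math.min(input1Watermark, input2Watermark);
        // forward a watermark only if the combined value actually advanced
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            processWatermark(new Watermark(combinedWatermark));
        }
    }

    public void processWatermark2(Watermark mark) throws Exception {
        // same logic, driven by a watermark arriving on input 2
        input2Watermark = mark.getTimestamp();
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            processWatermark(new Watermark(combinedWatermark));
        }
    }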
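
The effect of the minimum rule is easiest to see outside Flink. The following is a minimal standalone sketch, not Flink code: the class TwoInputWatermarkTracker and its method names are hypothetical, and it assumes all three values start at Long.MIN_VALUE. It shows that nothing is forwarded until both inputs have reported a watermark, and that the combined value then follows the slower input.

```java
// Hypothetical standalone model of the two-input minimum rule (not a Flink class).
class TwoInputWatermarkTracker {
    // Assumption: all watermarks start at Long.MIN_VALUE ("nothing seen yet").
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    /** Records a watermark from input 1; returns true if the combined watermark advanced. */
    boolean onWatermark1(long timestamp) {
        input1Watermark = timestamp;
        return advance();
    }

    /** Records a watermark from input 2; returns true if the combined watermark advanced. */
    boolean onWatermark2(long timestamp) {
        input2Watermark = timestamp;
        return advance();
    }

    long getCombinedWatermark() {
        return combinedWatermark;
    }

    // The combined watermark is the minimum of both inputs and never moves backwards.
    private boolean advance() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        TwoInputWatermarkTracker tracker = new TwoInputWatermarkTracker();
        System.out.println(tracker.onWatermark1(100L)); // input 2 has reported nothing yet
        System.out.println(tracker.onWatermark2(50L));  // both inputs known now
        System.out.println(tracker.getCombinedWatermark());
    }
}
```

A watermark of 100 on input 1 alone changes nothing, because input 2 is still at its initial value. Once input 2 reports 50, the combined watermark becomes 50, and it stays there until input 2 moves on.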

    2, Further reading on the streaming window operator
    http://vinoyang.com/2016/10/29/flink-streaming-window-operator-analysis/


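    3, Extracting a watermark across multiple Kafka partitions
    The same minimum rule applies: a periodic callback reads the current watermark of every partition and emits the smallest one, provided it is larger than the last watermark emitted.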
    private static class PeriodicWatermarkEmitter<KPH> implements ProcessingTimeCallback {

        // fields used below (allPartitions, emitter, timerService, interval,
        // lastWatermarkTimestamp) are omitted in this excerpt

        @Override
        public void onProcessingTime(long timestamp) throws Exception {

            long minAcrossAll = Long.MAX_VALUE;
            // stays false when there are no partitions, so Long.MAX_VALUE is never emitted
            boolean isEffectiveMinAggregation = false;
            for (KafkaTopicPartitionState<?> state : allPartitions) {

                // we access the current watermark for the periodic assigners under the state
                // lock, to prevent concurrent modification to any internal variables
                final long curr;
                //noinspection SynchronizationOnLocalVariableOrMethodParameter
                synchronized (state) {
                    curr = ((KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>) state).getCurrentWatermarkTimestamp();
                }

                minAcrossAll = Math.min(minAcrossAll, curr);
                isEffectiveMinAggregation = true;
            }

            // emit next watermark, if there is one
            if (isEffectiveMinAggregation && minAcrossAll > lastWatermarkTimestamp) {
                lastWatermarkTimestamp = minAcrossAll;
                emitter.emitWatermark(new Watermark(minAcrossAll));
            }

            // schedule the next watermark
            timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
        }
    }
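
The per-partition aggregation can be modelled without any Kafka or Flink dependency. The sketch below is an illustration only: PartitionWatermarkAggregator, updatePartition and onPeriodicTick are hypothetical names, partitions are assumed to start at Long.MIN_VALUE, and the Math.max in updatePartition is an added assumption that keeps each partition's watermark from moving backwards. It mirrors the two conditions of the emitter above: at least one partition must exist, and the minimum must exceed the last emitted value.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Hypothetical standalone model of the per-partition minimum (not the Flink/Kafka classes).
class PartitionWatermarkAggregator {
    private final long[] partitionWatermarks;
    private long lastWatermarkTimestamp = Long.MIN_VALUE;

    PartitionWatermarkAggregator(int numPartitions) {
        partitionWatermarks = new long[numPartitions];
        // Assumption: a partition that has produced nothing yet sits at Long.MIN_VALUE.
        Arrays.fill(partitionWatermarks, Long.MIN_VALUE);
    }

    /** Records the current watermark of one partition (assumed never to decrease). */
    void updatePartition(int partition, long watermark) {
        partitionWatermarks[partition] = Math.max(partitionWatermarks[partition], watermark);
    }

    /** Called on each periodic tick; returns the watermark to emit, if any. */
    OptionalLong onPeriodicTick() {
        long minAcrossAll = Long.MAX_VALUE;
        boolean sawPartition = false; // guards against emitting Long.MAX_VALUE with no partitions
        for (long curr : partitionWatermarks) {
            minAcrossAll = Math.min(minAcrossAll, curr);
            sawPartition = true;
        }
        if (sawPartition && minAcrossAll > lastWatermarkTimestamp) {
            lastWatermarkTimestamp = minAcrossAll;
            return OptionalLong.of(minAcrossAll);
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        PartitionWatermarkAggregator agg = new PartitionWatermarkAggregator(3);
        agg.updatePartition(0, 100L);
        agg.updatePartition(1, 80L);
        System.out.println(agg.onPeriodicTick()); // partition 2 has not reported yet
        agg.updatePartition(2, 90L);
        System.out.println(agg.onPeriodicTick()); // all three partitions known now
    }
}
```

One consequence of taking the minimum is visible in this model: a partition that never reports a watermark keeps the aggregate at its initial value, so no watermark is emitted until every partition has advanced.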
  • Original article: https://www.cnblogs.com/WCFGROUP/p/9121401.html