  • With multiple input streams, how is the watermark value assigned? Extracting the watermark across multiple Kafka partitions

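    1. org.apache.flink.streaming.api.operators.AbstractStreamOperator

    For an operator with two inputs, the combined watermark is the minimum of the two input watermarks, and it is forwarded only when that minimum advances.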

    public void processWatermark1(Watermark mark) throws Exception {
        // remember the latest watermark seen on input 1
        input1Watermark = mark.getTimestamp();
        // the operator can only be as far along as its slowest input
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            // forward a watermark only when the minimum actually advances
            combinedWatermark = newMin;
            processWatermark(new Watermark(combinedWatermark));
        }
    }

    public void processWatermark2(Watermark mark) throws Exception {
        // same logic as processWatermark1, applied to input 2
        input2Watermark = mark.getTimestamp();
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            processWatermark(new Watermark(combinedWatermark));
        }
    }
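To see how the minimum rule behaves over a sequence of watermarks, here is a minimal sketch in plain Java with no Flink dependency. The class name TwoInputWatermarkSketch, the emitted() helper, and the Long.MIN_VALUE starting values are assumptions made for illustration; the two-input logic itself mirrors the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a two-input operator; this is not Flink's class. */
public class TwoInputWatermarkSketch {
    // Assumption: every field starts at Long.MIN_VALUE, meaning "no watermark seen yet".
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    // Records each combined watermark that would be forwarded downstream.
    private final List<Long> emitted = new ArrayList<>();

    public void processWatermark1(long timestamp) {
        input1Watermark = timestamp;
        advance();
    }

    public void processWatermark2(long timestamp) {
        input2Watermark = timestamp;
        advance();
    }

    // Forward the minimum of both inputs, but only when it moves forward.
    private void advance() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            emitted.add(combinedWatermark);
        }
    }

    public List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        TwoInputWatermarkSketch op = new TwoInputWatermarkSketch();
        op.processWatermark1(10L); // input 2 has no watermark yet: nothing emitted
        op.processWatermark2(5L);  // min(10, 5) = 5   -> emit 5
        op.processWatermark2(20L); // min(10, 20) = 10 -> emit 10
        op.processWatermark1(15L); // min(15, 20) = 15 -> emit 15
        System.out.println(op.emitted()); // prints [5, 10, 15]
    }
}
```

Note that the first watermark on input 1 produces no output: until both inputs have reported a watermark, the minimum stays at its initial value.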

    2. http://vinoyang.com/2016/10/29/flink-streaming-window-operator-analysis/


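    3. Extracting the watermark across multiple Kafka partitions

    The periodic emitter takes the minimum watermark over all partitions and emits it only when it is greater than the last emitted watermark.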
    private static class PeriodicWatermarkEmitter<KPH> implements ProcessingTimeCallback {

        // (fields and constructor are not part of this excerpt)

        public void onProcessingTime(long timestamp) throws Exception {

            long minAcrossAll = Long.MAX_VALUE;
            boolean isEffectiveMinAggregation = false;
            for (KafkaTopicPartitionState<?> state : allPartitions) {

                // we access the current watermark for the periodic assigners under the state
                // lock, to prevent concurrent modification to any internal variables
                final long curr;
                //noinspection SynchronizationOnLocalVariableOrMethodParameter
                synchronized (state) {
                    curr = ((KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>) state).getCurrentWatermarkTimestamp();
                }

                minAcrossAll = Math.min(minAcrossAll, curr);
                isEffectiveMinAggregation = true;
            }

            // emit next watermark, if there is one
            if (isEffectiveMinAggregation && minAcrossAll > lastWatermarkTimestamp) {
                lastWatermarkTimestamp = minAcrossAll;
                emitter.emitWatermark(new Watermark(minAcrossAll));
            }

            // schedule the next watermark
            timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
        }
    }
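The per-partition aggregation can be tried out without Flink or Kafka. The following is a minimal sketch, not the connector's implementation: PartitionMinWatermarkSketch, PartitionState, advanceTo, and onTick are hypothetical names, and onTick stands in for one invocation of the periodic callback, with timer scheduling left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of per-partition watermark aggregation; not the Kafka connector's classes. */
public class PartitionMinWatermarkSketch {

    /** Stand-in for per-partition state that tracks the partition's current watermark. */
    public static final class PartitionState {
        private long currentWatermark = Long.MIN_VALUE; // assumption: "no watermark yet"

        public synchronized void advanceTo(long timestamp) {
            currentWatermark = Math.max(currentWatermark, timestamp);
        }

        public synchronized long getCurrentWatermark() {
            return currentWatermark;
        }
    }

    private final List<PartitionState> allPartitions;
    private long lastWatermarkTimestamp = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    public PartitionMinWatermarkSketch(List<PartitionState> allPartitions) {
        this.allPartitions = allPartitions;
    }

    /** One periodic callback: take the minimum over all partitions, emit only if it advanced. */
    public void onTick() {
        long minAcrossAll = Long.MAX_VALUE;
        boolean sawAnyPartition = false;
        for (PartitionState state : allPartitions) {
            minAcrossAll = Math.min(minAcrossAll, state.getCurrentWatermark());
            sawAnyPartition = true;
        }
        // The flag keeps Long.MAX_VALUE from being emitted when there are no partitions.
        if (sawAnyPartition && minAcrossAll > lastWatermarkTimestamp) {
            lastWatermarkTimestamp = minAcrossAll;
            emitted.add(minAcrossAll);
        }
    }

    public List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        PartitionState p0 = new PartitionState();
        PartitionState p1 = new PartitionState();
        PartitionMinWatermarkSketch emitter =
                new PartitionMinWatermarkSketch(Arrays.asList(p0, p1));

        p0.advanceTo(100L);
        emitter.onTick(); // p1 has no watermark yet: nothing emitted
        p1.advanceTo(40L);
        emitter.onTick(); // min(100, 40) = 40   -> emit 40
        p1.advanceTo(120L);
        emitter.onTick(); // min(100, 120) = 100 -> emit 100
        emitter.onTick(); // minimum unchanged: nothing emitted
        System.out.println(emitter.emitted()); // prints [40, 100]
    }
}
```

As in the two-input case, a single lagging or silent partition holds back the emitted watermark for the whole source.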
  • Original post: https://www.cnblogs.com/WCFGROUP/p/9121401.html