zoukankan      html  css  js  c++  java
  • 多个inputstream的情况下,watermark的值怎么赋值? kakfa中多个partition提取 watermark

    1,
    org.apache.flink.streaming.api.operators; AbstractStreamOperator

    public void processWatermark1(Watermark mark) throws Exception {
    input1Watermark = mark.getTimestamp();
    long newMin = Math.min(input1Watermark, input2Watermark);
    if (newMin > combinedWatermark) {
    combinedWatermark = newMin;
    processWatermark(new Watermark(combinedWatermark));
    }
    }

    public void processWatermark2(Watermark mark) throws Exception {
    input2Watermark = mark.getTimestamp();
    long newMin = Math.min(input1Watermark, input2Watermark);
    if (newMin > combinedWatermark) {
    combinedWatermark = newMin;
    processWatermark(new Watermark(combinedWatermark));
    }
    }

    2,
    http://vinoyang.com/2016/10/29/flink-streaming-window-operator-analysis/


    3
    , kakfa中多个partition提取 watermark
    private static class PeriodicWatermarkEmitter<KPH> implements ProcessingTimeCallback 

    public void onProcessingTime(long timestamp) throws Exception {

    long minAcrossAll = Long.MAX_VALUE;
    boolean isEffectiveMinAggregation = false;
    for (KafkaTopicPartitionState<?> state : allPartitions) {

    // we access the current watermark for the periodic assigners under the state
    // lock, to prevent concurrent modification to any internal variables
    final long curr;
    //noinspection SynchronizationOnLocalVariableOrMethodParameter
    synchronized (state) {
    curr = ((KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>) state).getCurrentWatermarkTimestamp();
    }

    minAcrossAll = Math.min(minAcrossAll, curr);
    isEffectiveMinAggregation = true;
    }

    // emit next watermark, if there is one
    if (isEffectiveMinAggregation && minAcrossAll > lastWatermarkTimestamp) {
    lastWatermarkTimestamp = minAcrossAll;
    emitter.emitWatermark(new Watermark(minAcrossAll));
    }

    // schedule the next watermark
    timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
    }
  • 相关阅读:
    [官网]清华大学的开源镜像站点 配置方法
    Android MIFARE NFCA源码解析
    Delphi XE8 TStyleBook的使用
    【FireMonkey】StyleBook使用方法
    Delphi第三方组件安装DCU.PAS.DPK.BPL.ActiveX控件
    M1卡说明及使用proxmark3破解方法
    M1卡修改各区块控制位值和数据
    DICOMDIR
    ZentaoPMS 系统的优先级以及修改
    集成禅道和svn
  • 原文地址:https://www.cnblogs.com/WCFGROUP/p/9121401.html
Copyright © 2011-2022 走看看