zoukankan      html  css  js  c++  java
  • 生产BackPressure 的代码

    public class BackPressureStatsTrackerImpl implements BackPressureStatsTracker {

    /** Logger for this tracker; used for the debug message emitted when a sample is triggered. */
    private static final Logger LOG = LoggerFactory.getLogger(BackPressureStatsTrackerImpl.class);

    /** Maximum stack trace depth for samples; passed to the coordinator when a sample is triggered. */
    static final int MAX_STACK_TRACE_DEPTH = 3;

    /** Expected class name for the stack trace element that indicates back pressure. */
    static final String EXPECTED_CLASS_NAME = "org.apache.flink.runtime.io.network.buffer.LocalBufferPool";

    /**
     * Expected method name for the stack trace element that indicates back pressure.
     * This is the name of the blocking buffer request method shown further below.
     */
    static final String EXPECTED_METHOD_NAME = "requestBufferBuilderBlocking";

    /**
     * Lock guarding trigger operations. It is taken in {@code getOperatorBackPressureStats} and
     * asserted to be held in {@code triggerStackTraceSampleInternal}.
     */
    private final Object lock = new Object();

    /** Stack trace sample coordinator; used to trigger the stack trace samples. */
    private final StackTraceSampleCoordinator coordinator;


    /**
     * Requests a buffer builder in blocking mode.
     *
     * <p>Delegates to {@code requestMemorySegment(true)} and wraps the obtained segment via
     * {@code toBufferBuilder}. In blocking mode the segment request waits until a segment
     * becomes available.
     *
     * @return the buffer builder produced by {@code toBufferBuilder} for the requested segment
     * @throws IOException propagated from the segment request
     * @throws InterruptedException if the thread is interrupted while waiting for a segment
     */
    @Override
    public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException {
        final MemorySegment segment = requestMemorySegment(true);
        return toBufferBuilder(segment);
    }


    private MemorySegment requestMemorySegment(boolean isBlocking) throws InterruptedException, IOException {
       synchronized (availableMemorySegments) {
          returnExcessMemorySegments();
          boolean askToRecycle = owner != null;
          // fill availableMemorySegments with at least one element, wait if required
          while (availableMemorySegments.isEmpty()) {
             if (isDestroyed) {
                throw new IllegalStateException("Buffer pool is destroyed.");
             }
             if (numberOfRequestedMemorySegments < currentPoolSize) {
                final MemorySegment segment = networkBufferPool.requestMemorySegment();
                if (segment != null) {
                   numberOfRequestedMemorySegments++;
                   return segment;
                }
             }
             if (askToRecycle) {
                owner.releaseMemory(1);
             }
             if (isBlocking) {
                availableMemorySegments.wait(2000);
             }
             else {
                return null;
             }
          }
          return availableMemorySegments.poll();
       }



    /**
     * Looks up the cached back pressure statistics for an operator.
     *
     * <p>If nothing is cached, or the cached entry ended at least
     * {@code backPressureStatsRefreshInterval} ago, a new stack trace sample is requested.
     * The value returned by this call is still the entry that was read from the cache
     * before the request (possibly none).
     *
     * @param vertex Operator to get the stats for.
     * @return The cached back pressure statistics for the operator, or an empty optional
     *         if none are cached.
     */
    public Optional<OperatorBackPressureStats> getOperatorBackPressureStats(ExecutionJobVertex vertex) {
        synchronized (lock) {
            final OperatorBackPressureStats cached = operatorStatsCache.getIfPresent(vertex);

            final boolean refreshNeeded;
            if (cached == null) {
                refreshNeeded = true;
            } else {
                final long age = System.currentTimeMillis() - cached.getEndTimestamp();
                refreshNeeded = age >= backPressureStatsRefreshInterval;
            }

            if (refreshNeeded) {
                triggerStackTraceSampleInternal(vertex);
            }

            return Optional.ofNullable(cached);
        }
    }


    /**
     * Requests a stack trace sample for an operator in order to gather its back pressure
     * statistics.
     *
     * <p>Nothing is triggered when the tracker is shut down, when a sample for the operator
     * is already pending, when the job is in a globally terminal state, or when the graph
     * provides no future executor. The caller must hold {@code lock}.
     *
     * @param vertex Operator to get the stats for.
     * @return {@code true} if a sample was triggered, {@code false} otherwise.
     */
    private boolean triggerStackTraceSampleInternal(final ExecutionJobVertex vertex) {
        assert Thread.holdsLock(lock);

        if (shutDown) {
            return false;
        }

        // Skip operators that already have a sample in flight or whose job has finished.
        if (pendingStats.contains(vertex)
                || vertex.getGraph().getState().isGloballyTerminalState()) {
            return false;
        }

        final Executor callbackExecutor = vertex.getGraph().getFutureExecutor();

        // Only trigger if still active job
        if (callbackExecutor == null) {
            return false;
        }

        pendingStats.add(vertex);

        if (LOG.isDebugEnabled()) {
            LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices()));
        }

        final CompletableFuture<StackTraceSample> pendingSample = coordinator.triggerStackTraceSample(
            vertex.getTaskVertices(),
            numSamples,
            delayBetweenSamples,
            MAX_STACK_TRACE_DEPTH);

        // The completion callback runs asynchronously on the graph's future executor.
        pendingSample.handleAsync(new StackTraceSampleCompletionCallback(vertex), callbackExecutor);

        return true;
    }
  • 相关阅读:
    c/c++ 传统数组的缺点
    在Eclipse中搭建C/C++环境
    web服务编码设置
    锋利的jQuery学习总结
    SQL调优常用方法
    C#(KeyChar和KeyCord值,KeyDown/KeyPress事件区别)
    winform 屏蔽 空格键
    $.ajax与$.post、$.get的一点区别
    jquery $.ajax $.get $.post的区别?
    $(function() {}),即$(document).ready(function(),什么时候执行?以此为准
  • 原文地址:https://www.cnblogs.com/WCFGROUP/p/9523381.html
Copyright © 2011-2022 走看看