zoukankan      html  css  js  c++  java
  • BlockTransferThrottler(hadoop中用于控制处理带宽的util类)

    hadoop中用该类来控制对于数据的处理速度 ,主要思想如下:

    1、将数据处理分为一个个的周期(period)

    2、每个周期内处理的bytes固定 。

    3、定义每次处理周期的开始时间 。

    4、每次处处理完后调用BlockTransferThrottler.throttle(long numOfBytes)方法,若是还未到周期的结束时间,已经将处理了相应的bytes,则该线程wait 。

    代码以及注视如下:

    package org.apache.hadoop.hdfs.server.datanode;

    /**
    * a class to throttle the block transfers.
    * This class is thread safe. It can be shared by multiple threads.
    * The parameter bandwidthPerSec specifies the total bandwidth shared by
    * threads.
    * 该类用于控制数据传输的速率
    * 主要是对于每次处理(读/写)分成周期(period),若是该线程的处理速度太快(在这个周期时间还未结束,已经处理了所有的bytes)
    * 则wait该线程一会时间,详见93行
    */
    class BlockTransferThrottler {
    private long period; // period over which bw is imposed 处理周期
    private long periodExtension; // Max period over which bw accumulates. 处理的最大周期(一般周期的3倍)
    private long bytesPerPeriod; // total number of bytes can be sent in each period 每个时间周期发送的字节数
    private long curPeriodStart; // current period starting time 当前周期开始的时间
    private long curReserve; // remaining bytes can be sent in the period 剩余可发送的字节数
    private long bytesAlreadyUsed; //已经发送的字节数

    /** Constructor
    *
    @param bandwidthPerSec bandwidth allowed in bytes per second.
    */
    BlockTransferThrottler(long bandwidthPerSec) {
    this(500, bandwidthPerSec); // by default throttling period is 500ms
    }

    /**
    * Constructor
    *
    @param period in milliseconds. Bandwidth is enforced over this
    * period.
    *
    @param bandwidthPerSec bandwidth allowed in bytes per second.
    */
    BlockTransferThrottler(long period, long bandwidthPerSec) {
    this.curPeriodStart = System.currentTimeMillis();
    this.period = period;
    this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000;
    this.periodExtension = period*3;
    }

    /**
    *
    @return current throttle bandwidth in bytes per second.
    * 返回处理数据的带宽 bytes per second
    */
    synchronized long getBandwidth() {
    return bytesPerPeriod*1000/period;
    }

    /**
    * Sets throttle bandwidth. This takes affect latest by the end of current
    * period.
    *
    *
    @param bytesPerSecond
    */
    synchronized void setBandwidth(long bytesPerSecond) {
    if ( bytesPerSecond <= 0 ) {
    throw new IllegalArgumentException("" + bytesPerSecond);
    }
    bytesPerPeriod = bytesPerSecond*period/1000;
    }

    /** Given the numOfBytes sent/received since last time throttle was called,
    * make the current thread sleep if I/O rate is too fast
    * compared to the given bandwidth.
    *
    *
    @param numOfBytes
    * number of bytes sent/received since last time throttle was called
    */
    synchronized void throttle(long numOfBytes) {
    if ( numOfBytes <= 0 ) {
    return;
    }

    curReserve -= numOfBytes;
    bytesAlreadyUsed += numOfBytes;
    //一个周期内数据已经发送完
    while (curReserve <= 0) {
    long now = System.currentTimeMillis();
    long curPeriodEnd = curPeriodStart + period;
    //当前时间<(开始时间+周期长度),即提前完成,则wait
    if ( now < curPeriodEnd ) {
    // Wait for next period so that curReserve can be increased.
    try {
    wait( curPeriodEnd - now );
    } catch (InterruptedException ignored) {}
    } else if ( now < (curPeriodStart + periodExtension)) {
    curPeriodStart = curPeriodEnd;
    curReserve += bytesPerPeriod;
    } else {
    // discard the prev period. Throttler might not have
    // been used for a long time.
    curPeriodStart = now;
    curReserve = bytesPerPeriod - bytesAlreadyUsed;
    }
    }

    bytesAlreadyUsed -= numOfBytes;
    }
    }
  • 相关阅读:
    饿了么P7级前端工程师进入大厂的面试经验
    前端程序员面试的坑,简历写上这一条信息会被虐死!
    这次来分享前端的九条bug吧
    移动端开发必会出现的问题和解决方案
    创建一个dynamics 365 CRM online plugin (八)
    创建一个dynamics 365 CRM online plugin (七)
    创建一个dynamics 365 CRM online plugin (六)
    创建一个dynamics 365 CRM online plugin (五)
    使用User Primary Email作为GUID的问题
    怎样Debug Dynamics 365 CRM Plugin
  • 原文地址:https://www.cnblogs.com/serendipity/p/2378639.html
Copyright © 2011-2022 走看看