zoukankan      html  css  js  c++  java
  • BlockTransferThrottler(hadoop中用于控制处理带宽的util类)

    hadoop中用该类来控制对于数据的处理速度 ,主要思想如下:

    1、将数据处理分为一个个的周期(period)

    2、每个周期内处理的bytes固定 。

    3、定义每次处理周期的开始时间 。

    4、每次处处理完后调用BlockTransferThrottler.throttle(long numOfBytes)方法,若是还未到周期的结束时间,已经将处理了相应的bytes,则该线程wait 。

    代码以及注视如下:

    package org.apache.hadoop.hdfs.server.datanode;

    /**
    * a class to throttle the block transfers.
    * This class is thread safe. It can be shared by multiple threads.
    * The parameter bandwidthPerSec specifies the total bandwidth shared by
    * threads.
    * 该类用于控制数据传输的速率
    * 主要是对于每次处理(读/写)分成周期(period),若是该线程的处理速度太快(在这个周期时间还未结束,已经处理了所有的bytes)
    * 则wait该线程一会时间,详见93行
    */
    class BlockTransferThrottler {
    private long period; // period over which bw is imposed 处理周期
    private long periodExtension; // Max period over which bw accumulates. 处理的最大周期(一般周期的3倍)
    private long bytesPerPeriod; // total number of bytes can be sent in each period 每个时间周期发送的字节数
    private long curPeriodStart; // current period starting time 当前周期开始的时间
    private long curReserve; // remaining bytes can be sent in the period 剩余可发送的字节数
    private long bytesAlreadyUsed; //已经发送的字节数

    /** Constructor
    *
    @param bandwidthPerSec bandwidth allowed in bytes per second.
    */
    BlockTransferThrottler(long bandwidthPerSec) {
    this(500, bandwidthPerSec); // by default throttling period is 500ms
    }

    /**
    * Constructor
    *
    @param period in milliseconds. Bandwidth is enforced over this
    * period.
    *
    @param bandwidthPerSec bandwidth allowed in bytes per second.
    */
    BlockTransferThrottler(long period, long bandwidthPerSec) {
    this.curPeriodStart = System.currentTimeMillis();
    this.period = period;
    this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000;
    this.periodExtension = period*3;
    }

    /**
    *
    @return current throttle bandwidth in bytes per second.
    * 返回处理数据的带宽 bytes per second
    */
    synchronized long getBandwidth() {
    return bytesPerPeriod*1000/period;
    }

    /**
    * Sets throttle bandwidth. This takes affect latest by the end of current
    * period.
    *
    *
    @param bytesPerSecond
    */
    synchronized void setBandwidth(long bytesPerSecond) {
    if ( bytesPerSecond <= 0 ) {
    throw new IllegalArgumentException("" + bytesPerSecond);
    }
    bytesPerPeriod = bytesPerSecond*period/1000;
    }

    /** Given the numOfBytes sent/received since last time throttle was called,
    * make the current thread sleep if I/O rate is too fast
    * compared to the given bandwidth.
    *
    *
    @param numOfBytes
    * number of bytes sent/received since last time throttle was called
    */
    synchronized void throttle(long numOfBytes) {
    if ( numOfBytes <= 0 ) {
    return;
    }

    curReserve -= numOfBytes;
    bytesAlreadyUsed += numOfBytes;
    //一个周期内数据已经发送完
    while (curReserve <= 0) {
    long now = System.currentTimeMillis();
    long curPeriodEnd = curPeriodStart + period;
    //当前时间<(开始时间+周期长度),即提前完成,则wait
    if ( now < curPeriodEnd ) {
    // Wait for next period so that curReserve can be increased.
    try {
    wait( curPeriodEnd - now );
    } catch (InterruptedException ignored) {}
    } else if ( now < (curPeriodStart + periodExtension)) {
    curPeriodStart = curPeriodEnd;
    curReserve += bytesPerPeriod;
    } else {
    // discard the prev period. Throttler might not have
    // been used for a long time.
    curPeriodStart = now;
    curReserve = bytesPerPeriod - bytesAlreadyUsed;
    }
    }

    bytesAlreadyUsed -= numOfBytes;
    }
    }
  • 相关阅读:
    ASP.NET CORE 使用Consul实现服务治理与健康检查(2)——源码篇
    ASP.NET CORE 使用Consul实现服务治理与健康检查(1)——概念篇
    Asp.Net Core 单元测试正确姿势
    如何通过 Docker 部署 Logstash 同步 Mysql 数据库数据到 ElasticSearch
    Asp.Net Core2.2 源码阅读系列——控制台日志源码解析
    使用VS Code 开发.NET CORE 程序指南
    .NetCore下ES查询驱动 PlainElastic .Net 升级官方驱动 Elasticsearch .Net
    重新认识 async/await 语法糖
    EF添加
    EF修改部分字段
  • 原文地址:https://www.cnblogs.com/serendipity/p/2378639.html
Copyright © 2011-2022 走看看