zoukankan      html  css  js  c++  java
  • pulsar 实现的一种 RateLimiter

    pulsar 实现了一个 RateLimiter 来限制 dispatch 的速率。


    大体思路是:初始有 n 个令牌,当令牌被申请完了后,其他人就无法获得令牌了,每隔一段时间 t 会清零已分配的令牌数。
    所以,记住这 2 个参数即可。

    通过一个测试用例,观察 RateLimiter 的用法。

    // org.apache.pulsar.common.util.RateLimiterTest#testMultipleAcquire
    public void testMultipleAcquire() throws Exception {
        // 每过 1000ms 重置令牌数
        final long rateTimeMSec = 1000;
        // 令牌总数为 100
        final int permits = 100;
        final int acquirePermist = 50;
        RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS);
        long start = System.currentTimeMillis();
        for (int i = 0; i < permits / acquirePermist; i++) {
            // 1 次获取 50 个令牌,2 次申请完 100 个令牌
            rate.acquire(acquirePermist);
        }
        long end = System.currentTimeMillis();
        assertTrue((end - start) < rateTimeMSec);
        // 时间还不到 1000ms,令牌没有重置,则可用令牌仍为 0
        assertEquals(rate.getAvailablePermits(), 0);
        rate.close();
    }

    接下来看下实现:

    org.apache.pulsar.common.util.RateLimiter
      // 令牌总数
      private long permits;
      // 当前已分配的令牌数
      private long acquiredPermits;
      
      // 清理已分配令牌数的定时任务 1. 线程池 2. 定时任务 3. 间隔时间
      private final ScheduledExecutorService executorService;
      private ScheduledFuture<?> renewTask;
      private long rateTime;
      // 提供了一个接口用来修改令牌总数
      private Supplier<Long> permitUpdater; 

    申请 acquirePermit 个令牌,自旋模式

    public synchronized void acquire(long acquirePermit) throws InterruptedException {
        checkArgument(!isClosed(), "Rate limiter is already shutdown");
        checkArgument(acquirePermit <= this.permits,
                "acquiring permits must be less or equal than initialized rate =" + this.permits);
    
        // 如果还没创建清理定时任务,则创建它
        if (renewTask == null) {
            renewTask = createTask();
        }
    
        boolean canAcquire = false;
        do {
            // 如果已分配令牌数小于总令牌数,可以分配
            canAcquire = acquirePermit < 0 || acquiredPermits < this.permits;
            if (!canAcquire) {
                // 阻塞当前线程
                wait();
            } else {
                // 增加已分配令牌数
                acquiredPermits += acquirePermit;
            }
        } while (!canAcquire);
    }

    申请 acquirePermit 个令牌,快速失败模式

    public synchronized boolean tryAcquire(long acquirePermit) {
        checkArgument(!isClosed(), "Rate limiter is already shutdown");
        // lazy init and start task only once application start using it
        if (renewTask == null) {
            renewTask = createTask();
        }
    
        // acquired-permits can't be larger than the rate
        if (acquirePermit > this.permits) {
            acquiredPermits = this.permits;
            return false;
        }
        // 这里并没有严格限制,无所谓,没必要太精确
        boolean canAcquire = acquirePermit < 0 || acquiredPermits < this.permits;
        if (canAcquire) {
            acquiredPermits += acquirePermit;
        }
        return canAcquire;
    }

    创建清理分配令牌数的定时任务

    protected ScheduledFuture<?> createTask() {
        return executorService.scheduleAtFixedRate(this::renew, this.rateTime, this.rateTime, this.timeUnit);
    }

    清理已分配令牌数

    synchronized void renew() {
        // 直接重置为 0
        acquiredPermits = 0;
        // 如果提供了这个对象,则用它的值来设置总令牌数
        if (permitUpdater != null) {
            long newPermitRate = permitUpdater.get();
            if (newPermitRate > 0) {
                setRate(newPermitRate);
            }
        }
        // 唤醒所有等待当前对象的线程
        notifyAll();
    }
  • 相关阅读:
    gulp-rev:项目部署缓存解决方案----gulp系列(六)
    nodemailer实现node发送邮件
    angular项目总结——angular + browserify + gulp + bower + less 架构分享
    作为面试官之后的一些体会
    前端乱炖,总结一些细节点
    webapp,liveapp: 流式布局和rem布局
    canvas剪裁图片并上传,前端一步到位,无需用到后端
    (高德地图)marker定位 bug 解决总结
    gulp-clean----gulp系列(五)
    APUE-文件和目录(一)
  • 原文地址:https://www.cnblogs.com/allenwas3/p/12764839.html
Copyright © 2011-2022 走看看