国际惯例原理图
代码实现
package Thread; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** * @ProjectName: cutter-point * @Package: Thread * @ClassName: RateLimiter * @Author: xiaof * @Description: 令牌桶,限流 * @Date: 2019/6/21 11:41 * @Version: 1.0 */ public class RateLimiter { //限流消费的令牌 private final AtomicInteger consumedTokens = new AtomicInteger(); private final AtomicLong lastRefrushTokenTime = new AtomicLong(0); //限流类型,是秒,还是分 private final long rateType; public RateLimiter(TimeUnit averageRateUnit) { switch (averageRateUnit) { case SECONDS: rateType = 1000; break; case MINUTES: rateType = 60 * 1000; break; default: throw new IllegalArgumentException("TimeUnit of " + averageRateUnit + " is not supported"); } } //请求令牌,判断是否可以获取到新的令牌 public boolean acquire(int bucketSize, long avgRate) { return acquire(bucketSize, avgRate, System.currentTimeMillis()); } public boolean acquire(int bucketSize, long avgRate, long curentTimeMillis) { if(bucketSize <= 0 || avgRate <= 0) { return true;//如果这2个参数,任意一个为0 ,我们就认为没有上限 } //刷新令牌桶 refillToken(bucketSize, avgRate, curentTimeMillis); //开始消费令牌 return consumToken(bucketSize); } private void refillToken(int bucketSize, long avgRate, long currentTimeMillis) { //获取上次最后以后更新令牌时间 long freshTime = lastRefrushTokenTime.get(); //获取当前间隔时间 long timeDelta = currentTimeMillis - freshTime; //计算这次需要填充的token数 long newToken = timeDelta * avgRate /rateType; if(newToken > 0) { //新的更新时间 long newFillTime = freshTime == 0 ? currentTimeMillis : freshTime + timeDelta; //用cas操作,以保证只有一个线程注入新令牌 if(lastRefrushTokenTime.compareAndSet(freshTime, newFillTime)) { //死循环,直到设置成功新的令牌 while(true) { //1.获取当前消费的令牌 int currentConsumToken = consumedTokens.get(); //2.获取消费的令牌容量,跟桶极限大小比较,取小的那个 int realConsumTokens = Math.min(currentConsumToken, bucketSize); //3.计算填充之后剩余的被消费的容量,计算新增容量,用来填充被消费的令牌数 //剩余的消费容量,但是不能比0还小,这个要取值 int newConsumSize = (int) Math.max(0, realConsumTokens - newToken); //然后设置进去 if(consumedTokens.compareAndSet(currentConsumToken, newConsumSize)) { return; } } } } } //消费令牌 private boolean consumToken(int bucketSize) { while (true) { int currentLevel = consumedTokens.get(); //如果超出负载 if (currentLevel >= bucketSize) { return false; } //每次消费一个 if (consumedTokens.compareAndSet(currentLevel, currentLevel + 1)) { return true; } } } public void reset() { consumedTokens.set(0); lastRefrushTokenTime.set(0); } }
到这里可能有的人不清楚怎么用,来我们测试一波
我们假设有100个用户同时请求,然后令牌恢复速率调成10,然后速率单位改为秒,也就是1秒恢复10个令牌
这样同时100个请求过来,马上令牌就会被用完,那么就会被限流,比如我们拦截器这个时候可以返回404,或者503
public static void main(String args[]) { RateLimiter rateLimiter = new RateLimiter(TimeUnit.SECONDS); final int bucketSize = 10; //回复令牌生产速率 final long avgRate = 10; //判断是否流量达到上限 ExecutorService pool = Executors.newCachedThreadPool(); for(int i = 0; i < 100; ++i) { pool.submit(new Runnable() { @Override public void run() { while(true) { try { Thread.sleep(((int) (Math.random() * 10)) * 1000) ; if(!rateLimiter.acquire(bucketSize, avgRate)) { System.err.println(Thread.currentThread().getName() + "已经限流成功----response.setStatus(404)"); } else { System.out.println(Thread.currentThread().getName() + "正常执行"); } } catch (InterruptedException e) { e.printStackTrace(); } } } }); } while(true) { } }
效果展示