本文讨论的限流并非是网关层面的限流,而是业务代码中的逻辑限流。
常见的限流算法有:计数器、令牌桶、漏桶。
AtomicInteger
public class AtomicIntegerDemo {
private static AtomicInteger count = new AtomicInteger(0);
public static void exec() {
if (count.get() >= 5) {
System.out.println("系统繁忙 thread name:"+Thread.currentThread().getName());
} else {
count.incrementAndGet();
try {
//处理核心逻辑
TimeUnit.SECONDS.sleep(10);
System.out.println("处理业务 thread name:"+Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
count.decrementAndGet();
}
}
}
public static void main(String[] args) {
for (int i = 0; i < 1000; i++) {
new Thread(() -> exec(),"thread-"+i).start();
}
}
}
弊端:使用 AomicInteger 简单粗暴超过域值就拒绝请求,可能只是瞬时的请求量高,也会拒绝请求。
基于 Semaphore
public class SemaphoreDemo {
private static Semaphore semphore = new Semaphore(5);
public static void exec() {
if(semphore.getQueueLength()>100){//超过阈值100
System.out.println("系统繁忙 thread name:"+Thread.currentThread().getName());
}
try {
semphore.acquire();
// 处理核心逻辑
TimeUnit.SECONDS.sleep(10);
System.out.println("处理业务 thread name:"+Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semphore.release();
}
}
public static void main(String[] args) {
for (int i = 0; i < 1000; i++) {
new Thread(() -> exec(),"thread-"+i).start();
}
}
}
Semaphore信号量来控制并发执行的次数,如果超过域值信号量,则进入阻塞队列中排队等待获取信号量进行执行。如果阻塞队列中排队的请求过多超出系统处理能力,则可以在拒绝请求。
漏桶算法
令牌桶算法
令牌桶算法是对漏桶算法的一种改进,桶算法能够限制请求调用的速率,而令牌桶算法能够在限制调用的平均速率的同时还允许一定程度的突发调用。
在令牌桶算法中,存在一个桶,用来存放固定数量的令牌。算法中存在一种机制,以一定的速率往桶中放令牌。每次请求调用需要先获取令牌,只有拿到令牌,才有机会继续执行,否则选择选择等待可用的令牌、或者直接拒绝。放令牌这个动作是持续不断的进行,如果桶中令牌数达到上限,就丢弃令牌,所以就存在这种情况,桶中一直有大量的可用令牌,这时进来的请求就可以直接拿到令牌执行,比如设置qps为100,那么限流器初始化完成一秒后,桶中就已经有100个令牌了,这时服务还没完全启动好,等启动完成对外提供服务时,该限流器可以抵挡瞬时的100个请求。所以,只有桶中没有令牌时,请求才会进行等待,最后相当于以一定的速率执行。
Guava RateLimiter 提供了令牌桶算法可用于平滑突发限流策略。 limiter.acquire(1) 表示消费一个令牌。当桶中有足够的令牌时,则直接返回0,否则阻塞。
public class RateLimiterDemo {
private static RateLimiter limiter = RateLimiter.create(5);
public static void exec() {
limiter.acquire(1);
// 可以设置超时时间,超时返回
// boolean b = rateLimiter.tryAcquire(1, TimeUnit.SECONDS);
try {
// 处理核心逻辑
TimeUnit.SECONDS.sleep(10);
System.out.println("处理业务 thread name:"+Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
for (int i = 0; i < 1000; i++) {
new Thread(() -> exec(),"thread-"+i).start();
}
}
}
令牌桶和漏桶对比:
-
-
漏桶则是按照常量固定速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝;
-
令牌桶限制的是平均流入速率(允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌),并允许一定程度突发流量;
-
漏桶限制的是常量流出速率(即流出速率是一个固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2),从而平滑突发流入速率;
-
令牌桶允许一定程度的突发,而漏桶主要目的是平滑流入速率;
-
两个算法实现可以一样,但是方向是相反的,对于相同的参数得到的限流效果是一样的
前面讨论的几种算法都属于单机限流的范畴,但是业务需求五花八门,简单的单机限流,根本无法满足他们。
比如为了限制某个资源被每个用户或者商户的访问次数,5s只能访问2次,或者一天只能调用1000次,这种需求,单机限流是无法实现的,这时就需要通过集群限流进行实现。
如何实现?为了控制访问次数,肯定需要一个计数器,而且这个计数器只能保存在第三方服务,比如redis。
大概思路:每次有相关操作的时候,就向redis服务器发送一个incr命令,比如需要限制某个用户访问/index接口的次数,只需要拼接用户id和接口名生成redis的key,每次该用户访问此接口时,只需要对这个key执行incr命令,在这个key带上过期时间,就可以实现指定时间的访问频率。
Java伪代码实现逻辑
if(key 存在){ value++; if(value>=limit){ 不能访问 return false } }else{ 添加key,value为1 设置key过期时间为expire }
Redisson客户端实现
public class RedissonDemo { public static void main(String[] args) { Config config = new Config(); config.useClusterServers().addNodeAddress("redis://ip:port"); RedissonClient redissonClient = Redisson.create(config); RRateLimiter rateLimiter = redissonClient.getRateLimiter("100"); // 最大流速 = 每1秒钟产生10个令牌 rateLimiter.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS); //阻塞式 rateLimiter.tryAcquire(1); //设置等待时间 // boolean flag = redissonClient.getRateLimiter("rateLimiter").tryAcquire(100,1, TimeUnit.SECONDS); } }
acquire方法Lua脚本代码 (RedissonRateLimiter.java)
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "local rate = redis.call('hget', KEYS[1], 'rate');" - "local interval = redis.call('hget', KEYS[1], 'interval');" ype = redis.call('hget', KEYS[1], 'type');" - "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')" + "local valueName = KEYS[2];" + "if type == '1' then " + "valueName = KEYS[3];" + "end;" + "local currentValue = redis.call('get', valueName); " + "if currentValue ~= false then " + "if tonumber(currentValue) < tonumber(ARGV[1]) then " + "return redis.call('pttl', valueName); " + "else " + "redis.call('decrby', valueName, ARGV[1]); " + "return nil; " + "end; " + "else " + "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); " + "redis.call('set', valueName, rate, 'px', interval); " + "redis.call('decrby', valueName, ARGV[1]); " + "return nil; " + "end;", Arrays.<Object>asList(getName(), getValueName(), getClientValueName()), value, commandExecutor.getConnectionManager().getId().toString()); }