一. 整合 google.guava
Controller:
package com..web; import com..anno.RateLimitAnno; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.servlet.http.HttpServletRequest; @RestController public class LimiterController { @RateLimitAnno @RequestMapping("/limiter") public String limiter(HttpServletRequest request){ StringBuilder sb = new StringBuilder("{"); //正常返回 sb.append("'result':'0000','msg':'成功'"); return sb.append("}").toString(); } }
自定义注解:
package com..anno; import java.lang.annotation.*; @Inherited // 允许子类继承 元注解 @Documented // 被 javadoc工具记录 @Target({ElementType.METHOD,ElementType.FIELD,ElementType.TYPE}) //注解可能出现在Java程序中的语法位置 @Retention(RetentionPolicy.RUNTIME) //注解保留时间,保留至运行时 public @interface RateLimitAnno { }
具体限流实现:
package com..util; import com.google.common.util.concurrent.RateLimiter; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; @Component @Scope @Aspect public class RateLimitAspect { final double permitsPerSecond = 5.0; //每秒生成5个令牌 final long warmupPeriod = 1; //在warmupPeriod时间内RateLimiter会增加它的速率,在抵达它的稳定速率或者最大速率之前 final TimeUnit timeUnit = TimeUnit.SECONDS; //参数warmupPeriod 的时间单位 /* * 创建一个稳定输出令牌的RateLimiter,保证了平均每秒不超过qps个请求 * 当请求到来的速度超过了qps,保证每秒只处理qps个请求 * 当这个RateLimiter使用不足(即请求到来速度小于qps),会囤积最多qps个请求 * * 创建的是SmoothBursty 实例 平滑稳定 */ RateLimiter rateLimiter = RateLimiter.create(permitsPerSecond); /** * * 根据指定的稳定吞吐率和预热期来创建RateLimiter, * 这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少查询), * 在这段预热时间内,RateLimiter每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率(只要存在足够请求数来使其饱和)。 * 同样地,如果RateLimiter 在warmupPeriod时间内闲置不用,它将会逐步地返回冷却状态。 * 也就是说,它会像它第一次被创建般经历同样的预热期。 * 返回的RateLimiter 主要用于那些需要预热期的资源,这些资源实际上满足了请求(比如一个远程服务), * 而不是在稳定(最大)的速率下可以立即被访问的资源。 * 返回的RateLimiter 在冷却状态下启动(即预热期将会紧跟着发生),并且如果被长期闲置不用,它将回到冷却状态 * * 创建的是SmoothWarmingUp实例 平滑预热 */ RateLimiter rl = RateLimiter.create(permitsPerSecond,warmupPeriod,timeUnit); //设置业务切入点为标注了自定义注解的位置 @Pointcut("@annotation(com.wondersgroup.anno.RateLimitAnno)") public void aspectService(){ } //统计 int countSuccess,countFail = 0; //环绕通知 @Around("aspectService()") public Object aroundMsg(ProceedingJoinPoint joinPoint){ Object obj = null; boolean flag = rateLimiter.tryAcquire(); // 在无延迟下的情况下获得 // 从RateLimiter获取一个许可,该方法会被阻塞直到获取到请求。 // 如果存在等待的情况的话,告诉调用者获取到该请求所需要的睡眠时间。该方法等同于acquire(1)。 //double waitTime = rabuyGoodsteLimiter.acquire(); 我非要得到令牌才返回 try{ if(flag){ //如果获取了令牌,则可以继续执行业务层面的逻辑 obj = joinPoint.proceed(); countSuccess++;//并发时,统计不准确!!! }else{ obj = "{'result':'0001','msg':'当前系统繁忙,请重试...'}"; //未获取到令牌,直接返回 countFail++; } }catch(Throwable ex){ ex.printStackTrace(); } System.out.println(flag +" : "+ obj + " success:" + countSuccess + " , fail:" + countFail); return obj; } }
运行项目,访问 http://localhost:8080/limiter
结果:
二. 整合 Redisson
package com..distributed; import org.redisson.Redisson; import org.redisson.api.*; import org.redisson.config.Config; import java.util.Random; import java.util.concurrent.CountDownLatch; public class RedisRateLimiter { static RedissonClient redisson = null; static RRateLimiter myLimiter; static { Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379").setConnectionMinimumIdleSize(10); //创建Redisson客户端 redisson = Redisson.create(config); /** * 基于Redis的分布式限流器可以用来在分布式环境下现在请求方的调用频率。 * 既适用于不同Redisson实例下的多线程限流,也适用于相同Redisson实例下的多线程限流。 * 该算法不保证公平性。 */ myLimiter = redisson.getRateLimiter("my"); /** * Total rate for all RateLimiter instances * 作用在所有的RRateLimiter实例 * OVERALL * * Total rate for all RateLimiter instances working with the same Redisson instance * 作用在同一个Redisson实例创建的 RRateLimiter上面。 * PER_CLIENT * * return : 设置是否成功。 对同一个redis服务端,只需要设置一次。如果redis重启需要重新设置 */ boolean bl = myLimiter.trySetRate(RateType.PER_CLIENT, 5, 1, RateIntervalUnit.SECONDS); } //结合redis,实现分布式的qpi接口限流 public static void test() { CountDownLatch cd = new CountDownLatch(1); Random rd = new Random(); for (int i = 0; i < 20; i++) { new Thread(() -> { try { cd.await(); //使得当前线程阻塞 Thread.sleep(rd.nextInt(1000)); //模拟20个请求的并发,有一点点先后顺序的差异 } catch (InterruptedException e) { e.printStackTrace(); } finally { //获取令牌 System.out.println(Thread.currentThread().getName() + " : " + myLimiter.tryAcquire()); } }).start(); } cd.countDown(); } public static void main(String[] args) { test(); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } //正常退出 status为0时为正常退出程序,也就是结束当前正在运行中的java虚拟机 System.exit(0); } }
运行 main 函数
结果:
pom 配置文件:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.5.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.gupao.ls</groupId> <artifactId>demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>demo</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> </dependency> <!--Redisson插件--> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.10.2</version> </dependency> <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> <version>1.9.1</version> </dependency> <!--guava JAR--> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>23.0</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.4</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>