zoukankan      html  css  js  c++  java
  • 艾编教学笔记:高并发限流+分布式限流高并发限流技术揭秘

    内容概要:

    1、为什么要限流

    2、分布式限流解决方案

    3Guava实现令牌限流和漏桶限流

    4SpringBoot结合Redis实现分布式限流

    5SpringCloud GateWay网关限流---微服务SprignCloud 6Nginx限流


    为什么要限流

    目标

    学习在项目开发中为什么要使用限流技术,以及限流的作用。

    概述

    在分布式领域,我们难免会遇到并发量突增,对后端服务造成高压力,严重甚至会导致系统宕机。为避  免这种问题,我们通常会为接口添加限流、降级、熔断等能力,从而使接口更为健壮。Java领域常见的  开源组件有Netflix的hystrix,阿里系开源的sentinel等,都是蛮不错的限流熔断框架。

     

    图解

    在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流。缓存的目的是提升系统访问速度和  增大系统能处理的容量,可谓是抗高并发流量的银弹;而降级是当服务出问题或者影响到核心流程的性  能则需要暂时屏蔽掉,待高峰或者问题解决后再打开;而有些场景并不能用缓存和降级来解决,比如稀  缺资源(秒杀、抢购)、写服务(如评论、下单)、频繁的复杂查询(评论的最后几页),因此需有一  种手段来限制这些场景的并发/请求量,即限流。

    解决一个问题:保护、保证系统一定可用。

    解决方案

    扩容

    增加物理服务的硬件和设备。

    缓存

    缓存比较好理解,在大型高并发系统中,如果没有缓存数据库将分分钟被爆,系统也会瞬间瘫痪。使用  缓存不单单能够提升系统访问速度、提高并发访问量,也是保护数据库、保护系统的有效方式。大型网  站一般主要是“读”,缓存的使用很容易被想到。在大型“写”系统中,缓存也常常扮演者非常重要的角

    色。比如累积一些数据批量写入,内存里面的缓存队列(生产消费),以及HBase写数据的机制等等也  都是通过缓存提升系统的吞吐量或者实现系统的保护措施。甚至消息中间件,你也可以认为是一种分布  式的数据缓存。

    降级

    服务降级是当服务器压力剧增的情况下,根据当前业务情况及流量对一些服务和页面有策略的降级,以  此释放服务器资源以保证核心任务的正常运行。降级往往会指定不同的级别,面临不同的异常等级执行  不同的处理。根据服务方式:可以拒接服务,可以延迟服务,也有时候可以随机服务。根据服务范围:  可以砍掉某个功能,也可以砍掉某些模块。总之服务降级需要根据不同的业务需求采用不同的降级策     略。主要的目的就是服务虽然有损但是总比没有好。

    限流

    限流可以认为服务降级的一种,限流就是限制系统的输入和输出流量已达到保护系统的目的。一般来说  系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流  量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。

    限流的目的

    限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦   达到限制速率则可以拒绝服务(定向到错误页或告知资源没有了)、排队或等待(比如秒杀、评论、下  单)、降级(返回兜底数据或默认数据,如商品详情页库存默认有货)。

    一般开发高并发系统常见的限流有:限制总并发数(比如数据库连接池、线程池)、限制瞬时并发数

    (如nginx的limit_conn模块,用来限制瞬时并发连接数)、限制时间窗口内的平均速率(如Guava的RateLimiter、nginx的limit_req模块,限制每秒的平均速率);其他还有如限制远程接口调用速率、限  制MQ的消费速率。另外还可以根据网络连接数、网络流量、CPU或内存负载等来限流。

     

    先有缓存这个银弹,后有限流来应对618、双十一高并发流量,在处理高并发问题上可以说是如虎添      翼,不用担心瞬间流量导致系统挂掉或雪崩,最终做到有损服务而不是不服务;限流需要评估好,不可  乱用,否则会正常流量出现一些奇怪的问题而导致用户抱怨。

    保护、保证系统和网站的正常运行。

    使用场景

    秒杀、抢购,年底查询密集,评论查询。

    SpringBoot结合Redis实现分布式限流演进


    目标

    为什么使用分布式限流解决方案,整个过程是如何来的。

     

    步骤

    1:搭建springboot框架

    2:导入web依赖,和guava依赖,以及redis依赖

     
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    </dependency>

    3:最简单的限流讲解---计数器限流

    package com.itheima.limiting.web;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    @RestController
    public class OrderController {
    // 抢购,抢购数量,也就限流的阈值
    static long limit = 10;
    // 计算器,代表请求的用户数量
    private long count = 0;
    @GetMapping("/makeorder")
    public String makeOrder(){
    long c = ++count;
    if(c > limit){
    return "抢购结束,下次在来! count = " + c;
    }
    return "恭喜,抢购成功,count = " + c;
    }
    }

    存在问题:

    1:当前count没有标记成为static。需要标记成static吗?答案是不需要,因为 @RestController 标记当前类是单例的, 所以在内存中count只有一份,不会出现所谓的多份问题。而是共用的一个计数器。

    2:往往在分布式集群的项目中,项目是部署多多台,是多个  jvm。每个jvm都又自己的计数器,这个时候就会引发高并发带来的线程安全问题。

    3:那可以使用volatile 吗?答案是不可以。因为volatile只保证成员变量在线程见的可见性,它不保证线程安全。

    4:如果线程不安全可以使用synchronized ,这种是没问题的,可以解决线程安全的问题。但是同时带

    @GetMapping("/makeorder")
    public synchronized String makeOrder(){
    long c = ++count;
    if(c > limit){
    return "抢购结束,下次在来! count = " + c; 6    }
            return    "恭喜,抢购成功,count = " + c; 8    }

    来的隐患就是:性能低下,这个是必然的,如果使用了synchronized关键字,就是上了锁,代码的执行 就编程了串行,一大推的阻塞,如果这个时候又1w的并发,这个时候处理都会造成大量的阻塞。而且    性能极低。在一般的开发中我们都会认为和代码是不适合的。也满足不了我们高并发的需要。那么进行  优化,如何解决呢?原子类。

    5:如果在分布式环境下呢?

     

    可以使用分布式缓存来解决这个问题:

    为什么不用Locksynchronized,在单机环境下,是没有问题,但是往往在开发中,大部分情况下是集  群环境,这个适合每个电脑都是独立JVM环境,你是没有办法去控制别人jvm内存中的东西,所以我们    要计数器进行共享。

     

    为什么选择Redis呢?而不是memcache操作呢?

    答案很简单:Redis单线程。(基于linuxio模型来架构的,EPOLL 异步IO BIO NIO EPOLL  jedis redission

     

     

     代码如下:

    引入依赖:

    <dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.1.0</version>
    </dependency>
    @GetMapping("makeorder2")
    public String makeOrder2(String name){
    // 从jedis上获取自增值
    try(Jedis jedis = new Jedis("localhost",6379)){
    long c = jedis.incr(name);
    if(c > limit){
    return "抢购结束,下次在来! count = " + c; 8    }
            return    "恭喜,抢购成功,count = " + c; 10    }
    11    }

    测试:启动80808081两个端口:访问

     

    http://localhost:8081/makeorder2?name=lisi  http://localhost:8080/makeorder2?name=lisi  同一个用户在集群环境下的并发问是否共享计数器。答案很明显是可以的。

     

     

    限流接口的时间窗请求数--时间窗限流

     

     

    概述

    即一个时间窗口内的请求数,如需限制某个接口/服务每秒/每分钟/每天的请求数/调用量。

     

     

     

     

    如果实现呢?

    Guava---单机系统

    使用LoadingCache限流

     

    package com.itheima.limiting.web;
    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    import redis.clients.jedis.Jedis;
    import java.util.Calendar;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    @RestController
    public class OrderGuavaController {
    // 抢购,抢购数量,也就限流的阈值
    static long limit = 5;
    // 限时间窗请求数,限制5r/s
    LoadingCache<Long,AtomicLong> loadingCache =
    CacheBuilder.newBuilder().expireAfterAccess(2, TimeUnit.SECONDS)
    .build(new CacheLoader<Long, AtomicLong>() {
    @Override
    public AtomicLong load(Long aLong) throws Exception {
    return new AtomicLong(0);
    }
    });
    @GetMapping("/makeorder3")
    public String makeOrder() throws Exception{
    // 当前秒
    long currentTime = System.currentTimeMillis()/1000L;
    //long c = atomicLong.incrementAndGet();
    long c = loadingCache.get(currentTime).incrementAndGet();
    if(c > limit){
    return "抢购结束,下次在来! count = " + c;
    }
    return "恭喜,抢购成功,count = " + c;
    }
    }

     

    模拟代码:
    package com.itheima.limiting.limiter;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicLong;
    public class MyLimiter {
    private ConcurrentMap<Long, AtomicLong> map = new ConcurrentHashMap<>
    ();
    private long before;
    public MyLimiter(long before){
    super();
    this.before = before;
    }
    public AtomicLong get(long key){
    if(!this.map.containsKey(key)){
    synchronized (map){
    if(!this.map.containsKey(key)){
    map.put(key,new AtomicLong(0L));
    // 移除指定秒数的计数器
    this.removeBeforeKey(key);
    }
    }
    }
    return this.map.get(key);
    }
    private void removeBeforeKey(long ckey){
    for(Long key : this.map.keySet()){
    // 把哪些超过时间的key,从map中移除出去。
    if(key + before < ckey){
    this.map.remove(key);
    }
    }
    }
    }
    分布式限流--Lua脚本

    目标
    使用lua脚本完成分布式限流
    步骤 
    package com.itheima.limiting.web;
    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import com.itheima.limiting.limiter.MyLimiter;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    import redis.clients.jedis.Jedis;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    @RestController
    public class OrderRedisController {
    // 抢购,抢购数量,也就限流的阈值
    static long limit = 5;
    public String makeorder6(String name) throws Exception{
    try (Jedis jedis = new Jedis("localhost",6379)){
    long c = jedis.incr(name);
    if(c > limit){
    return (System.currentTimeMillis()/1000)+"抢购结束,下次在来!
    count = " + c;
    }else{
    if(c==1){
    // 设置过期时间
    jedis.expire(name,1);
    }
    }
    return (System.currentTimeMillis()/1000) + "恭喜,抢购成功,count
    = " + c;
    }
    }
    }
    Lua脚本
     
    local key = KEYS[1] --限流Key(一秒一个)
    local limit = tonumber(ARGV[1]) --限流大小
    local expire = ARGV[2] --过期时间
    -- 获取当前计数器的值
    local current = tonumber(redis.call('get',key) or "0")
    -- 如果超过限制大小
    if current + 1 > limit then
    return 0
    Redis处理类
    测试用例
    else
    current = tonumber(redis.call('INCRBY',key,"1")) --请求数+1
    if current == 1 then --如果是第一次访问需要设置过期时间
    redis.call("expire",key,expire) --设置过期时间
    end
    end
    return 1 --返回1代表不限流
    Redis处理类
     
    package com.itheima.limiting.limiter;
    import com.google.common.io.Files;
    import org.springframework.core.io.ClassPathResource;
    import redis.clients.jedis.Jedis;
    import java.nio.charset.Charset;
    public class JedisLuaLimiter {
    private String luascript;
    private String key;
    private String limit;
    private String expire;
    public JedisLuaLimiter(String key,String limit,String expire,String
    scriptFile){
    super();
    this.key = key;
    this.limit = limit;
    this.expire = expire;
    try{
    this.luascript = Files.asCharSource(new
    ClassPathResource(scriptFile).getFile(), Charset.defaultCharset()).read();
    }catch(Exception ex){
    ex.printStackTrace();
    }
    }
    // 尝试获取
    public boolean tryAcqure(){
    Jedis jedis = new Jedis("localhost",6379);
    return (Long)jedis.eval(this.luascript,1,key,limit,expire) == 1L;
    }
    }

    测试用例

    package com.itheima.limiting;
    import com.itheima.limiting.web.OrderRedisController;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.TimeUnit;
    @SpringBootTest
    class LimitingApplicationTests {
    @Autowired
    private OrderRedisController orderRedisController;
    @Test
    void contextLoads() throws Exception {
    // 倒计数锁存器
    CountDownLatch countDownLatch = new CountDownLatch(50);
    // 循环屏障
    CyclicBarrier cyclicBarrier = new CyclicBarrier(50);
    for (int i = 0; i < 50; i++) {
    new Thread(()->{
    try{
    cyclicBarrier.await();
    }catch(Exception ex){
    ex.printStackTrace();
    }
    try {
    System.out.println(Thread.currentThread().getName() +
    "===" + orderRedisController.makeorder6("aicode111"));
    }catch (Exception ex){
    ex.printStackTrace();
    }
    countDownLatch.countDown();
    }).start();
    }
    try{
    countDownLatch.await();
    }catch(Exception ex){
    ex.printStackTrace();
    }
    TimeUnit.SECONDS.sleep(1);
    System.out.println(Thread.currentThread().getName() + "===" +
    orderRedisController.makeorder6("aicode111"));
    }
    }
    削峰填谷-平滑限流某个接口的请求数
     
     
    当消费端请求骤增时,可以为其配置排队等待的流控规则,以稳定的速度逐步处理这些请求,起到
    峰填谷的效果,从而避免流量骤增造成系统负载过高。
     
     
    背景信息
     
     
    在实际应用中,收到的请求是没有规律的。例如:某应用的处理请求的能力是每秒 10 个。在某一秒,
    突然到来了 30 个请求,而接下来两秒,都没有请求到达。在这种情况下,如果直接拒绝 20 个请求,
    应用在接下来的两秒就会空闲。所以,需要把骤增的请求平均到一段时间内,让系统负载保持在请求处
    理水位之内,同时尽可能地处理更多请求。 
     

    上图中,红色的部分代表超出消息处理能力的部分。把红色部分的消息平均到之后的空闲时间去处理,  这样既可以保证系统负载处在一个稳定的水位,又可以尽可能地处理更多消息。通过配置流控规则,可  以达到消息匀速处理的效果。

    功能原理

    AHAS  流控降级的排队等待功能,可以把骤增的大量请求匀速分配,以固定的间隔时间让请求通过,起削峰填谷的效果,从而避免流量骤增造成系统负载过高的情况。堆积的请求将会被排队处理,当请  求的预计排队时间超过最大超时时长时,AHAS 将拒绝这部分超时的请求。

    例如:配置匀速模式下请求 QPS  5,则每 200 ms 处理一条请求,多余的处理任务将排队;同时设置了超时时间为 5s,则预计排队时长超过 5s 的处理任务将被直接拒绝。具体操作步骤,参见新建流控规

    示意图如下:

     

     问题

    突发请求,流量整形,整形为匀速请求处理,(比如 5r/s 时间间隔200毫秒处理一个请求,平滑速率).

    解决方案:令牌桶算法

     

    package com.itheima.limiting.web;
    import com.google.common.util.concurrent.RateLimiter;
    import com.itheima.limiting.limiter.JedisLuaLimiter;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    @RestController
    public class OrderGuavaRateLimitController {
    // 平滑限流请求
    RateLimiter rateLimiter = RateLimiter.create(4);
    //@GetMapping("/makeorder7")
    public String makeorder7() throws Exception{
    if(!rateLimiter.tryAcquire()){
    return (System.currentTimeMillis()/1000)+"抢购结束,下次在来!
    " ;
    }
    return (System.currentTimeMillis()/1000) + "恭喜,抢购成功";
    }
    }

     测试代码

    package com.itheima.limiting;
    import com.itheima.limiting.web.OrderGuavaRateLimitController;
    import com.itheima.limiting.web.OrderRedisController;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.TimeUnit;
    @SpringBootTest
    class LimitingApplicationTests {
    @Autowired
    private OrderRedisController orderRedisController;
    @Autowired
    private OrderGuavaRateLimitController
    orderGuavaRateLimitController;
    @Test
    void contextLoads() throws Exception {
    // 倒计数锁存器
    CountDownLatch countDownLatch = new CountDownLatch(50);
    // 循环屏障
    CyclicBarrier cyclicBarrier = new CyclicBarrier(50);
    for (int i = 0; i < 50; i++) {
    new Thread(()->{
    try{
    cyclicBarrier.await();
    }catch(Exception ex){
    ex.printStackTrace();
    }
    try {
    //System.out.println(Thread.currentThread().getName() + "===" +
    orderRedisController.makeorder6("aicode111"));
    System.out.println(Thread.currentThread().getName()
    + "===" + orderGuavaRateLimitController.makeorder7());
    }catch (Exception ex){
    ex.printStackTrace();
    }
    
    自定义令牌桶算法
    countDownLatch.countDown();
    }).start();
    }
    try{
    countDownLatch.await();
    }catch(Exception ex){
    ex.printStackTrace();
    }
    TimeUnit.SECONDS.sleep(1);
    // System.out.println(Thread.currentThread().getName() + "===" +
    orderRedisController.makeorder6("aicode111"));
    System.out.println(Thread.currentThread().getName() + "===" +
    orderGuavaRateLimitController.makeorder7());
    }
    }

     

    自定义令牌桶算法 
    package com.itheima.limiting.limiter;
    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.concurrent.Semaphore;
    public class MyRateLimiter implements AutoCloseable{
    // 定义信号量 并发协同工具
    private Semaphore semaphore;
    // 限制数量
    private int limit;
    // 定时器
    private Timer timer;
    public MyRateLimiter(int limit){
    super();
    this.limit = limit;
    this.semaphore = new Semaphore(limit);
    this.timer = new Timer();
    // 放入令牌的时间间隔
    long period = 1000L/limit;
    // 通过定时器。定时放入令牌
    timer.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
    if(semaphore.availablePermits() < limit){
    semaphore.release();
    漏桶算法
    SpringBoot 结合Aop完成限流策略
    原理
    首先解释下为何采用Redis作为限流组件的核心。
    通俗地讲,假设一个用户(用IP判断)每秒访问某服务接口的次数不能超过10次,那么我们可以在
    Redis中创建一个键,并设置键的过期时间为60秒。
    当一个用户对此服务接口发起一次访问就把键值加1,在单位时间(此处为1s)内当键值增加到10的时
    候,就禁止访问服务接口。PS:在某种场景中添加访问时间间隔还是很有必要的。我们本次不考虑间隔
    时间,只关注单位时间内的访问次数。
    需求
    }
    }
    },period,period);
    }
    public void acquire() throws InterruptedException{
    this.semaphore.acquire();
    }
    public boolean tryAcquire(){
    return this.semaphore.tryAcquire();
    }
    public int availablePermits(){
    return this.semaphore.availablePermits();
    }
    @Override
    public void close(){
    System.out.println("自动来关闭了..............");
    this.timer.cancel();
    }
    }

    1 @GetMapping("/makeorder")

    
    

    2 public synchronized String makeOrder(){

    
    

    3 long c = ++count;

    
    

    4 if(c > limit){

    
    

    5 return "抢购结束,下次在来! count = " + c; 6}

    
    

    7return"恭喜,抢购成功,count = " + c; 8}

     
    漏桶算法
     

    SpringBoot 结合Aop完成限流策略

     

     

    原理

    首先解释下为何采用Redis作为限流组件的核心。

    通俗地讲,假设一个用户(用IP判断)每秒访问某服务接口的次数不能超过10次,那么我们可以在

    Redis中创建一个键,并设置键的过期时间为60秒。

    当一个用户对此服务接口发起一次访问就把键值加1,在单位时间(此处为1s)内当键值增加到10的时  候,就禁止访问服务接口。PS:在某种场景中添加访问时间间隔还是很有必要的。我们本次不考虑间隔  时间,只关注单位时间内的访问次数。

    需求


    原理已经讲过了,说下需求。

    1. 基于Redisincr及过期机制开发

    1. Redis整合

    由于我们是基于Redis进行的限流操作,因此需要整合Redis的类库,上面已经讲到,我们是基于

    Springboot进行的开发,因此这里可以直接整合RedisTemplate

    1.1 坐标引入

     

    这里我们引入spring-boot-starter-redis的依赖。

    正式开发

    到这里,我们正式开始手写限流组件的进程。

    1. 工程定义

    项目基于maven构建,主要依赖Spring-boot-starter,我们主要在springboot上进行开发,因此自定义  的开发包可以直接依赖下面这个坐标,方便进行包管理。版本号自行选择稳定版。

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <version>1.4.2.RELEASE</version>
    </dependency>

     2、Redis整合

    由于我们是基于Redis进行的限流操作,因此需要整合Redis的类库,上面已经讲到,我们是基于

    Springboot进行的开发,因此这里可以直接整合RedisTemplate。

    1.1 坐标引入

    这里我们引入spring-boot-starter-redis的依赖。

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-redis</artifactId>
    <version>1.4.2.RELEASE</version>
    </dependency>
    2.2 注入CacheManager及RedisTemplate
     
    新建一个Redis的配置类,命名为RedisCacheConfifig,使用javaconfifig形式注入CacheManager及
    RedisTemplate。为了操作方便,我们采用了Jackson进行序列化。代码如下 
    @Configuration
    @EnableCaching
    public class RedisCacheConfig {
    private static final Logger LOGGER =
    LoggerFactory.getLogger(RedisCacheConfig.class);
    @Bean
    public CacheManager cacheManager(RedisTemplate<?, ?> redisTemplate)
    {
    CacheManager cacheManager = new
    RedisCacheManager(redisTemplate);
    if (LOGGER.isDebugEnabled()) {
    LOGGER.debug("Springboot Redis cacheManager 加载完成");
    }
    return cacheManager;
    }
    @Bean
    public RedisTemplate<String, Object>
    redisTemplate(RedisConnectionFactory factory) {
    RedisTemplate<String, Object> template = new RedisTemplate<>();
    template.setConnectionFactory(factory);
    //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值
    (默认使用JDK的序列化方式)
    Jackson2JsonRedisSerializer serializer = new
    Jackson2JsonRedisSerializer(Object.class);
    ObjectMapper mapper = new ObjectMapper();
    mapper.setVisibility(PropertyAccessor.ALL,
    JsonAutoDetect.Visibility.ANY);
    mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
    serializer.setObjectMapper(mapper);
    template.setValueSerializer(serializer);
    //使用StringRedisSerializer来序列化和反序列化redis的key值
    template.setKeySerializer(new StringRedisSerializer());
    template.afterPropertiesSet();
    LOGGER.info("Springboot RedisTemplate 加载完成");
    return template;
    }
    }
    注意 要使用 @Confifiguration 标注此类为一个配置类,当然你可以使用 @Component, 但是不推
    荐,原因在于 @Component 注解虽然也可以当作配置类,但是并不会为其生成CGLIB代理Class,而
    使用@Confifiguration,CGLIB会为其生成代理类,进行性能的提升。
     
    2.3 调用方application.propertie需要增加Redis配置
     
    我们的包开发完毕之后,调用方的application.properties需要进行相关配置如下: 
    #单机模式redis
    spring.redis.host=127.0.0.1
    spring.redis.port=6379
    spring.redis.pool.maxActive=8
    spring.redis.pool.maxWait=-1
    spring.redis.pool.maxIdle=8
    spring.redis.pool.minIdle=0
    spring.redis.timeout=10000
    spring.redis.password=
    如果有密码的话,配置password即可。
    这里为单机配置,如果需要支持哨兵集群,则配置如下,Java代码不需要改动,只需要变动配置即可。
    注意 两种配置不能共存! 
     
    #哨兵集群模式
    # database name
    spring.redis.database=0
    # server password 密码,如果没有设置可不配
    spring.redis.password=
    # pool settings ...池配置
    spring.redis.pool.max-idle=8
    spring.redis.pool.min-idle=0
    spring.redis.pool.max-active=8
    spring.redis.pool.max-wait=-1
    # name of Redis server 哨兵监听的Redis server的名称
    spring.redis.sentinel.master=mymaster
    # comma-separated list of host:port pairs 哨兵的配置列表
    spring.redis.sentinel.nodes=127.0.0.1:26379,127.0.0.1:26479,127.0.0.1:26579
    3. 定义注解
    为了调用方便,我们定义一个名为RateLimiter 的注解,内容如下 
    /**
    * @author snowalker
    * @version 1.0
    * @date 2018/10/27 1:25
    * @className RateLimiter
    * @desc 限流注解
    */
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface RateLimiter {
    /**
    * 限流key
    * @return
    */
    String key() default "rate:limiter";
    /**
    * 单位时间限制通过请求数
    * @return
    */long limit() default 10;
    /**
    * 过期时间,单位秒
    * @return
    */
    long expire() default 1;
    }
    该注解明确只用于方法,主要有三个属性。
    1. key--表示限流模块名,指定该值用于区分不同应用,不同场景,推荐格式为:应用名:模块名:ip:接
    口名:方法名
    2. limit--表示单位时间允许通过的请求数
    3. expire--incr的值的过期时间,业务中表示限流的单位时间。
    4. 解析注解
    定义好注解后,需要开发注解使用的切面,这里我们直接使用aspectj进行切面的开发。先看代码 
    @Aspect
    @Component
    public class RateLimterHandler {
    private static final Logger LOGGER =
    LoggerFactory.getLogger(RateLimterHandler.class);
    @Autowired
    RedisTemplate redisTemplate;
    private DefaultRedisScript<Long> getRedisScript;
    @PostConstruct
    public void init() {
    getRedisScript = new DefaultRedisScript<>();
    getRedisScript.setResultType(Long.class);
    getRedisScript.setScriptSource(new ResourceScriptSource(new
    ClassPathResource("rateLimter.lua")));
    LOGGER.info("RateLimterHandler[分布式限流处理器]脚本加载完成");
    }
    这里是注入了RedisTemplate,使用其API进行Lua脚本的调用。
    init() 方法在应用启动时会初始化DefaultRedisScript,并加载Lua脚本,方便进行调用。
    PS: Lua脚本放置在classpath下,通过ClassPathResource进行加载。 
    @Pointcut("@annotation(com.snowalker.shield.ratelimiter.core.annotation.RateL
    imiter)")
    public void rateLimiter() {}
    
    这里我们定义了一个切点,表示只要注解了 @RateLimiter 的方法,均可以触发限流操作。 
    1 @Around("@annotation(rateLimiter)")
    这段代码的逻辑为,获取 @RateLimiter 注解配置的属性:key、limit、expire,并通过
    redisTemplate.execute(RedisScript script, List keys, Object... args) 方法传递给Lua脚本进行限
    流相关操作,逻辑很清晰。
    这里我们定义如果脚本返回状态为0则为触发限流,1表示正常请求。
    5. Lua脚本
    public Object around(ProceedingJoinPoint proceedingJoinPoint,
    RateLimiter rateLimiter) throws Throwable {
    if (LOGGER.isDebugEnabled()) {
    LOGGER.debug("RateLimterHandler[分布式限流处理器]开始执行限流操
    作");
    }
    Signature signature = proceedingJoinPoint.getSignature();
    if (!(signature instanceof MethodSignature)) {
    throw new IllegalArgumentException("the Annotation
    @RateLimter must used on method!");
    }
    /**
    * 获取注解参数
    */
    // 限流模块key
    String limitKey = rateLimiter.key();
    Preconditions.checkNotNull(limitKey);
    // 限流阈值
    long limitTimes = rateLimiter.limit();
    // 限流超时时间
    long expireTime = rateLimiter.expire();
    if (LOGGER.isDebugEnabled()) {
    LOGGER.debug("RateLimterHandler[分布式限流处理器]参数值为-
    limitTimes={},limitTimeout={}", limitTimes, expireTime);
    }
    /**
    * 执行Lua脚本
    */
    List<String> keyList = new ArrayList();
    // 设置key值为注解中的值
    keyList.add(limitKey);
    /**
    * 调用脚本并执行
    */
    Long result = (Long) redisTemplate.execute(getRedisScript,
    keyList, expireTime, limitTimes);
    if (result == 0) {
    String msg = "由于超过单位时间=" + expireTime + "-允许的请求次数
    =" + limitTimes + "[触发限流]";
    LOGGER.debug(msg);
    return "false";
    }
    if (LOGGER.isDebugEnabled()) {
    LOGGER.debug("RateLimterHandler[分布式限流处理器]限流执行结果-
    result={},请求[正常]响应", result);
    }
    return proceedingJoinPoint.proceed();
    }
    }
    这段代码的逻辑为,获取 @RateLimiter 注解配置的属性:key、limit、expire,并通过
    redisTemplate.execute(RedisScript script, List keys, Object... args) 方法传递给Lua脚本进行限
    流相关操作,逻辑很清晰。
    这里我们定义如果脚本返回状态为0则为触发限流,1表示正常请求。

    5. Lua脚本 

    这里是我们整个限流操作的核心,通过执行一个Lua脚本进行限流的操作。脚本内容如下 
    --获取KEY
    local key1 = KEYS[1]
    local val = redis.call('incr', key1)
    local ttl = redis.call('ttl', key1)
    --获取ARGV内的参数并打印
    local expire = ARGV[1]
    local times = ARGV[2]
    redis.log(redis.LOG_DEBUG,tostring(times))
    redis.log(redis.LOG_DEBUG,tostring(expire))
    redis.log(redis.LOG_NOTICE, "incr "..key1.." "..val);
    if val == 1 then
    redis.call('expire', key1, tonumber(expire))
    else
    if ttl == -1 then
    redis.call('expire', key1, tonumber(expire))
    end
    end
    if val > tonumber(times) then
    return 0
    end
    return 1
    逻辑很通俗,我简单介绍下。
    1. 首先脚本获取Java代码中传递而来的要限流的模块的key,不同的模块key值一定不能相同,否则会
    覆盖!
    2. redis.call('incr', key1)对传入的key做incr操作,如果key首次生成,设置超时时间ARGV[1];(初
    始值为1)
    3. ttl是为防止某些key在未设置超时时间并长时间已经存在的情况下做的保护的判断;
    4. 每次请求都会做+1操作,当限流的值val大于我们注解的阈值,则返回0表示已经超过请求限制,
    触发限流。否则为正常请求。
    当过期后,又是新的一轮循环,整个过程是一个原子性的操作,能够保证单位时间不会超过我们预设的
    请求阈值。
    到这里我们便可以在项目中进行测试。

    测试

    demo地址
    这里我贴一下核心代码,我们定义一个接口,并注解 @RateLimiter(key = "ratedemo:1.0.0", limit
    = 5, expire = 100) 表示模块ratedemo:sendPayment:1.0.0 在100s内允许通过5个请求,这里的参数设
    置是为了方便看结果。实际中,我们通常会设置1s内允许通过的次数。 
     
    @Controller
    public class TestController {
    private static final Logger LOGGER =
    LoggerFactory.getLogger(TestController.class);
    @ResponseBody
    @RequestMapping("ratelimiter")
    @RateLimiter(key = "ratedemo:1.0.0", limit = 5, expire = 100)
    public String sendPayment(HttpServletRequest request) throws
    Exception {
    return "正常请求";
    }
    }
    我们通过RestClient请求接口,日志返回如下: 
     
    2018-10-28 00:00:00.602 DEBUG 17364 --- [nio-8888-exec-1]
    c.s.s.r.core.handler.RateLimterHandler :
    RateLimterHandler[分布式限流处理器]开始执行限流操作
    2018-10-28 00:00:00.688 DEBUG 17364 --- [nio-8888-exec-1]
    c.s.s.r.core.handler.RateLimterHandler :
    RateLimterHandler[分布式限流处理器]限流执行结果-result=1,请求[正常]响应
    2018-10-28 00:00:00.860 DEBUG 17364 --- [nio-8888-exec-3]
    c.s.s.r.core.handler.RateLimterHandler :
    RateLimterHandler[分布式限流处理器]开始执行限流操作
    2018-10-28 00:00:01.183 DEBUG 17364 --- [nio-8888-exec-4]
    c.s.s.r.core.handler.RateLimterHandler :
    RateLimterHandler[分布式限流处理器]开始执行限流操作
    2018-10-28 00:00:01.520 DEBUG 17364 --- [nio-8888-exec-3]
    c.s.s.r.core.handler.RateLimterHandler :
    RateLimterHandler[分布式限流处理器]限流执行结果-result=1,请求[正常]响应
    2018-10-28 00:00:01.521 DEBUG 17364 --- [nio-8888-exec-4]
    c.s.s.r.core.handler.RateLimterHandler :
    RateLimterHandler[分布式限流处理器]限流执行结果-result=1,请求[正常]响应
    2018-10-28 00:00:01.557 DEBUG 17364 --- [nio-8888-exec-5]
    c.s.s.r.core.handler.RateLimterHandler :
    RateLimterHandler[分布式限流处理器]开始执行限流操作
    2018-10-28 00:00:01.558 DEBUG 17364 --- [nio-8888-exec-5]
    c.s.s.r.core.handler.RateLimterHandler :
    RateLimterHandler[分布式限流处理器]限流执行结果-result=1,请求[正常]响应
    2018-10-28 00:00:01.774 DEBUG 17364 --- [nio-8888-exec-7]
    c.s.s.r.core.handler.RateLimterHandler :
    RateLimterHandler[分布式限流处理器]开始执行限流操作
    2018-10-28 00:00:02.111 DEBUG 17364 --- [nio-8888-exec-8]
    c.s.s.r.core.handler.RateLimterHandler :
    RateLimterHandler[分布式限流处理器]开始
    2018-10-28 00:00:02.169 DEBUG 17364 --- [nio-8888-exec-7]
    c.s.s.r.core.handler.RateLimterHandler :
    RateLimterHandler[分布式限流处理器]限流执行结果-result=1,请求[正常]响应
    2018-10-28 00:00:02.169 DEBUG 17364 --- [nio-8888-exec-8]
    c.s.s.r.core.handler.RateLimterHandler :
    由于超过单位时间=100-允许的请求次数=5[触发限流]
    
    2018-10-28 00:00:02.276 DEBUG 17364 --- [io-8888-exec-10]
    c.s.s.r.core.handler.RateLimterHandler :
    RateLimterHandler[分布式限流处理器]开始执行限流操作
    2018-10-28 00:00:02.276 DEBUG 17364 --- [io-8888-exec-10]
    c.s.s.r.core.handler.RateLimterHandler :
    RateLimterHandler[分布式限流处理器]参数值为-limitTimes=5,limitTimeout=100
    2018-10-28 00:00:02.278 DEBUG 17364 --- [io-8888-exec-10]
    c.s.s.r.core.handler.RateLimterHandler :
    由于超过单位时间=100-允许的请求次数=5[触发限流]
    2018-10-28 00:00:02.445 DEBUG 17364 --- [nio-8888-exec-2]
    c.s.s.r.core.handler.RateLimterHandler :
    RateLimterHandler[分布式限流处理器]开始执行限流操作
    2018-10-28 00:00:02.445 DEBUG 17364 --- [nio-8888-exec-2]
    c.s.s.r.core.handler.RateLimterHandler :
    RateLimterHandler[分布式限流处理器]参数值为-limitTimes=5,limitTimeout=100
    2018-10-28 00:00:02.446 DEBUG 17364 --- [nio-8888-exec-2]
    c.s.s.r.core.handler.RateLimterHandler :
    由于超过单位时间=100-允许的请求次数=5[触发限流]
    2018-10-28 00:00:02.628 DEBUG 17364 --- [nio-8888-exec-4]
    c.s.s.r.core.handler.RateLimterHandler :
    RateLimterHandler[分布式限流处理器]开始执行限流操作
    2018-10-28 00:00:02.628 DEBUG 17364 --- [nio-8888-exec-4]
    c.s.s.r.core.handler.RateLimterHandler :
    RateLimterHandler[分布式限流处理器]参数值为-limitTimes=5,limitTimeout=100
    2018-10-28 00:00:02.629 DEBUG 17364 --- [nio-8888-exec-4]
    c.s.s.r.core.handler.RateLimterHandler :
    由于超过单位时间=100-允许的请求次数=5[触发限流]
    根据日志能够看到,正常请求5次后,返回限流触发,说明我们的逻辑生效,对前端而言也是可以看到
    false标记,表明我们的Lua脚本限流逻辑是正确的,这里具体返回什么标记需要调用方进行明确的定
    义。

    总结

    我们通过Redis的incr及expire功能特性,开发定义了一套基于注解的分布式限流操作,核心逻辑基于
    Lua保证了原子性。达到了很好的限流的目的,生产上,可以基于该特点进行定制自己的限流组件,当
    然你可以参考本文的代码,相信你写的一定比我的demo更好!
    在实际应用时也不要太纠结算法问题,因为一些限流算法实现是一样的只是描述不一样;具体使用哪种
    限流技术还是要根据实际场景来选择,不要一味去找最佳模式,白猫黑猫能解决问题的就是好猫。
    因在实际工作中遇到过许多人来问如何进行限流,因此本文会详细介绍各种限流手段。那么接下来我们
    从限流算法、应用级限流、分布式限流、接入层限流来详细学习下限流技术手段。
    限流就好比保险丝,根据你制定的标准,达到了就拉闸。

    5、Hystrix或Gateway网关限流 

    6、Nginx限流 

    算法思想是:
    令牌以固定速率产生,并缓存到令牌桶中;
    令牌桶放满时,多余的令牌被丢弃;
    请求要消耗等比例的令牌才能被处理;
    令牌不够时,请求被缓存。

    漏桶算法

    算法思想是:
    水(请求)从上方倒入水桶,从水桶下方流出(被处理);
    来不及流出的水存在水桶中(缓冲),以固定速率流出;
    水桶满后水溢出(丢弃)。
    这个算法的核心是:缓存请求、匀速处理、多余的请求直接丢弃。
    相比漏桶算法,令牌桶算法不同之处在于它不但有一只“桶”,还有个队列,这个桶是用来存放令牌
    的,队列才是用来存放请求的。
    从作用上来说,漏桶和令牌桶算法最明显的区别就是是否允许突发流量(burst)的处理,漏桶算法能够强
    行限制数据的实时传输(处理)速率,对突发流量不做额外处理;而令牌桶算法能够在限制数据的平均
    传输速率的同时允许某种程度的突发传输。
    Nginx按请求速率限速模块使用的是漏桶算法,即能够强行保证请求的实时处理速度不会超过设置的阈
    值。
    Nginx官方版本限制IP的连接和并发分别有两个模块:
    limit_req_zone 用来限制单位时间内的请求数,即速率限制,采用的漏桶算法 "leaky bucket"。
    limit_req_conn 用来限制同一时间连接数,即并发限制。

    limit_req_zone 参数配置

    Syntax: limit_req zone=name [burst=number] [nodelay];
    Default: —
    Context: http, server, location
    limit_req_zone $binary_remote_addr zone=one:10m rate=1r/s;
    第一个参数:$binary_remote_addr 表示通过remote_addr这个标识来做限制,“binary_”的目的
    是缩写内存占用量,是限制同一客户端ip地址。
    第二个参数:zone=one:10m表示生成一个大小为10M,名字为one的内存区域,用来存储访问的
    频次信息。
    第三个参数:rate=1r/s表示允许相同标识的客户端的访问频次,这里限制的是每秒1次,还可以有
    比如30r/m的。


     limit_req zone=one burst=5 nodelay;
    第一个参数:zone=one 设置使用哪个配置区域来做限制,与上面limit_req_zone 里的name对
    应。
    第二个参数:burst=5,重点说明一下这个配置,burst爆发的意思,这个配置的意思是设置一个
    大小为5的缓冲区当有大量请求(爆发)过来时,超过了访问频次限制的请求可以先放到这个缓冲
    区内。
    第三个参数:nodelay,如果设置,超过访问频次而且缓冲区也满了的时候就会直接返回503,如
    果没有设置,则所有请求会等待排队。

    例子: 

    http {
    limit_req_zone $binary_remote_addr zone=one:10m rate=1r/s;
    server {
    location /search/ {
    limit_req zone=one burst=5 nodelay;
    }
    }
    下面配置可以限制特定UA(比如搜索引擎)的访问: 
    limit_req_zone $anti_spider zone=one:10m rate=10r/s;
    limit_req zone=one burst=100 nodelay;
    if ($http_user_agent ~* "googlebot|bingbot|Feedfetcher-Google") {
    set $anti_spider $http_user_agent;
    }

     
    其他参数

    Syntax: limit_req_log_level info | notice | warn | error; Default: limit_req_log_level error; Context: http, server, location
    当服务器由于limit被限速或缓存时,配置写入日志。延迟的记录比拒绝的记录低一个级别。例子:
    limit_req_log_level notice 延迟的的基本是info。 
     
    Syntax: limit_req_status code;
    Default:
    limit_req_status 503;
    Context: http, server, location
    设置拒绝请求的返回值。值只能设置 400 到 599 之间。

    ngx_http_limit_conn_module 参数配置

    这个模块用来限制单个IP的请求数。并非所有的连接都被计数。只有在服务器处理了请求并且已经读取
    了整个请求头时,连接才被计数。 
     
    Syntax: limit_conn zone number;
    Default: —
    Context: http, server, location
    limit_conn_zone $binary_remote_addr zone=addr:10m;
    server {
    location /download/ {
    limit_conn addr 1;
    }
    一次只允许每个IP地址一个连接。 
    limit_conn_zone $binary_remote_addr zone=perip:10m;
    limit_conn_zone $server_name zone=perserver:10m;
    server {
    ...
    limit_conn perip 10;
    limit_conn perserver 100;
    }
    可以配置多个limit_conn指令。例如,以上配置将限制每个客户端IP连接到服务器的数量,同时限制连
    接到虚拟服务器的总数。 
    Syntax: limit_conn_zone key zone=name:size;
    Default: —
    Context: http
    limit_conn_zone $binary_remote_addr zone=addr:10m;
    在这里,客户端IP地址作为关键。请注意,不是 $ remote_addr ,而是使用 $ binary_remote_addr
    变量。 $ remote_addr 变量的大小可以从7到15个字节不等。存储的状态在32位平台上占用32或64字
    节的内存,在64位平台上总是占用64字节。对于IPv4地址, $ binary_remote_addr 变量的大小始终
    为4个字节,对于IPv6地址则为16个字节。存储状态在32位平台上始终占用32或64个字节,在64位平台
    上占用64个字节。一个兆字节的区域可以保持大约32000个32字节的状态或大约16000个64字节的状
    态。如果区域存储耗尽,服务器会将错误返回给所有其他请求。 
     
    Syntax: limit_conn_log_level info | notice | warn | error;
    Default:
    limit_conn_log_level error;
    Context: http, server, location
    当服务器限制连接数时,设置所需的日志记录级别。
    Syntax: limit_conn_status code;
    Default:
    limit_conn_status 503;
    Context: http, server, location
    

      

    设置拒绝请求的返回值。

     

    实战

    实例一 限制访问速率 
    limit_req_zone $binary_remote_addr zone=mylimit:10m rate=2r/s;
    server {
    location / {
    limit_req zone=mylimit;
    }
    }
    上述规则限制了每个IP访问的速度为2r/s,并将该规则作用于根目录。如果单个IP在非常短的时间内并
    发发送多个请求,结果会怎样呢?
    我们使用单个IP在10ms内发并发送了6个请求,只有1个成功,剩下的5个都被拒绝。我们设置的速度是
    2r/s,为什么只有1个成功呢,是不是Nginx限制错了?当然不是,是因为Nginx的限流统计是基于毫秒
    的,我们设置的速度是2r/s,转换一下就是500ms内单个IP只允许通过1个请求,从501ms开始才允许
    通过第二个请求。

    实例二 burst缓存处理

    我们看到,我们短时间内发送了大量请求,Nginx按照毫秒级精度统计,超出限制的请求直接拒绝。这
    在实际场景中未免过于苛刻,真实网络环境中请求到来不是匀速的,很可能有请求“突发”的情况,也就
    是“一股子一股子”的。Nginx考虑到了这种情况,可以通过burst关键字开启对突发请求的缓存处理,而
    不是直接拒绝。
    来看我们的配置: 
    limit_req_zone $binary_remote_addr zone=mylimit:10m rate=2r/s;
    server {
    location / {
    limit_req zone=mylimit burst=4;
    }
    }
    我们加入了burst=4,意思是每个key(此处是每个IP)最多允许4个突发请求的到来。如果单个IP在10ms
    内发送6个请求,结果会怎样呢?
    相比实例一成功数增加了4个,这个我们设置的burst数目是一致的。具体处理流程是:1个请求被立即
    处理,4个请求被放到burst队列里,另外一个请求被拒绝。通过burst参数,我们使得Nginx限流具备了
    缓存处理突发流量的能力。
    但是请注意:burst的作用是让多余的请求可以先放到队列里,慢慢处理。如果不加nodelay参数,队列
    里的请求不会立即处理,而是按照rate设置的速度,以毫秒级精确的速度慢慢处理。

    实例三 nodelay降低排队时间

    实例二中我们看到,通过设置burst参数,我们可以允许Nginx缓存处理一定程度的突发,多余的请求可
    以先放到队列里,慢慢处理,这起到了平滑流量的作用。但是如果队列设置的比较大,请求排队的时间
    就会比较长,用户角度看来就是RT变长了,这对用户很不友好。有什么解决办法呢?nodelay参数允许
    请求在排队的时候就立即被处理,也就是说只要请求能够进入burst队列,就会立即被后台worker处
    理,请注意,这意味着burst设置了nodelay时,系统瞬间的QPS可能会超过rate设置的阈值。nodelay
    参数要跟burst一起使用才有作用。
     
    延续实例二的配置,我们加入nodelay选项: 
     
    limit_req_zone $binary_remote_addr zone=mylimit:10m rate=2r/s;
    server {
    location / {
    limit_req zone=mylimit;
    }
    }
    
    limit_req_zone $binary_remote_addr zone=mylimit:10m rate=2r/s;
    server {
    location / {
    limit_req zone=mylimit burst=4;
    }
    }
    单个IP 10ms内并发发送6个请求,结果如下: 
    跟实例二相比,请求成功率没变化,但是总体耗时变短了。这怎么解释呢?实例二中,有4个请求被放
    到burst队列当中,工作进程每隔500ms(rate=2r/s)取一个请求进行处理,最后一个请求要排队2s才会
    被处理;实例三中,请求放入队列跟实例二是一样的,但不同的是,队列中的请求同时具有了被处理的
    资格,所以实例三中的5个请求可以说是同时开始被处理的,花费时间自然变短了。
    但是请注意,虽然设置burst和nodelay能够降低突发请求的处理时间,但是长期来看并不会提高吞吐量
    的上限,长期吞吐量的上限是由rate决定的,因为nodelay只能保证burst的请求被立即处理,但Nginx
    会限制队列元素释放的速度,就像是限制了令牌桶中令牌产生的速度。
    看到这里你可能会问,加入了nodelay参数之后的限速算法,到底算是哪一个“桶”,是漏桶算法还是令
    牌桶算法?当然还算是漏桶算法。考虑一种情况,令牌桶算法的token为耗尽时会怎么做呢?由于它有
    一个请求队列,所以会把接下来的请求缓存下来,缓存多少受限于队列大小。但此时缓存这些请求还有
    意义吗?如果server已经过载,缓存队列越来越长,RT越来越高,即使过了很久请求被处理了,对用户
    来说也没什么价值了。所以当token不够用时,最明智的做法就是直接拒绝用户的请求,这就成了漏桶
    算法。
     
    示例四 自定义返回值 
     
    limit_req_zone $binary_remote_addr zone=mylimit:10m rate=2r/s;
    server {
    location / {
    limit_req zone=mylimit burst=4 nodelay;
    limit_req_status 598;
    }
    }

  • 相关阅读:
    苹果一体机发射Wi-Fi
    iphone 屏蔽系统自动更新,消除设置上的小红点
    data parameter is nil 异常处理
    copy与mutableCopy的区别总结
    java axis2 webservice
    mysql 远程 ip访问
    mysql 存储过程小问题
    mysql游标错误
    is not writable or has an invalid setter method错误的解决
    Struts2中关于"There is no Action mapped for namespace / and action name"的总结
  • 原文地址:https://www.cnblogs.com/icodingedu/p/12657374.html
Copyright © 2011-2022 走看看