zoukankan      html  css  js  c++  java
  • 系统限流(分布式)

    系统限流(分布式)

    其他开源框架限流方案:(gateway、zuul都集成以下一种或底层算法基于其中)

    • Guava->RateLimiter

    支持:平滑突发限流、平滑预热限流

    // 平滑突发限流器,RateLimiter.create(5) 表示这个限流器容量为 5,并且每秒生成 5 个令牌,也就是每隔 200 毫秒生成一个
    RateLimiter limiter = RateLimiter.create(5);
    
    //平滑预热限流器,第一个参数还是每秒创建的令牌数量,这里是每秒 2 个,也就是每 500 毫秒生成一个,后面的参数表示从冷启动速率过渡到平均速率的时间间隔,也就是所谓的热身时间间隔(warm up period)
    RateLimiter limiter = RateLimiter.create(2, 3, TimeUnit.SECONDS);
    
    • Bucket4j
    // Bucket接口代表了令牌桶的具体实现
    // Bandwidth 的意思是带宽,可以理解为限流的规则(simple 和 classic)
    Bucket bucket = Bucket4j.builder().addLimit(limit).build();
    // simple 方式创建的 Bandwidth,表示桶大小为 10,填充速度为每分钟 10 个令牌
    Bandwidth limit = Bandwidth.simple(10, Duration.ofMinutes(1));
    // 桶大小为 10,填充速度为每分钟 5 个令牌
    Refill filler = Refill.greedy(5, Duration.ofMinutes(1));
    Bandwidth limit = Bandwidth.classic(10, filler);
    // Refill 用于填充令牌桶,可以通过它定义填充速度,Bucket4j 有两种填充令牌的策略:间隔策略(intervally) 和 贪婪策略(greedy).在上面的例子中我们使用的是贪婪策略,如果使用间隔策略可以像下面这样创建 Refill:
    Refill filler = Refill.intervally(5, Duration.ofMinutes(1));
    // 间隔指的是一次性全部放入,贪婪指的是根据时间单元一次一个
    

    缺点:Bucket4j 唯一不足的地方是它只支持请求频率限流,不支持并发量限流,另外还有一点,虽然 Bucket4j 支持分布式限流,但它是基于 Hazelcast 这样的分布式缓存系统实现的,不能使用Redis

    1. Resilience4j

    Resilience4j 提供了两种限流的实现:SemaphoreBasedRateLimiterAtomicRateLimiterSemaphoreBasedRateLimiter 基于信号量实现,用户的每次请求都会申请一个信号量,并记录申请的时间,申请通过则允许请求,申请失败则限流,另外有一个内部线程会定期扫描过期的信号量并释放,很显然这是令牌桶的算法。AtomicRateLimiter 和上面的经典实现类似,不需要额外的线程,在处理每次请求时,根据距离上次请求的时间和生成令牌的速度自动填充。

    Resilience4j 也提供了两种隔离的实现:SemaphoreBulkheadThreadPoolBulkhead,通过信号量或线程池控制请求的并发数,

    缺点:不支持分布式限流

    方案一:gateway配置方式

    1. 引入所需依赖
        <dependency>
           <groupId>org.springframework.cloud</groupId>
           <artifactId>spring-cloud-starter-gateway</artifactId>
       </dependency>
       
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifatId>spring-boot-starter-data-redis-reactive</artifactId>
       </dependency>
    
    1. 添加限流策略配置类
    /**
     * 限流策略配置类
     *
     * @author Andrew
     * @date 2021/6/3
     */
    @Configuration
    public class CurrentLimitingResolver {
        
        /**
         * 基于IP地址进行限流
         * @return KeyResolver
         */
        @Bean("remoteAddrKeyResolver")
        @Primary
        public KeyResolver remoteAddrKeyResolver() {
            return exchange -> Mono.just(Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress());
        }
    
        /**
         * 基于hostName地址进行限流
         * @return KeyResolver
         */
        @Bean("remoteHostNameKeyResolver")
        public KeyResolver remoteHostNameKeyResolver() {
            return exchange -> Mono.just(Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostName());
        }
    
        /**
         * 根据请求参数进行限流
         * @return KeyResolver
         */
        @Bean("remoteUserKeyResolver")
        public KeyResolver remoteUserKeyResolver() {
            return exchange -> Mono.just(Objects.requireNonNull(exchange.getRequest().getQueryParams().getFirst("user")));
        }
    
        /**
         * 接口限流
         * @return KeyResolver
         */
        @Bean("remoteApiKeyResolver")
        public KeyResolver apiKeyResolver() {
            return exchange -> Mono.just(exchange.getRequest().getPath().value());
        }
    }
    
    1. yml文件配置所需参数
    # 除了gateway相关配置还需加上redis配置信息,因开启限流后会自动生成两个kv
    spring:
      application:
        name: cloud-gateway
      cloud:
        gateway:
          discovery:
            locator:
              enabled: true # 开启从注册中心动态创建路由的功能,利用微服务名进行路由
          routes:
            - id: payment_routh # 路由的ID,没有固定规则但要求唯一,建议配合服务名
              uri: lb:// cloud-payment-service #微服务名
               filters:
                 - name: RequestRateLimiter # RequestRateLimiter的限流过滤器,基于令牌桶算法
                   args:
                     redis-rate-limiter.replenishRate: 10 # 令牌桶每秒填充平均速率
                     redis-rate-limiter.burstCapacity: 20 # 令牌桶总容量
                     key-resolver: "#{@remoteAddrKeyResolver}" # 限流策略,spEl语法
              predicates:
                - Path=/payment/get   # 断言,路径相匹配的进行路由
    

    方案二:Zuul配置方式(基于guava)

    1. 导入依赖
        <dependency>
          <groupId>org.springframework.cloud</groupId>
          <artifactId>spring-cloud-starter-netflix-zuul</artifactId>
        </dependency>
        <dependency>
          <groupId>com.marcosbarbero.cloud</groupId>
          <artifactId>spring-cloud-zuul-ratelimit</artifactId>
          <version>2.0.4.RELEASE</version>
        </dependency>
    
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-data-redis</artifactId>
          <version>2.0.4.RELEASE</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.commons</groupId>
          <artifactId>commons-pool2</artifactId>
          <version>2.5.0</version>
        </dependency>
    
    1. 主启动类加入注解@EnableZuulProxy

    2. 配置文件

    // 若开启微服务名称即可不用配置routes自动会加入path
    zuul:
      ratelimit:
        enabled: true // 开启路由
        repository: REDIS // 存储位置redis,也可更换配置用CONSUL、REDIS、JPA、IN_MEMORY
        default-policy: // 全局限流策略
          limit: 1
          quota: 1
          refresh-interval: 3
        policies: // 个别微服务限流策略
          product-service:
            limit: 5
            quota: 10
            refresh-interval: 60
            type: url
      routes: // 路由
        product-service:
          path: /product-service/** // 根据path寻找下面的url微服务节点
          url: http://localhost:7070/
        order-service:
          path: /order-service/**
          url: http://localhost:9090/
    

    令牌桶算法执行逻辑:

    1. 取出当前时间
    2. 当前时间大于上次放入令牌时间成立继续往下执行
    3. 距离上次时间已过去多久?
    4. 时间*生产速度=剩余令牌
    5. 剩余令牌与桶容量比较取出最小值作为剩余令牌
    6. 重新对时间进行赋值

    方案三:基于redis与lua脚本令牌桶算法实现

    1. 导入redis依赖
    <!-- Spring Boot Redis 依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!--spring2.0 集成 redis 所需common-pool2-->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
    </dependency>
    
    1. 自定义限流yml文件(application-ratelimit.yml)
    # 自定义限流参数
    ratelimit:
      # 限流全局开关
      enabled: true
      # 普通请求相关
      ordinary:
        # 限流次数
        limit: 5
        # 限流单位时间
        refresh-interval: 60
      # 升级请求相关
      upgrade:
        limit: 3
        refresh-interval: 60
        # 具体msgType TODO 需要确认是否是以下接口
        url:
          - 23710
          - 23711
          - 24334
          - 24335
          - 24336
          - 24337
          - 24338
          - 24339
          - 24340
          - 24341
          - 24348
          - 24349
          - 24350
    
    1. 编写lua脚本(算法)
    -- 切换到命令复制模式
    redis.replicate_commands();
    -- 参数中传递的令牌key,基于选定的限流策略来定(唯一)
    local key = KEYS[1]
    -- 令牌桶填充 限流单位时间
    local update_len = tonumber(ARGV[1])
    -- 记录 第一次访问的时间戳
    local key_time = key..'_FRT'
    -- 获取当前时间(这里的curr_time_arr 中第一个是 秒数,第二个是 秒数后毫秒数),由于我是按秒计算的,这里只要curr_time_arr[1](注意:redis数组下标是从1开始的)
    -- 如果需要获得毫秒数 则为 tonumber(arr[1]*1000 + arr[2])
    local curr_time_arr = redis.call('TIME')
    -- 当前时间秒数
    local nowTime = tonumber(curr_time_arr[1])
    -- 从redis中获取当前key 第一次访问的时间戳,无直接赋值0,有即value
    local curr_key_time = tonumber(redis.call('get', key_time) or 0)
    -- 获取当前key对应令牌桶中的令牌数,无直接赋值-1,有即value
    local token_count = tonumber(redis.call('get', key) or -1)
    -- 当前令牌桶的容量,用户自定义初始化大小
    local token_size = tonumber(ARGV[2])
    -- 令牌桶数量小于0 说明令牌桶没有初始化
    if token_count < 0 then
       redis.call('set', key_time, nowTime)
       redis.call('set', key, token_size -1)
        redis.call('expire', key_time, update_len) -- 从第一次访问开始限流,设置对应键值的过期
        redis.call('expire', key, update_len)
       return token_size -1
    else
       if token_count > 0 then -- 当前令牌桶中令牌数够用
          redis.call('decr', key)
          return token_count -1   -- 返回剩余令牌数
       else    -- 当前令牌桶中令牌数已清空
          return -1
       end
    end
    
    1. 自定义全局异常抛出

    先继承RunTimeException类创建一个异常类

    package com.intretech.smart.mqtt.msgserver.config;
    
    
    import com.intretech.smart.mqtt.msgserver.constant.RateLimiterEnum;
    
    /**
     * 自定义运行异常
     *
     * @author Andrew
     * @date 2021/6/22
     */
    public class RateLimiterException extends RuntimeException {
        /**
         * 状态码
         */
        private final String code;
    
        public RateLimiterException(RateLimiterEnum rateLimiterEnum) {
            super(rateLimiterEnum.getMessage());
            this.code = rateLimiterEnum.getCode();
        }
    
        public String getCode() {
            return code;
        }
    }
    

    @ControllerAdvice ,很多初学者可能都没有听说过这个注解,实际上,这是一个非常有用的注解,顾名思义,这是一个增强的 Controller。使用这个 Controller ,可以实现三个方面的功能:

    1. 全局异常处理
    2. 全局数据绑定
    3. 全局数据预处理
    package com.intretech.smart.mqtt.msgserver.config;
    
    import com.intretech.smart.mqtt.msgserver.domian.ExceptionResult;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.http.HttpStatus;
    import org.springframework.util.StringUtils;
    import org.springframework.web.bind.annotation.ControllerAdvice;
    import org.springframework.web.bind.annotation.ExceptionHandler;
    import org.springframework.web.bind.annotation.ResponseBody;
    
    /**
     * 全局自定义异常消息
     *
     * <p>根据用户自定义抛出运行时异常返回客户端,在无自定义情况下默认返回500.
     *
     * @author Andrew
     * @date 2021/6/22
     */
    @ControllerAdvice
    public class GlobalRateLimiterExceptionHandler {
        private static final Logger logger = LoggerFactory.getLogger(GlobalRateLimiterExceptionHandler.class);
    
        @ExceptionHandler(RateLimiterException.class)
        @ResponseBody
        public ExceptionResult handleCustomRateLimiterException(RateLimiterException e) {
            if (StringUtils.isEmpty(e)) {
                logger.error("GlobalExceptionHandler: handleCustomRateLimiterException(RateLimiterException e) parameter RateLimiterException is null");
            }
    
            // 生成自定义返回异常类
            ExceptionResult result = new ExceptionResult();
            // 在无自定义情况下500
            if (StringUtils.isEmpty(e.getCode())) {
                result.setCode(HttpStatus.INTERNAL_SERVER_ERROR.toString());
            }
            if (StringUtils.isEmpty(e.getMessage())) {
                result.setMsg(HttpStatus.INTERNAL_SERVER_ERROR.getReasonPhrase());
            }
    
            result.setCode(e.getCode());
            result.setMsg(e.getMessage());
            return result;
        }
    }
    
    1. 异常返回封装类
    package com.intretech.smart.mqtt.msgserver.domian;
    
    import lombok.Getter;
    import lombok.Setter;
    
    import java.io.Serializable;
    
    /**
     * 自定义异常返回类
     *
     * @author Andrew
     * @date 2021/6/22
     */
    @Getter
    @Setter
    public class ExceptionResult implements Serializable {
        /**
         * 异常码
         */
        private String code;
        /**
         * 异常消息描述
         */
        private String msg;
    }
    
    1. 利用责任链模式组装两个限流策略

    抽象父类

    package com.intretech.smart.mqtt.msgserver.responsibility;
    
    import lombok.Getter;
    import lombok.Setter;
    
    /**
     * 限流抽象父类
     *
     * <p>利用责任链模式组装需要执行限流的策略.
     *
     * @author Andrew
     * @date 2021/6/21
     */
    @Getter
    @Setter
    public abstract class AbstractRateLimiterHandler {
        /**
         * 限流抽象父类
         */
        private AbstractRateLimiterHandler nextAbstractRateLimiterHandler;
    
        /**
         * 处理请求限流方法
         *
         * @param sessionId 会话key
         * @param msgType   消息类型
         */
        public abstract void judgeMsgType(String sessionId, String msgType);
    }
    

    具体实现类

    package com.intretech.smart.mqtt.msgserver.responsibility;
    
    import com.google.common.collect.ImmutableList;
    import com.intretech.smart.mqtt.msgserver.config.RateLimiterException;
    import com.intretech.smart.mqtt.msgserver.constant.RateLimiterEnum;
    import com.intretech.smart.mqtt.msgserver.domian.RateLimiter;
    import com.intretech.smart.mqtt.msgserver.service.RateLimiterService;
    import org.apache.commons.lang.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.data.redis.core.script.DefaultRedisScript;
    
    /**
     * 升级请求接口自定义限流策略类
     *
     * <p>继承自AbstractRateLimiterHandler,实现责任链模式.
     * 对升级功能的接口全部列出,匹配则执行限流方法,无匹配则转发至下一个责任链.
     *
     * @author Andrew
     * @date 2021/6/21
     */
    @Configuration
    public class UpGradeLimiterHandler extends AbstractRateLimiterHandler {
        private static final Logger logger = LoggerFactory.getLogger(UpGradeLimiterHandler.class);
    
        @Autowired
        private StringRedisTemplate limitRedisTemplate;
    
        @Autowired
        private DefaultRedisScript<Number> redisScript;
    
        @Autowired
        private RateLimiter rateLimiter;
    
        @Autowired
        private RateLimiterService rateLimiterService;
    
        /**
         * 生成限流唯一key
         *
         * @return 限流唯一key
         */
        private String makeRateLimiterKey(String clientId, String url) {
            return RateLimiterCommon.UPGRADE_KEY_PREFIX + url + clientId;
        }
    
        @Override
        public void judgeMsgType(String sessionId, String msgType) {
            if (rateLimiter.getUpGradeLimiter().getUrl().contains(msgType)) {
                performRateLimiter(sessionId, msgType);
            } else {
                if (getNextAbstractRateLimiterHandler() != null) {
                    getNextAbstractRateLimiterHandler().judgeMsgType(sessionId, msgType);
                } else {
                    logger.warn("OrdinaryLimiterHandler: No matching executable current limiting policy sessionId:{}, msgType:{}", sessionId, msgType);
                }
            }
        }
    
        /**
         * 限流执行具体方法
         *
         * @param sessionId 会话token
         * @param msgType   消息类型
         */
        private void performRateLimiter(String sessionId, String msgType) {
            // 获得唯一客户端标识
            String clientId = rateLimiterService.judgeClient(sessionId);
            // 取出消息体中唯一客户端标识
            if (StringUtils.isEmpty(clientId)) {
                logger.info("OrdinaryLimiterHandler: rateLimiterService.judgeClient(String accessToken) return null, sessionId:{}", sessionId);
                return;
            }
    
            // 生成唯一限流key
            String limitKey = makeRateLimiterKey(clientId, msgType);
            // 不可变、线程安全的列表集合,只会获取传入对象的一个副本
            ImmutableList<String> keys = ImmutableList.of(limitKey);
            Number count = limitRedisTemplate.execute(redisScript, keys, rateLimiter.getUpGradeLimiter().getRefreshInterval(), rateLimiter.getUpGradeLimiter().getLimit());
            // 判断redis是否返回-1,-1代表无令牌,其他正数则是剩余令牌数
            if (count == null || count.intValue() == -1) {
                logger.info("OrdinaryLimiterHandler:clientId:{} access more than limit:{} times in freshInterval:{}", clientId, rateLimiter.getUpGradeLimiter().getLimit(), rateLimiter.getUpGradeLimiter().getRefreshInterval());
                throw new RateLimiterException(RateLimiterEnum.TOO_MANY_REQUESTS);
            }
        }
    }
    

    总结:

    1、分布式限流方案优选应当以redis作为缓存中间件配合lua脚本实现令牌桶算法,redis存储两个关键值:1.客户端第一次访问时间 2.当前桶中剩余令牌数、

    2、基于restFull风格的api在每次变更后会认定为是一个全心的url不受限流作用,如果以带参形式访问即使换了参数也被认为是同一个url受限流作用。

       // 如果换掉ID即认为是新的url,比如localhost:8080/1 与localhost:8080/2虽访问同一个接口但是被认定为不同访问者
        @GetMapping("/{id}")
        public String selectOrderById(@PathVariable("id") Integer id) {
            return orderService.selectOrderById(id);
        }
      // 参数不同还是被认为是同一访问者,比如localhost:8080/single?id=1 与localhost:8080/single?id=2都被限流
        @GetMapping("/single")
        public String updateOrderById(Integer id) {
            return orderService.updateOrderById(id);
        }
    
  • 相关阅读:
    1062 Talent and Virtue (25 分)
    1083 List Grades (25 分)
    1149 Dangerous Goods Packaging (25 分)
    1121 Damn Single (25 分)
    1120 Friend Numbers (20 分)
    1084 Broken Keyboard (20 分)
    1092 To Buy or Not to Buy (20 分)
    数组与链表
    二叉树
    时间复杂度与空间复杂度
  • 原文地址:https://www.cnblogs.com/zbm-gg/p/14933441.html
Copyright © 2011-2022 走看看