zoukankan      html  css  js  c++  java
  • 系统限流(分布式)

    系统限流(分布式)

    其他开源框架限流方案:(gateway、zuul都集成以下一种或底层算法基于其中)

    • Guava->RateLimiter

    支持:平滑突发限流、平滑预热限流

    // 平滑突发限流器,RateLimiter.create(5) 表示这个限流器容量为 5,并且每秒生成 5 个令牌,也就是每隔 200 毫秒生成一个
    RateLimiter limiter = RateLimiter.create(5);
    
    //平滑预热限流器,第一个参数还是每秒创建的令牌数量,这里是每秒 2 个,也就是每 500 毫秒生成一个,后面的参数表示从冷启动速率过渡到平均速率的时间间隔,也就是所谓的热身时间间隔(warm up period)
    RateLimiter limiter = RateLimiter.create(2, 3, TimeUnit.SECONDS);
    
    • Bucket4j
    // Bucket接口代表了令牌桶的具体实现
    // Bandwidth 的意思是带宽,可以理解为限流的规则(simple 和 classic)
    Bucket bucket = Bucket4j.builder().addLimit(limit).build();
    // simple 方式创建的 Bandwidth,表示桶大小为 10,填充速度为每分钟 10 个令牌
    Bandwidth limit = Bandwidth.simple(10, Duration.ofMinutes(1));
    // 桶大小为 10,填充速度为每分钟 5 个令牌
    Refill filler = Refill.greedy(5, Duration.ofMinutes(1));
    Bandwidth limit = Bandwidth.classic(10, filler);
    // Refill 用于填充令牌桶,可以通过它定义填充速度,Bucket4j 有两种填充令牌的策略:间隔策略(intervally) 和 贪婪策略(greedy).在上面的例子中我们使用的是贪婪策略,如果使用间隔策略可以像下面这样创建 Refill:
    Refill filler = Refill.intervally(5, Duration.ofMinutes(1));
    // 间隔指的是一次性全部放入,贪婪指的是根据时间单元一次一个
    

    缺点:Bucket4j 唯一不足的地方是它只支持请求频率限流,不支持并发量限流,另外还有一点,虽然 Bucket4j 支持分布式限流,但它是基于 Hazelcast 这样的分布式缓存系统实现的,不能使用Redis

    1. Resilience4j

    Resilience4j 提供了两种限流的实现:SemaphoreBasedRateLimiterAtomicRateLimiterSemaphoreBasedRateLimiter 基于信号量实现,用户的每次请求都会申请一个信号量,并记录申请的时间,申请通过则允许请求,申请失败则限流,另外有一个内部线程会定期扫描过期的信号量并释放,很显然这是令牌桶的算法。AtomicRateLimiter 和上面的经典实现类似,不需要额外的线程,在处理每次请求时,根据距离上次请求的时间和生成令牌的速度自动填充。

    Resilience4j 也提供了两种隔离的实现:SemaphoreBulkheadThreadPoolBulkhead,通过信号量或线程池控制请求的并发数,

    缺点:不支持分布式限流

    方案一:gateway配置方式

    1. 引入所需依赖
        <dependency>
           <groupId>org.springframework.cloud</groupId>
           <artifactId>spring-cloud-starter-gateway</artifactId>
       </dependency>
       
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifatId>spring-boot-starter-data-redis-reactive</artifactId>
       </dependency>
    
    1. 添加限流策略配置类
    /**
     * 限流策略配置类
     *
     * @author Andrew
     * @date 2021/6/3
     */
    @Configuration
    public class CurrentLimitingResolver {
        
        /**
         * 基于IP地址进行限流
         * @return KeyResolver
         */
        @Bean("remoteAddrKeyResolver")
        @Primary
        public KeyResolver remoteAddrKeyResolver() {
            return exchange -> Mono.just(Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress());
        }
    
        /**
         * 基于hostName地址进行限流
         * @return KeyResolver
         */
        @Bean("remoteHostNameKeyResolver")
        public KeyResolver remoteHostNameKeyResolver() {
            return exchange -> Mono.just(Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostName());
        }
    
        /**
         * 根据请求参数进行限流
         * @return KeyResolver
         */
        @Bean("remoteUserKeyResolver")
        public KeyResolver remoteUserKeyResolver() {
            return exchange -> Mono.just(Objects.requireNonNull(exchange.getRequest().getQueryParams().getFirst("user")));
        }
    
        /**
         * 接口限流
         * @return KeyResolver
         */
        @Bean("remoteApiKeyResolver")
        public KeyResolver apiKeyResolver() {
            return exchange -> Mono.just(exchange.getRequest().getPath().value());
        }
    }
    
    1. yml文件配置所需参数
    # 除了gateway相关配置还需加上redis配置信息,因开启限流后会自动生成两个kv
    spring:
      application:
        name: cloud-gateway
      cloud:
        gateway:
          discovery:
            locator:
              enabled: true # 开启从注册中心动态创建路由的功能,利用微服务名进行路由
          routes:
            - id: payment_routh # 路由的ID,没有固定规则但要求唯一,建议配合服务名
              uri: lb:// cloud-payment-service #微服务名
               filters:
                 - name: RequestRateLimiter # RequestRateLimiter的限流过滤器,基于令牌桶算法
                   args:
                     redis-rate-limiter.replenishRate: 10 # 令牌桶每秒填充平均速率
                     redis-rate-limiter.burstCapacity: 20 # 令牌桶总容量
                     key-resolver: "#{@remoteAddrKeyResolver}" # 限流策略,spEl语法
              predicates:
                - Path=/payment/get   # 断言,路径相匹配的进行路由
    

    方案二:Zuul配置方式(基于guava)

    1. 导入依赖
        <dependency>
          <groupId>org.springframework.cloud</groupId>
          <artifactId>spring-cloud-starter-netflix-zuul</artifactId>
        </dependency>
        <dependency>
          <groupId>com.marcosbarbero.cloud</groupId>
          <artifactId>spring-cloud-zuul-ratelimit</artifactId>
          <version>2.0.4.RELEASE</version>
        </dependency>
    
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-data-redis</artifactId>
          <version>2.0.4.RELEASE</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.commons</groupId>
          <artifactId>commons-pool2</artifactId>
          <version>2.5.0</version>
        </dependency>
    
    1. 主启动类加入注解@EnableZuulProxy

    2. 配置文件

    // 若开启微服务名称即可不用配置routes自动会加入path
    zuul:
      ratelimit:
        enabled: true // 开启路由
        repository: REDIS // 存储位置redis,也可更换配置用CONSUL、REDIS、JPA、IN_MEMORY
        default-policy: // 全局限流策略
          limit: 1
          quota: 1
          refresh-interval: 3
        policies: // 个别微服务限流策略
          product-service:
            limit: 5
            quota: 10
            refresh-interval: 60
            type: url
      routes: // 路由
        product-service:
          path: /product-service/** // 根据path寻找下面的url微服务节点
          url: http://localhost:7070/
        order-service:
          path: /order-service/**
          url: http://localhost:9090/
    

    令牌桶算法执行逻辑:

    1. 取出当前时间
    2. 当前时间大于上次放入令牌时间成立继续往下执行
    3. 距离上次时间已过去多久?
    4. 时间*生产速度=剩余令牌
    5. 剩余令牌与桶容量比较取出最小值作为剩余令牌
    6. 重新对时间进行赋值

    方案三:基于redis与lua脚本令牌桶算法实现

    1. 导入redis依赖
    <!-- Spring Boot Redis 依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!--spring2.0 集成 redis 所需common-pool2-->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
    </dependency>
    
    1. 自定义限流yml文件(application-ratelimit.yml)
    # 自定义限流参数
    ratelimit:
      # 限流全局开关
      enabled: true
      # 普通请求相关
      ordinary:
        # 限流次数
        limit: 5
        # 限流单位时间
        refresh-interval: 60
      # 升级请求相关
      upgrade:
        limit: 3
        refresh-interval: 60
        # 具体msgType TODO 需要确认是否是以下接口
        url:
          - 23710
          - 23711
          - 24334
          - 24335
          - 24336
          - 24337
          - 24338
          - 24339
          - 24340
          - 24341
          - 24348
          - 24349
          - 24350
    
    1. 编写lua脚本(算法)
    -- 切换到命令复制模式
    redis.replicate_commands();
    -- 参数中传递的令牌key,基于选定的限流策略来定(唯一)
    local key = KEYS[1]
    -- 令牌桶填充 限流单位时间
    local update_len = tonumber(ARGV[1])
    -- 记录 第一次访问的时间戳
    local key_time = key..'_FRT'
    -- 获取当前时间(这里的curr_time_arr 中第一个是 秒数,第二个是 秒数后毫秒数),由于我是按秒计算的,这里只要curr_time_arr[1](注意:redis数组下标是从1开始的)
    -- 如果需要获得毫秒数 则为 tonumber(arr[1]*1000 + arr[2])
    local curr_time_arr = redis.call('TIME')
    -- 当前时间秒数
    local nowTime = tonumber(curr_time_arr[1])
    -- 从redis中获取当前key 第一次访问的时间戳,无直接赋值0,有即value
    local curr_key_time = tonumber(redis.call('get', key_time) or 0)
    -- 获取当前key对应令牌桶中的令牌数,无直接赋值-1,有即value
    local token_count = tonumber(redis.call('get', key) or -1)
    -- 当前令牌桶的容量,用户自定义初始化大小
    local token_size = tonumber(ARGV[2])
    -- 令牌桶数量小于0 说明令牌桶没有初始化
    if token_count < 0 then
       redis.call('set', key_time, nowTime)
       redis.call('set', key, token_size -1)
        redis.call('expire', key_time, update_len) -- 从第一次访问开始限流,设置对应键值的过期
        redis.call('expire', key, update_len)
       return token_size -1
    else
       if token_count > 0 then -- 当前令牌桶中令牌数够用
          redis.call('decr', key)
          return token_count -1   -- 返回剩余令牌数
       else    -- 当前令牌桶中令牌数已清空
          return -1
       end
    end
    
    1. 自定义全局异常抛出

    先继承RunTimeException类创建一个异常类

    package com.intretech.smart.mqtt.msgserver.config;
    
    
    import com.intretech.smart.mqtt.msgserver.constant.RateLimiterEnum;
    
    /**
     * 自定义运行异常
     *
     * @author Andrew
     * @date 2021/6/22
     */
    public class RateLimiterException extends RuntimeException {
        /**
         * 状态码
         */
        private final String code;
    
        public RateLimiterException(RateLimiterEnum rateLimiterEnum) {
            super(rateLimiterEnum.getMessage());
            this.code = rateLimiterEnum.getCode();
        }
    
        public String getCode() {
            return code;
        }
    }
    

    @ControllerAdvice ,很多初学者可能都没有听说过这个注解,实际上,这是一个非常有用的注解,顾名思义,这是一个增强的 Controller。使用这个 Controller ,可以实现三个方面的功能:

    1. 全局异常处理
    2. 全局数据绑定
    3. 全局数据预处理
    package com.intretech.smart.mqtt.msgserver.config;
    
    import com.intretech.smart.mqtt.msgserver.domian.ExceptionResult;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.http.HttpStatus;
    import org.springframework.util.StringUtils;
    import org.springframework.web.bind.annotation.ControllerAdvice;
    import org.springframework.web.bind.annotation.ExceptionHandler;
    import org.springframework.web.bind.annotation.ResponseBody;
    
    /**
     * 全局自定义异常消息
     *
     * <p>根据用户自定义抛出运行时异常返回客户端,在无自定义情况下默认返回500.
     *
     * @author Andrew
     * @date 2021/6/22
     */
    @ControllerAdvice
    public class GlobalRateLimiterExceptionHandler {
        private static final Logger logger = LoggerFactory.getLogger(GlobalRateLimiterExceptionHandler.class);
    
        @ExceptionHandler(RateLimiterException.class)
        @ResponseBody
        public ExceptionResult handleCustomRateLimiterException(RateLimiterException e) {
            if (StringUtils.isEmpty(e)) {
                logger.error("GlobalExceptionHandler: handleCustomRateLimiterException(RateLimiterException e) parameter RateLimiterException is null");
            }
    
            // 生成自定义返回异常类
            ExceptionResult result = new ExceptionResult();
            // 在无自定义情况下500
            if (StringUtils.isEmpty(e.getCode())) {
                result.setCode(HttpStatus.INTERNAL_SERVER_ERROR.toString());
            }
            if (StringUtils.isEmpty(e.getMessage())) {
                result.setMsg(HttpStatus.INTERNAL_SERVER_ERROR.getReasonPhrase());
            }
    
            result.setCode(e.getCode());
            result.setMsg(e.getMessage());
            return result;
        }
    }
    
    1. 异常返回封装类
    package com.intretech.smart.mqtt.msgserver.domian;
    
    import lombok.Getter;
    import lombok.Setter;
    
    import java.io.Serializable;
    
    /**
     * 自定义异常返回类
     *
     * @author Andrew
     * @date 2021/6/22
     */
    @Getter
    @Setter
    public class ExceptionResult implements Serializable {
        /**
         * 异常码
         */
        private String code;
        /**
         * 异常消息描述
         */
        private String msg;
    }
    
    1. 利用责任链模式组装两个限流策略

    抽象父类

    package com.intretech.smart.mqtt.msgserver.responsibility;
    
    import lombok.Getter;
    import lombok.Setter;
    
    /**
     * 限流抽象父类
     *
     * <p>利用责任链模式组装需要执行限流的策略.
     *
     * @author Andrew
     * @date 2021/6/21
     */
    @Getter
    @Setter
    public abstract class AbstractRateLimiterHandler {
        /**
         * 限流抽象父类
         */
        private AbstractRateLimiterHandler nextAbstractRateLimiterHandler;
    
        /**
         * 处理请求限流方法
         *
         * @param sessionId 会话key
         * @param msgType   消息类型
         */
        public abstract void judgeMsgType(String sessionId, String msgType);
    }
    

    具体实现类

    package com.intretech.smart.mqtt.msgserver.responsibility;
    
    import com.google.common.collect.ImmutableList;
    import com.intretech.smart.mqtt.msgserver.config.RateLimiterException;
    import com.intretech.smart.mqtt.msgserver.constant.RateLimiterEnum;
    import com.intretech.smart.mqtt.msgserver.domian.RateLimiter;
    import com.intretech.smart.mqtt.msgserver.service.RateLimiterService;
    import org.apache.commons.lang.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.data.redis.core.script.DefaultRedisScript;
    
    /**
     * 升级请求接口自定义限流策略类
     *
     * <p>继承自AbstractRateLimiterHandler,实现责任链模式.
     * 对升级功能的接口全部列出,匹配则执行限流方法,无匹配则转发至下一个责任链.
     *
     * @author Andrew
     * @date 2021/6/21
     */
    @Configuration
    public class UpGradeLimiterHandler extends AbstractRateLimiterHandler {
        private static final Logger logger = LoggerFactory.getLogger(UpGradeLimiterHandler.class);
    
        @Autowired
        private StringRedisTemplate limitRedisTemplate;
    
        @Autowired
        private DefaultRedisScript<Number> redisScript;
    
        @Autowired
        private RateLimiter rateLimiter;
    
        @Autowired
        private RateLimiterService rateLimiterService;
    
        /**
         * 生成限流唯一key
         *
         * @return 限流唯一key
         */
        private String makeRateLimiterKey(String clientId, String url) {
            return RateLimiterCommon.UPGRADE_KEY_PREFIX + url + clientId;
        }
    
        @Override
        public void judgeMsgType(String sessionId, String msgType) {
            if (rateLimiter.getUpGradeLimiter().getUrl().contains(msgType)) {
                performRateLimiter(sessionId, msgType);
            } else {
                if (getNextAbstractRateLimiterHandler() != null) {
                    getNextAbstractRateLimiterHandler().judgeMsgType(sessionId, msgType);
                } else {
                    logger.warn("OrdinaryLimiterHandler: No matching executable current limiting policy sessionId:{}, msgType:{}", sessionId, msgType);
                }
            }
        }
    
        /**
         * 限流执行具体方法
         *
         * @param sessionId 会话token
         * @param msgType   消息类型
         */
        private void performRateLimiter(String sessionId, String msgType) {
            // 获得唯一客户端标识
            String clientId = rateLimiterService.judgeClient(sessionId);
            // 取出消息体中唯一客户端标识
            if (StringUtils.isEmpty(clientId)) {
                logger.info("OrdinaryLimiterHandler: rateLimiterService.judgeClient(String accessToken) return null, sessionId:{}", sessionId);
                return;
            }
    
            // 生成唯一限流key
            String limitKey = makeRateLimiterKey(clientId, msgType);
            // 不可变、线程安全的列表集合,只会获取传入对象的一个副本
            ImmutableList<String> keys = ImmutableList.of(limitKey);
            Number count = limitRedisTemplate.execute(redisScript, keys, rateLimiter.getUpGradeLimiter().getRefreshInterval(), rateLimiter.getUpGradeLimiter().getLimit());
            // 判断redis是否返回-1,-1代表无令牌,其他正数则是剩余令牌数
            if (count == null || count.intValue() == -1) {
                logger.info("OrdinaryLimiterHandler:clientId:{} access more than limit:{} times in freshInterval:{}", clientId, rateLimiter.getUpGradeLimiter().getLimit(), rateLimiter.getUpGradeLimiter().getRefreshInterval());
                throw new RateLimiterException(RateLimiterEnum.TOO_MANY_REQUESTS);
            }
        }
    }
    

    总结:

    1、分布式限流方案优选应当以redis作为缓存中间件配合lua脚本实现令牌桶算法,redis存储两个关键值:1.客户端第一次访问时间 2.当前桶中剩余令牌数、

    2、基于restFull风格的api在每次变更后会认定为是一个全心的url不受限流作用,如果以带参形式访问即使换了参数也被认为是同一个url受限流作用。

       // 如果换掉ID即认为是新的url,比如localhost:8080/1 与localhost:8080/2虽访问同一个接口但是被认定为不同访问者
        @GetMapping("/{id}")
        public String selectOrderById(@PathVariable("id") Integer id) {
            return orderService.selectOrderById(id);
        }
      // 参数不同还是被认为是同一访问者,比如localhost:8080/single?id=1 与localhost:8080/single?id=2都被限流
        @GetMapping("/single")
        public String updateOrderById(Integer id) {
            return orderService.updateOrderById(id);
        }
    
  • 相关阅读:
    Win8 消费者预览版中文版下载地址 官方原版
    Easyui datagrid加载本地Json数据
    myeclipse 8.510.0 安装 svn 方法
    easyui tree使用方法
    安装Oracle 11g r2先决条件检查失败解决方法
    Win8/Win7或XP 双系统安装图文教程
    Oracle存储过程与函数
    MyEclipse 中 使用 TortoiseSVN(1)
    MyEclipse 中 使用 TortoiseSVN(2)
    easyui使用Ajax提交表单,返回Json数据
  • 原文地址:https://www.cnblogs.com/zbm-gg/p/14933441.html
Copyright © 2011-2022 走看看