  • Spring Cloud Gateway 6: Rate Limiting

    Spring Cloud Gateway Rate Limiting

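    Rate limiting protects a system by capping either the number of concurrent requests or the number of requests accepted within a time window. Once the limit is reached, further requests can be rejected (for example, redirected to an error page or a friendly notice page), queued, or made to wait.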
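
    As an illustration of the concurrency-capping variant, here is a minimal plain-Java sketch. The class and method names (ConcurrencyLimiter, callOrReject, callOrWait) are hypothetical and are not part of Spring Cloud Gateway; the sketch only shows the difference between rejecting and waiting once the limit is reached.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hypothetical helper: caps how many calls may run at the same time. */
class ConcurrencyLimiter {
    private final Semaphore permits;

    ConcurrencyLimiter(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    /** Rejecting variant: runs the call only if a permit is free right now. */
    <T> T callOrReject(Supplier<T> call, T fallback) {
        if (!permits.tryAcquire()) {
            return fallback; // limit reached, e.g. serve an error or notice page
        }
        try {
            return call.get();
        } finally {
            permits.release();
        }
    }

    /** Waiting variant: blocks until a permit is free, then runs the call. */
    <T> T callOrWait(Supplier<T> call) throws InterruptedException {
        permits.acquire();
        try {
            return call.get();
        } finally {
            permits.release();
        }
    }
}

class ConcurrencyLimiterDemo {
    public static void main(String[] args) {
        ConcurrencyLimiter limiter = new ConcurrencyLimiter(1);
        // The outer call holds the only permit, so the nested call is rejected.
        String result = limiter.callOrReject(
                () -> limiter.callOrReject(() -> "inner ok", "inner rejected"),
                "outer rejected");
        System.out.println(result); // prints "inner rejected"
    }
}
```

    Blocking as in callOrWait does not suit a reactive gateway thread; it is shown only to contrast the two reactions to a reached limit.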

    Rate limiting with the built-in Gateway filter factory

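    Spring Cloud Gateway ships with the RequestRateLimiterGatewayFilterFactory class, which uses Redis and a Lua script to implement a token bucket. The filter itself lives in RequestRateLimiterGatewayFilterFactory; the token bucket logic runs in the RedisRateLimiter that it delegates to.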

    Add the Gateway starter and the reactive Redis starter to the pom file:

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-gateway</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
    </dependency>
    

    Configuration

    server:
      port: 8081
    spring:
      cloud:
        gateway:
          routes:
          - id: limit_route
            uri: http://httpbin.org:80/get
            predicates:
            - After=2017-01-20T17:42:47.789-07:00[America/Denver]
            filters:
            - name: RequestRateLimiter
              args:
                key-resolver: '#{@hostAddrKeyResolver}'
                redis-rate-limiter.replenishRate: 1
                redis-rate-limiter.burstCapacity: 3
      application:
        name: gateway-limiter
      redis:
        host: localhost
        port: 6379
        database: 0
    

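    This configuration sets the application port to 8081, supplies the Redis connection details, and adds the RequestRateLimiter filter to the route. The filter takes three parameters: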

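    • burstCapacity: the total capacity of the token bucket.
    • replenishRate: the average number of tokens added to the bucket per second.
    • key-resolver: the name of the bean that resolves the key to limit on. It is a SpEL expression of the form #{@beanName} that looks the bean up in the Spring container.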
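
    To make the interplay of replenishRate and burstCapacity concrete, here is a small in-memory token bucket. It is a sketch of the general technique under simplifying assumptions (single JVM, caller-supplied clock), not the Redis Lua script the gateway runs; SimpleTokenBucket and its methods are hypothetical names.

```java
/** In-memory token bucket; field names borrow the property names above, the rest is hypothetical. */
class SimpleTokenBucket {
    private final double replenishRate; // tokens added per second
    private final double burstCapacity; // maximum tokens the bucket can hold
    private double tokens;
    private double lastRefillSeconds;

    SimpleTokenBucket(double replenishRate, double burstCapacity, double nowSeconds) {
        this.replenishRate = replenishRate;
        this.burstCapacity = burstCapacity;
        this.tokens = burstCapacity; // the bucket starts full
        this.lastRefillSeconds = nowSeconds;
    }

    /** Tries to take one token at time nowSeconds; the caller supplies the clock so runs are repeatable. */
    synchronized boolean tryAcquire(double nowSeconds) {
        if (nowSeconds > lastRefillSeconds) {
            double elapsed = nowSeconds - lastRefillSeconds;
            // Refill in proportion to elapsed time, never beyond the capacity.
            tokens = Math.min(burstCapacity, tokens + elapsed * replenishRate);
            lastRefillSeconds = nowSeconds;
        }
        if (tokens >= 1) {
            tokens -= 1;
            return true;
        }
        return false;
    }
}

class SimpleTokenBucketDemo {
    public static void main(String[] args) {
        // Same values as the route above: replenishRate 1, burstCapacity 3.
        SimpleTokenBucket bucket = new SimpleTokenBucket(1, 3, 0);
        int burst = 0;
        for (int i = 0; i < 5; i++) {
            if (bucket.tryAcquire(0)) burst++;
        }
        System.out.println("allowed at t=0s: " + burst); // prints "allowed at t=0s: 3"

        int later = 0;
        for (int i = 0; i < 3; i++) {
            if (bucket.tryAcquire(2)) later++;
        }
        System.out.println("allowed at t=2s: " + later); // prints "allowed at t=2s: 2"
    }
}
```

    With these values a client can burst three requests at once, after which it is held to roughly one request per second.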

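    A KeyResolver must implement the resolve method. To limit by client host, for example, resolve the key from the remote host address. Once the KeyResolver is implemented, register it as a bean in the IoC container.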

    public class HostAddrKeyResolver implements KeyResolver {
    
        @Override
        public Mono<String> resolve(ServerWebExchange exchange) {
            return Mono.just(exchange.getRequest().getRemoteAddress().getAddress().getHostAddress());
        }
    
    }
    
    // In a @Configuration class: the bean name must match the one referenced by key-resolver
    @Bean
    public HostAddrKeyResolver hostAddrKeyResolver() {
        return new HostAddrKeyResolver();
    }
    

    To limit by URI instead, the KeyResolver looks like this:

    public class UriKeyResolver  implements KeyResolver {
    
        @Override
        public Mono<String> resolve(ServerWebExchange exchange) {
            return Mono.just(exchange.getRequest().getURI().getPath());
        }
    
    }
    
    // In a @Configuration class; reference it as '#{@uriKeyResolver}' in key-resolver
    @Bean
    public UriKeyResolver uriKeyResolver() {
        return new UriKeyResolver();
    }
    

    To limit per user:

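    @Bean
    KeyResolver userKeyResolver() {
        // justOrEmpty avoids the NullPointerException that Mono.just(null) throws when the "user" parameter is missing
        return exchange -> Mono.justOrEmpty(exchange.getRequest().getQueryParams().getFirst("user"));
    }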
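
    The three resolvers differ only in which part of the request becomes the key, and therefore in which requests share one bucket. The following plain-Java sketch shows that effect without Spring; RequestInfo and KeyStrategies are hypothetical stand-ins, not gateway types.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical stand-in for the parts of a request that the resolvers above read. */
class RequestInfo {
    final String remoteIp;
    final String path;
    final Map<String, String> queryParams;

    RequestInfo(String remoteIp, String path, Map<String, String> queryParams) {
        this.remoteIp = remoteIp;
        this.path = path;
        this.queryParams = queryParams;
    }
}

/** Three key strategies mirroring the host, URI, and user resolvers. */
class KeyStrategies {
    static final Function<RequestInfo, Optional<String>> BY_IP =
            r -> Optional.ofNullable(r.remoteIp);
    static final Function<RequestInfo, Optional<String>> BY_PATH =
            r -> Optional.ofNullable(r.path);
    static final Function<RequestInfo, Optional<String>> BY_USER =
            r -> Optional.ofNullable(r.queryParams.get("user"));
}

class KeyStrategiesDemo {
    public static void main(String[] args) {
        RequestInfo a = new RequestInfo("10.0.0.1", "/orders", Map.of("user", "alice"));
        RequestInfo b = new RequestInfo("10.0.0.1", "/stock", Map.of());

        // Same client IP: both requests draw from one bucket when keyed by IP.
        System.out.println(KeyStrategies.BY_IP.apply(a).equals(KeyStrategies.BY_IP.apply(b)));     // prints "true"
        // Different paths: each path gets its own bucket when keyed by URI.
        System.out.println(KeyStrategies.BY_PATH.apply(a).equals(KeyStrategies.BY_PATH.apply(b))); // prints "false"
        // No "user" parameter: the key is empty, which the caller has to handle.
        System.out.println(KeyStrategies.BY_USER.apply(b).isPresent());                            // prints "false"
    }
}
```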
    

    Custom rate limiting

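    Custom rate limiting in Spring Cloud Gateway means writing your own filter. Libraries such as Guava's RateLimiter, Bucket4j, and RateLimitJ provide ready-made limiters, most of them built around the token bucket idea.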

    The example below uses Bucket4j.

    pom dependencies:

    <!-- Spring Cloud Gateway dependency -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-gateway</artifactId>
    </dependency>
    <!-- Bucket4j rate-limiting dependency -->
    <dependency>
        <groupId>com.github.vladimir-bukhtoyarov</groupId>
        <artifactId>bucket4j-core</artifactId>
        <version>4.0.0</version>
    </dependency>
    

    A custom filter implements the GatewayFilter and Ordered interfaces. The one below limits requests per client IP:

    import io.github.bucket4j.Bandwidth;
    import io.github.bucket4j.Bucket;
    import io.github.bucket4j.Bucket4j;
    import io.github.bucket4j.Refill;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.cloud.gateway.filter.GatewayFilter;
    import org.springframework.cloud.gateway.filter.GatewayFilterChain;
    import org.springframework.core.Ordered;
    import org.springframework.http.HttpStatus;
    import org.springframework.web.server.ServerWebExchange;
    import reactor.core.publisher.Mono;
    
    import java.time.Duration;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * Custom filter that rate-limits by client IP
     */
    public class GatewayRateLimitFilterByIp implements GatewayFilter, Ordered {
    
        private final Logger log = LoggerFactory.getLogger(GatewayRateLimitFilterByIp.class);
    
        /**
         * A single gateway instance can keep its buckets in a ConcurrentHashMap;
         * a clustered gateway needs a distributed store such as Redis instead
         */
        private static final Map<String, Bucket> LOCAL_CACHE = new ConcurrentHashMap<>();
    
        /**
         * Maximum capacity of the bucket, i.e. the largest number of tokens it can hold
         */
        int capacity;
        /**
         * Number of tokens added per refill
         */
        int refillTokens;
        /**
         * Interval between refills
         */
        Duration refillDuration;
    
        public GatewayRateLimitFilterByIp() {
        }
    
        public GatewayRateLimitFilterByIp(int capacity, int refillTokens, Duration refillDuration) {
            this.capacity = capacity;
            this.refillTokens = refillTokens;
            this.refillDuration = refillDuration;
        }
    
        private Bucket createNewBucket() {
            Refill refill = Refill.of(refillTokens, refillDuration);
            Bandwidth limit = Bandwidth.classic(capacity, refill);
            return Bucket4j.builder().addLimit(limit).build();
        }
    
        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            String ip = exchange.getRequest().getRemoteAddress().getAddress().getHostAddress();
            Bucket bucket = LOCAL_CACHE.computeIfAbsent(ip, k -> createNewBucket());
            log.debug("IP: {}, tokens available in bucket: {}", ip, bucket.getAvailableTokens());
            if (bucket.tryConsume(1)) {
                return chain.filter(exchange);
            } else {
                // No tokens left: reject the request with status 429
                exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
                return exchange.getResponse().setComplete();
            }
        }
    
        @Override
        public int getOrder() {
            return -1000;
        }
    
        public static Map<String, Bucket> getLocalCache() {
            return LOCAL_CACHE;
        }
    
        public int getCapacity() {
            return capacity;
        }
    
        public void setCapacity(int capacity) {
            this.capacity = capacity;
        }
    
        public int getRefillTokens() {
            return refillTokens;
        }
    
        public void setRefillTokens(int refillTokens) {
            this.refillTokens = refillTokens;
        }
    
        public Duration getRefillDuration() {
            return refillDuration;
        }
    
        public void setRefillDuration(Duration refillDuration) {
            this.refillDuration = refillDuration;
        }
    }
    

    Route configuration in code

        @Bean
        public RouteLocator customerRouteLocator(RouteLocatorBuilder builder) {
            return builder.routes()
                    .route(r -> r.path("/test/rateLimit")
                            .filters(f -> f.filter(new GatewayRateLimitFilterByIp(10,1,Duration.ofSeconds(1))))
                            .uri("http://localhost:8000/hello/rateLimit")
                            .id("rateLimit_route")
                    ).build();
        }
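
    The filter above keeps one bucket per IP, so one client exhausting its tokens does not affect another. The sketch below imitates that behaviour in plain Java with the same numbers as the route (capacity 10, 1 token per second). PerKeyLimiter is a hypothetical class, and its whole-interval refill is a simplification rather than Bucket4j's actual refill strategy.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical per-key limiter: one small token bucket per client key, created on first use. */
class PerKeyLimiter {
    private static final class Bucket {
        long tokens;
        long lastRefillMillis;

        Bucket(long tokens, long lastRefillMillis) {
            this.tokens = tokens;
            this.lastRefillMillis = lastRefillMillis;
        }
    }

    private final Map<String, Bucket> buckets = new ConcurrentHashMap<>();
    private final long capacity;
    private final long refillTokens;
    private final long refillPeriodMillis;

    PerKeyLimiter(long capacity, long refillTokens, long refillPeriodMillis) {
        if (refillPeriodMillis <= 0) {
            throw new IllegalArgumentException("refillPeriodMillis must be positive");
        }
        this.capacity = capacity;
        this.refillTokens = refillTokens;
        this.refillPeriodMillis = refillPeriodMillis;
    }

    /** Takes one token from the bucket of the given key; the caller supplies the clock. */
    boolean tryConsume(String key, long nowMillis) {
        Bucket bucket = buckets.computeIfAbsent(key, k -> new Bucket(capacity, nowMillis));
        synchronized (bucket) {
            // Credit only whole refill periods that have elapsed since the last refill.
            long periods = (nowMillis - bucket.lastRefillMillis) / refillPeriodMillis;
            if (periods > 0) {
                bucket.tokens = Math.min(capacity, bucket.tokens + periods * refillTokens);
                bucket.lastRefillMillis += periods * refillPeriodMillis;
            }
            if (bucket.tokens > 0) {
                bucket.tokens--;
                return true;
            }
            return false; // the filter would answer 429 here
        }
    }
}

class PerKeyLimiterDemo {
    public static void main(String[] args) {
        PerKeyLimiter limiter = new PerKeyLimiter(10, 1, 1000);
        int accepted = 0;
        for (int i = 0; i < 12; i++) {
            if (limiter.tryConsume("10.0.0.1", 0L)) accepted++;
        }
        System.out.println(accepted);                            // prints "10"
        System.out.println(limiter.tryConsume("10.0.0.2", 0L));  // prints "true", other IPs are unaffected
    }
}
```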
    

    Rate limiting based on CPU usage

    The metrics exposed by Spring Boot Actuator report the current CPU usage, which can drive a rate-limiting decision:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.actuate.metrics.MetricsEndpoint;
    import org.springframework.cloud.gateway.filter.GatewayFilter;
    import org.springframework.cloud.gateway.filter.GatewayFilterChain;
    import org.springframework.core.Ordered;
    import org.springframework.http.HttpStatus;
    import org.springframework.stereotype.Component;
    import org.springframework.web.server.ServerWebExchange;
    import reactor.core.publisher.Mono;
    
    import java.util.Objects;
    
    /**
     * Rate-limits according to CPU usage
     **/
    @Component
    public class GatewayRateLimitFilterByCpu implements GatewayFilter, Ordered {
    
        private final Logger log = LoggerFactory.getLogger(GatewayRateLimitFilterByCpu.class);
    
        @Autowired
        private MetricsEndpoint metricsEndpoint;
    
        private static final String METRIC_NAME = "system.cpu.usage";
    
        private static final double MAX_USAGE = 0.50D;
    
        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            // Read the CPU usage of the machine the gateway runs on
            Double systemCpuUsage = metricsEndpoint.metric(METRIC_NAME, null)
                    .getMeasurements()
                    .stream()
                    .filter(Objects::nonNull)
                    .findFirst()
                    .map(MetricsEndpoint.Sample::getValue)
                    .filter(Double::isFinite)
                    .orElse(0.0D);
    
            boolean isOpenRateLimit = systemCpuUsage >MAX_USAGE;
            log.debug("system.cpu.usage: {}, isOpenRateLimit:{} ",systemCpuUsage , isOpenRateLimit);
            if (isOpenRateLimit) {
                // CPU usage is above the configured maximum: start rejecting requests
                exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
                return exchange.getResponse().setComplete();
            } else {
                return chain.filter(exchange);
            }
        }
    
        @Override
        public int getOrder() {
            return 0;
        }
    
    }
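
    The decision inside this filter reduces to comparing a CPU reading with a threshold. Here is a minimal sketch of that rule on its own, assuming the reading arrives through a DoubleSupplier so it can be replaced by fixed values in a test; CpuGate is a hypothetical name and the code does not touch Actuator.

```java
import java.util.function.DoubleSupplier;

/** Hypothetical load-shedding gate: rejects work while a CPU reading is above a threshold. */
class CpuGate {
    private final DoubleSupplier cpuUsage; // expected to return a value between 0.0 and 1.0
    private final double maxUsage;

    CpuGate(DoubleSupplier cpuUsage, double maxUsage) {
        this.cpuUsage = cpuUsage;
        this.maxUsage = maxUsage;
    }

    /** Returns true when the request may proceed. */
    boolean allow() {
        double usage = cpuUsage.getAsDouble();
        if (!Double.isFinite(usage)) {
            usage = 0.0; // like the filter above, treat an unusable reading as "no load"
        }
        return usage <= maxUsage;
    }
}

class CpuGateDemo {
    public static void main(String[] args) {
        // Fixed readings stand in for a live metric.
        System.out.println(new CpuGate(() -> 0.30, 0.50).allow());       // prints "true"
        System.out.println(new CpuGate(() -> 0.80, 0.50).allow());       // prints "false"
        System.out.println(new CpuGate(() -> Double.NaN, 0.50).allow()); // prints "true"
    }
}
```

    Falling back to 0.0 keeps the gateway open when the metric is unavailable; a stricter setup could choose to reject instead.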
    

    Route configuration in code

        @Autowired
        private GatewayRateLimitFilterByCpu gatewayRateLimitFilterByCpu;
    
        @Bean
        public RouteLocator customerRouteLocator(RouteLocatorBuilder builder) {
            return builder.routes()
                    .route(r -> r.path("/test/rateLimit")
                            .filters(f -> f.filter(gatewayRateLimitFilterByCpu))
                            .uri("http://localhost:8000/hello/rateLimit")
                            .id("rateLimit_route")
                    ).build();
        }
    
  • Original article: https://www.cnblogs.com/chenglc/p/13152440.html