zoukankan      html  css  js  c++  java
  • spring-cloud-gateway过滤器实践

    概述

    这里是 SpringCloud Gateway 实践的第一篇,主要讲过滤器的相关实现。Spring-Cloud-Gateway 是以 WebFlux 为基础的响应式架构设计, 是异步非阻塞式的,它能够充分利用多核 CPU 的硬件资源去处理大量的并发请求。

    本篇将基于 spring-cloud-gateway 简介 基础环境进行改造。

    工作原理

    Spring-Cloud-Gateway 基于过滤器实现,同 zuul 类似,有prepost两种方式的 filter,分别处理前置逻辑后置逻辑。客户端的请求先经过pre类型的 filter,然后将请求转发到具体的业务服务,收到业务服务的响应之后,再经过post类型的 filter 处理,最后返回响应到客户端。

    过滤器执行流程如下,order 越大,优先级越低

    接下来我们来验证下 filter 执行顺序。

    这里创建 3 个过滤器,分别配置不同的优先级

    @Slf4j
    public class AFilter implements GlobalFilter {
        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            log.info("AFilter前置逻辑");
            return chain.filter(exchange).then(Mono.fromRunnable(() -> {
                log.info("AFilter后置逻辑");
            }));
        }
    }
    
    @Slf4j
    public class BFilter implements GlobalFilter {
        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            log.info("BFilter前置逻辑");
            return chain.filter(exchange).then(Mono.fromRunnable(() -> {
                log.info("BFilter后置逻辑");
            }));
        }
    }
    
    @Slf4j
    public class CFilter implements GlobalFilter {
    
        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            log.info("CFilter前置逻辑");
            return chain.filter(exchange).then(Mono.fromRunnable(() -> {
                log.info("CFilter后置逻辑");
            }));
        }
    }
    
    @Configuration
    public class FilterConfig {
    
        @Bean
        @Order(-1)
        public GlobalFilter a() {
            return new AFilter();
        }
    
        @Bean
        @Order(0)
        public GlobalFilter b() {
            return new BFilter();
        }
    
        @Bean
        @Order(1)
        public GlobalFilter c() {
            return new CFilter();
        }
    }
    
    curl -X POST -H "Content-Type:application/json" -d '{"name": "admin"}' http://192.168.124.5:2000/p/provider1
    
    curl -X GET -G -d "username=admin" http://192.168.124.5:2000/p/provider1/1
    

    查看网关输出日志

    2020-03-29 16:23:22.832  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.AFilter       : AFilter前置逻辑
    2020-03-29 16:23:22.832  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.BFilter       : BFilter前置逻辑
    2020-03-29 16:23:22.832  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.CFilter       : CFilter前置逻辑
    
    2020-03-29 16:23:22.836  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.CFilter       : CFilter后置逻辑
    2020-03-29 16:23:22.836  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.BFilter       : BFilter后置逻辑
    2020-03-29 16:23:22.836  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.AFilter       : AFilter后置逻辑
    

    自定义过滤器

    现在假设我们要统计某个服务的响应时间,我们可以在代码中

    long beginTime = System.currentTimeMillis();
    // do something...
    long elapsed = System.currentTimeMillis() - beginTime;
    log.info("elapsed: {}ms", elapsed);
    

    每次都要这么写是不是很烦?Spring 告诉我们有个东西叫 AOP。但是我们是微服务啊,在每个服务里都写也很烦。这时候就该网关的过滤器登台表演了。

    自定义过滤器需要实现 GatewayFilterOrdered 。其中 GatewayFilter 中的这个方法就是用来实现你的自定义的逻辑的

    Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
    

    Ordered 中的 int getOrder() 方法是来给过滤器设定优先级别的,值越大则优先级越低。

    好了,让我们来撸代码吧.

    /**
     * 此过滤器功能为计算请求完成时间
     */
    public class ElapsedFilter implements GatewayFilter, Ordered {
    
        private static final String ELAPSED_TIME_BEGIN = "elapsedTimeBegin";
    
        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            exchange.getAttributes().put(ELAPSED_TIME_BEGIN, System.currentTimeMillis());
            return chain.filter(exchange).then(
                    Mono.fromRunnable(() -> {
                        Long startTime = exchange.getAttribute(ELAPSED_TIME_BEGIN);
                        if (startTime != null) {
                            System.out.println(exchange.getRequest().getURI().getRawPath() + ": " + (System.currentTimeMillis() - startTime) + "ms");
                        }
                    })
            );
        }
    
        /*
         *过滤器存在优先级,order越大,优先级越低
         */
        @Override
        public int getOrder() {
            return Ordered.LOWEST_PRECEDENCE;
        }
    }
    
    

    我们在请求刚刚到达时,往 ServerWebExchange 中放入了一个属性 elapsedTimeBegin,属性值为当时的毫秒级时间戳。然后在请求执行结束后,又从中取出我们之前放进去的那个时间戳,与当前时间的差值即为该请求的耗时。因为这是与业务无关的日志所以将 Ordered 设为 Integer.MAX_VALUE 以降低优先级。

    现在再来看我们之前的问题:怎么来区分是 “pre” 还是 “post” 呢?其实就是 chain.filter(exchange) 之前的就是 “pre” 部分,之后的也就是 then 里边的是 “post” 部分。

    创建好 Filter 之后我们将它添加到我们的 Filter Chain 里边

    @Configuration
    public class FilterConfig {
    
    
        /**
         * http://localhost:8100/filter/provider
         * @param builder
         * @return
         */
        @Bean
        public RouteLocator customerRouteLocator(RouteLocatorBuilder builder) {
            // @formatter:off
            // 可以对比application.yml中关于路由转发的配置
            return builder.routes()
                    .route(r -> r.path("/filter/**")
                            .filters(f -> f.stripPrefix(1)
                                    .filter(new ElapsedFilter()))
                            .uri("lb://idc-cloud-provider")
                            .order(0)
                            .id("filter")
                    )
                    .build();
            // @formatter:on
        }
    
    }
    

    基于全局过滤器实现审计功能

    // AdaptCachedBodyGlobalFilter
    
    @Component
    public class LogFilter implements GlobalFilter, Ordered {
    
        private Logger log = LoggerFactory.getLogger(LogFilter.class);
    
        private final ObjectMapper objectMapper = new ObjectMapper();
        private static final String START_TIME = "startTime";
        private static final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();
    
        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    
            ServerHttpRequest request = exchange.getRequest();
            // 请求路径
            String path = request.getPath().pathWithinApplication().value();
            // 请求schema: http/https
            String scheme = request.getURI().getScheme();
            // 请求方法
            HttpMethod method = request.getMethod();
            // 路由服务地址
            URI targetUri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
            // 请求头
            HttpHeaders headers = request.getHeaders();
            // 设置startTime
            exchange.getAttributes().put(START_TIME, System.currentTimeMillis());
            // 获取请求地址
            InetSocketAddress remoteAddress = request.getRemoteAddress();
    
    
            MultiValueMap<String, String> formData = null;
    
    
    
            AccessRecord accessRecord = new AccessRecord();
            accessRecord.setPath(path);
            accessRecord.setSchema(scheme);
            accessRecord.setMethod(method.name());
            accessRecord.setTargetUri(targetUri.toString());
            accessRecord.setRemoteAddress(remoteAddress.toString());
            accessRecord.setHeaders(headers);
    
            if (method == HttpMethod.GET) {
                formData = request.getQueryParams();
                accessRecord.setFormData(formData);
                writeAccessRecord(accessRecord);
            }
    
            if (method == HttpMethod.POST) {
                Mono<Void> voidMono = null;
                if (headers.getContentType().equals(MediaType.APPLICATION_JSON)) {
                    // JSON
                    voidMono = readBody(exchange, chain, accessRecord);
                }
    
                if (headers.getContentType().equals(MediaType.APPLICATION_FORM_URLENCODED)) {
                    // x-www-form-urlencoded
                    voidMono = readFormData(exchange, chain, accessRecord);
                }
    
                if (voidMono != null) {
                    return voidMono;
                }
    
            }
    
            return chain.filter(exchange);
        }
    
        private Mono<Void> readFormData(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) {
            return null;
        }
    
        private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) {
    
            return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
    
                byte[] bytes = new byte[dataBuffer.readableByteCount()];
                dataBuffer.read(bytes);
                DataBufferUtils.release(dataBuffer);
                Flux<DataBuffer> cachedFlux = Flux.defer(() -> {
                    DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
                    DataBufferUtils.retain(buffer);
                    return Mono.just(buffer);
                });
    
    
                // 重写请求体,因为请求体数据只能被消费一次
                ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
                    @Override
                    public Flux<DataBuffer> getBody() {
                        return cachedFlux;
                    }
                };
    
                ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();
    
                return ServerRequest.create(mutatedExchange, messageReaders)
                        .bodyToMono(String.class)
                        .doOnNext(objectValue -> {
                            accessRecord.setBody(objectValue);
                            writeAccessRecord(accessRecord);
                        }).then(chain.filter(mutatedExchange));
            });
        }
    
        @Override
        public int getOrder() {
            return Ordered.LOWEST_PRECEDENCE;
        }
    
        /**
         * TODO 异步日志
         * @param accessRecord
         */
        private void writeAccessRecord(AccessRecord accessRecord) {
    
            log.info("
    
     start------------------------------------------------- 
     " +
                            "请求路径:{}
     " +
                            "scheme:{}
     " +
                            "请求方法:{}
     " +
                            "目标服务:{}
     " +
                            "请求头:{}
     " +
                            "远程IP地址:{}
     " +
                            "表单参数:{}
     " +
                            "请求体:{}
     " +
                            "end------------------------------------------------- 
     ",
                    accessRecord.getPath(), accessRecord.getSchema(), accessRecord.getMethod(), accessRecord.getTargetUri(), accessRecord.getHeaders(), accessRecord.getRemoteAddress(), accessRecord.getFormData(), accessRecord.getBody());
        }
    }
    
    curl -X POST -H "Content-Type:application/json" -d '{"name": "admin"}' http://192.168.124.5:2000/p/provider1
    
    curl -X GET -G -d "username=admin" http://192.168.124.5:2000/p/provider1/1
    

    输出结果

     start-------------------------------------------------
     请求路径:/provider1
     scheme:http
     请求方法:POST
     目标服务:http://192.168.124.5:2001/provider1
     请求头:[Content-Type:"application/json", User-Agent:"PostmanRuntime/7.22.0", Accept:"*/*", Cache-Control:"no-cache", Postman-Token:"2a4ce04d-8449-411d-abd8-247d20421dc2", Host:"192.168.124.5:2000", Accept-Encoding:"gzip, deflate, br", Content-Length:"16", Connection:"keep-alive"]
     远程IP地址:/192.168.124.5:49969
     表单参数:null
     请求体:{"name":"admin"}
     end-------------------------------------------------
    

    接下来,我们来配置日志,方便日志系统提取日志。SpringBoot 默认的日志为 logback。

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
    
        <property name="LOGS" value="/Users/cuishiying/Documents/spring-cloud-learning/logs" />
    
        <appender name="Console" class="ch.qos.logback.core.ConsoleAppender">
            <layout class="ch.qos.logback.classic.PatternLayout">
                <Pattern>
                    %black(%d{ISO8601}) %highlight(%-5level) [%blue(%t)] %yellow(%C{1.}): %msg%n%throwable
                </Pattern>
            </layout>
        </appender>
    
        <appender name="RollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <file>${LOGS}/spring-boot-logger.log</file>
            <encoder
                    class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
                <Pattern>%d %p %C{1.} [%t] %m%n</Pattern>
            </encoder>
    
            <rollingPolicy
                    class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <!-- rollover daily and when the file reaches 10 MegaBytes -->
                <fileNamePattern>${LOGS}/archived/spring-boot-logger-%d{yyyy-MM-dd}.%i.log
                </fileNamePattern>
                <timeBasedFileNamingAndTriggeringPolicy
                        class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                    <maxFileSize>10MB</maxFileSize>
                </timeBasedFileNamingAndTriggeringPolicy>
            </rollingPolicy>
        </appender>
    
        <!-- LOG everything at INFO level -->
        <root level="info">
            <!--<appender-ref ref="RollingFile" />-->
            <appender-ref ref="Console" />
        </root>
    
        <!-- LOG "cn.idea360*" at TRACE level additivity:是否向上级loger传递打印信息。默认是true-->
        <logger name="cn.idea360.gateway" level="info" additivity="false">
            <appender-ref ref="RollingFile" />
            <appender-ref ref="Console" />
        </logger>
    
    </configuration>
    

    这样 console 和日志目录下就都有日志了。

    自定义过滤器工厂

    如果你看过静态路由的配置,你应该对如下配置有印象。

    filters:
      - StripPrefix=1
      - AddResponseHeader=X-Response-Default-Foo, Default-Bar
    

    StripPrefixAddResponseHeader 这两个实际上是两个过滤器工厂(GatewayFilterFactory),用这种配置的方式更灵活方便。

    我们就将之前的那个 ElapsedFilter 改造一下,让它能接收一个 boolean 类型的参数,来决定是否将请求参数也打印出来。

    public class ElapsedGatewayFilterFactory extends AbstractGatewayFilterFactory<ElapsedGatewayFilterFactory.Config> {
    
        private static final Log log = LogFactory.getLog(GatewayFilter.class);
        private static final String ELAPSED_TIME_BEGIN = "elapsedTimeBegin";
        private static final String KEY = "withParams";
    
    
        public List<String> shortcutFieldOrder() {
            return Arrays.asList(KEY);
        }
    
        public ElapsedGatewayFilterFactory() {
            super(Config.class);
        }
    
    
        public GatewayFilter apply(Config config) {
            return (exchange, chain) -> {
                exchange.getAttributes().put(ELAPSED_TIME_BEGIN, System.currentTimeMillis());
                return chain.filter(exchange).then(
                        Mono.fromRunnable(() -> {
                            Long startTime = exchange.getAttribute(ELAPSED_TIME_BEGIN);
                            if (startTime != null) {
                                StringBuilder sb = new StringBuilder(exchange.getRequest().getURI().getRawPath())
                                        .append(": ")
                                        .append(System.currentTimeMillis() - startTime)
                                        .append("ms");
                                if (config.isWithParams()) {
                                    sb.append(" params:").append(exchange.getRequest().getQueryParams());
                                }
                                log.info(sb.toString());
                            }
                        })
                );
            };
        }
    
    
        public static class Config {
    
            private boolean withParams;
    
            public boolean isWithParams() {
                return withParams;
            }
    
            public void setWithParams(boolean withParams) {
                this.withParams = withParams;
            }
    
        }
    }
    

    过滤器工厂的顶级接口是 GatewayFilterFactory,我们可以直接继承它的两个抽象类来简化开发 AbstractGatewayFilterFactoryAbstractNameValueGatewayFilterFactory,这两个抽象类的区别就是前者接收一个参数(像 StripPrefix 和我们创建的这种),后者接收两个参数(像 AddResponseHeader)。

    GatewayFilter apply(Config config) 方法内部实际上是创建了一个 GatewayFilter 的匿名类,具体实现和之前的几乎一样,就不解释了。

    静态内部类 Config 就是为了接收那个 boolean 类型的参数服务的,里边的变量名可以随意写,但是要重写 List shortcutFieldOrder() 这个方法。

    这里注意一下,一定要调用一下父类的构造器把 Config 类型传过去,否则会报 ClassCastException

    public ElapsedGatewayFilterFactory() {
        super(Config.class);
    }
    

    工厂类我们有了,再把它注册到 Spring 当中

    @Bean
    public ElapsedGatewayFilterFactory elapsedGatewayFilterFactory() {
        return new ElapsedGatewayFilterFactory();
    }
    

    然后添加配置(主要改动在 default-filters 配置)

    server:
      port: 2000
    spring:
      application:
        name: idc-gateway
      redis:
        host: localhost
        port: 6379
        timeout: 6000ms  # 连接超时时长(毫秒)
        jedis:
          pool:
            max-active: 1000  # 连接池最大连接数(使用负值表示没有限制)
            max-wait: -1ms      # 连接池最大阻塞等待时间(使用负值表示没有限制)
            max-idle: 10      # 连接池中的最大空闲连接
            min-idle: 5       # 连接池中的最小空闲连接
      cloud:
        consul:
          host: localhost
          port: 8500
        gateway:
          discovery:
            locator:
              enabled: true
              # 修改在这里。gateway可以通过开启以下配置来打开根据服务的serviceId来匹配路由,默认是大写
          default-filters:
            - Elapsed=true
          routes:
            - id: provider  # 路由 ID,保持唯一
              uri: lb://idc-provider1 # uri指目标服务地址,lb代表从注册中心获取服务
              predicates: # 路由条件。Predicate 接受一个输入参数,返回一个布尔值结果。该接口包含多种默认方法来将 Predicate 组合成其他复杂的逻辑(比如:与,或,非)
                - Path=/p/**
              filters:
                - StripPrefix=1 # 过滤器StripPrefix,作用是去掉请求路径的最前面n个部分截取掉。StripPrefix=1就代表截取路径的个数为1,比如前端过来请求/test/good/1/view,匹配成功后,路由到后端的请求路径就会变成http://localhost:8888/good/1/view
    

    结语

    本文到此结束。关于 Webflux 的学习刚入门,觉得可以像 Rxjava 那样在 onNext 中拿到异步数据,然而在 post 获取 body 中没生效。经测试可知 getBody 获得的数据输出为 null,而自己通过 Flux.create 创建的数据可以在订阅者中获取到。此处还有待研究,希望抛砖引玉,大家有研究出来的不吝赐教。同时,希望大家关注公众号【当我遇上你】。

    参考

  • 相关阅读:
    【原】PHP从入门到精通2小时【图文并茂】
    VMware虚拟机扩容
    linux修改固定IP
    According to TLD or attribute directive in tag file, attribute items does not accept any expressions
    eclipse juno 怎么安装maven
    MySQL的ROUND函数
    GitHub如何删除一个代码仓库
    win7企业版激活
    git-中文乱码
    Eclipse各个版本及其对应代号、下载地址列表【转】
  • 原文地址:https://www.cnblogs.com/idea360/p/12616475.html
Copyright © 2011-2022 走看看