  • Spring-Cloud-Gateway: From Upgrading to Giving Up

     

    1. Why upgrade to spring-cloud-gateway?

    Spring Cloud Gateway features:

    • Built on Spring Framework 5, Project Reactor and Spring Boot 2.0

    • Able to match routes on any request attribute.

    • Predicates and filters are specific to routes.

    • Hystrix Circuit Breaker integration.

    • Spring Cloud DiscoveryClient integration

    • Easy to write Predicates and Filters

    • Request Rate Limiting

    • Path Rewriting

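    That is the official pitch: Spring Cloud Gateway is much newer than Zuul, more flexible to apply, and nicer to develop with. In my own tests, though, what really set spring-cloud-gateway apart from zuul was its performance, and in the end performance was also the reason I gave up on it.

    Others online have compared gateway and zuul performance as well, and most of them conclude that gateway is faster than zuul and also more stable.

    We should not take that on trust, so I ran my own tests. Skip this part if it does not interest you. I did not test zuul.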

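    2. Initial test of spring-cloud-gateway

      step.1: Test setup:

        1. gateway version: 2.0.1

        2. Server host: 10.1.4.32, 4-core virtual machine with 16 GB of memory

        3. Test client: 10.1.4.34, 4-core virtual machine with 16 GB of memory

        4. Test tool: wrk

      step.2: Create a gateway project and write two test HTTP endpoints:

        1. http://10.1.4.32:14077/hello [POST]

        2. http://10.1.4.32:14077/test [GET]

      step.3: Run the test

      step.4: Test results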

    [wrk@localhost wrk]$ ./wrk  -t 15 -c500 -d 10 --latency  http://10.1.4.32:14077/test
    Running 10s test @ http://10.1.4.32:14077/test
      15 threads and 500 connections
      Thread Stats   Avg      Stdev     Max   +/- Stdev
        Latency     3.38ms    3.26ms 113.45ms   95.76%
        Req/Sec    10.84k     1.44k   26.48k    89.50%
      Latency Distribution
         50%    2.79ms
         75%    3.51ms
         90%    4.21ms
         99%   17.23ms
      1625714 requests in 10.10s, 131.79MB read
    Requests/sec: 160961.07
    Transfer/sec:     13.05MB

    And:

    [wrk@localhost wrk]$ ./wrk  -t 15 -c500 -d 10 --latency -s scripts/gateway.lua  http://10.1.4.32:14077/hello
    Running 10s test @ http://10.1.4.32:14077/hello
      15 threads and 500 connections
      Thread Stats   Avg      Stdev     Max   +/- Stdev
        Latency     5.21ms    3.96ms 255.59ms   96.75%
        Req/Sec     6.62k   604.79    13.72k    88.48%
      Latency Distribution
         50%    4.78ms
         75%    5.55ms
         90%    6.32ms
         99%   14.87ms
      994374 requests in 10.10s, 539.59MB read
    Requests/sec:  98471.14
    Transfer/sec:     53.43MB

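    Note: if your results differ a lot from these, the test tool may be the cause.

    The results show that the POST endpoint reached roughly 100,000 requests/s and the GET endpoint roughly 160,000 requests/s.

    This looks hard to believe: for an ordinary microservice 20,000 requests/s is already good, so 100,000 is remarkable. But as mentioned earlier, spring-cloud-gateway is built on Project Reactor's reactive programming model, which is designed for exactly this kind of high-concurrency load.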
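
    A quick way to sanity-check numbers like these is to recompute them from the raw wrk output. The sketch below (class and method names are made up for illustration) derives requests/s from the request count and the 10.10s duration, then applies Little's law, in-flight requests ≈ throughput × average latency, to see whether the figure is consistent with the 500 connections wrk was told to keep open. It uses the rounded values printed above, so the results are approximate.

```java
public class WrkSanityCheck {

    /** Requests per second = completed requests / elapsed seconds. */
    static double throughput(long requests, double seconds) {
        return requests / seconds;
    }

    /** Little's law: average requests in flight = throughput * average latency. */
    static double inFlight(double requestsPerSecond, double avgLatencySeconds) {
        return requestsPerSecond * avgLatencySeconds;
    }

    public static void main(String[] args) {
        // GET /test run: 1625714 requests in 10.10s, average latency 3.38ms
        double getRps = throughput(1_625_714L, 10.10);
        // POST /hello run: 994374 requests in 10.10s, average latency 5.21ms
        double postRps = throughput(994_374L, 10.10);

        System.out.printf("GET  ~%.0f req/s, ~%.0f in flight%n", getRps, inFlight(getRps, 0.00338));
        System.out.printf("POST ~%.0f req/s, ~%.0f in flight%n", postRps, inFlight(postRps, 0.00521));
    }
}
```

    Both runs come out at a little over 500 requests in flight, close to the 500 configured connections, so the reported throughput and latency are at least consistent with each other.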

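    Still, however pleasant the surprise, adopting spring-cloud-gateway on this basis alone would be hasty. After all, what is a gateway for? Routing and filtering. So the testing continues.

      step.5: Add a route and a filter by putting the following into the configuration file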

    spring:
      cloud:
        gateway:
          routes:
          - id: test
            uri: http://10.1.4.32:14077/test
            predicates:
            - Path=/tt
            filters:
            - AddRequestParameter=foo, bar

    This adds a route for the test endpoint, together with one of the official built-in filters: AddRequestParameter=foo, bar
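
    The one-line form `AddRequestParameter=foo, bar` is shorthand: the text before `=` names the filter, and the comma-separated values after it are bound, in order, to the filter's configuration fields. The sketch below imitates that binding in plain Java. It is not the gateway's own parser; the class and method names are hypothetical, and the field order (name, value) is an assumption about this particular filter.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ShortcutFilterSketch {

    /** Returns the part before '=' (or the whole text when there are no arguments). */
    static String filterName(String text) {
        int eq = text.indexOf('=');
        return (eq < 0 ? text : text.substring(0, eq)).trim();
    }

    /** Binds the comma-separated values after '=' to the given field names, in order. */
    static Map<String, String> bindArgs(String text, List<String> fieldOrder) {
        Map<String, String> bound = new LinkedHashMap<>();
        int eq = text.indexOf('=');
        if (eq < 0) {
            return bound; // a bare filter name carries no arguments
        }
        String[] values = text.substring(eq + 1).split(",");
        for (int i = 0; i < values.length && i < fieldOrder.size(); i++) {
            bound.put(fieldOrder.get(i), values[i].trim());
        }
        return bound;
    }

    public static void main(String[] args) {
        String definition = "AddRequestParameter=foo, bar";
        // Assumed field order for this filter: (name, value).
        List<String> fieldOrder = Arrays.asList("name", "value");
        System.out.println(filterName(definition) + " " + bindArgs(definition, fieldOrder));
    }
}
```

    With that assumption, the definition above binds to name=foo and value=bar, so each routed request gets an extra query parameter foo=bar.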

      step.6: Test again; results below:

    [wrk@localhost wrk]$ ./wrk  -t 15 -c500 -d 10 --latency  http://10.1.4.32:14077/tt
    Running 10s test @ http://10.1.4.32:14077/tt
      15 threads and 500 connections
      Thread Stats   Avg      Stdev     Max   +/- Stdev
        Latency    18.99ms   12.15ms 122.69ms   70.84%
        Req/Sec     1.82k   155.77     2.36k    73.94%
      Latency Distribution
         50%   17.03ms
         75%   25.49ms
         90%   35.02ms
         99%   57.13ms
      274529 requests in 10.10s, 22.25MB read
    Requests/sec:  27182.88
    Transfer/sec:      2.20MB

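    Throughput is down to about 27,000 requests/s, which looks like a big drop, but it is still well ahead of zuul: on this machine zuul might not even reach 20,000.

    So, should we go ahead and use spring-cloud-gateway?

    3. Starting to use spring-cloud-gateway

    After adopting spring-cloud-gateway, I started writing my own filters. The requirement called for two of them: one that modifies the request body and one that modifies the response body.

    Because the filters only need to apply to specific requests, I used gateway filters (GatewayFilter) here. Some of the code comes from the official sources and some from other people online. The two filters look roughly like this:

    Decrypt filter, pre: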

    package com.newland.dc.ctid.fileter;
    
    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;
    import com.newland.dc.common.vo.RequestHeaderVo;
    import com.newland.dc.ctid.entity.dto.RequestDto;
    import io.netty.buffer.ByteBufAllocator;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.cloud.gateway.filter.GatewayFilter;
    import org.springframework.cloud.gateway.filter.GatewayFilterChain;
    import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
    import org.springframework.core.Ordered;
    import org.springframework.core.io.buffer.DataBuffer;
    import org.springframework.core.io.buffer.DataBufferUtils;
    import org.springframework.core.io.buffer.NettyDataBufferFactory;
    import org.springframework.core.style.ToStringCreator;
    import org.springframework.http.MediaType;
    import org.springframework.http.server.reactive.ServerHttpRequest;
    import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
    import org.springframework.stereotype.Component;
    import org.springframework.web.server.ServerWebExchange;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    import java.nio.CharBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicReference;
    
    /**
     * Created by garfield on 2019/2/26.
     */
    @Component
    public class DecryptGatewayFilterFactory extends AbstractGatewayFilterFactory<DecryptGatewayFilterFactory.Config>{
    
        private static Logger log = LoggerFactory.getLogger(DecryptGatewayFilterFactory.class);
    
        public static final String DECRYPT_HEADER = "decrypt_header";
    
    
        public DecryptGatewayFilterFactory() {
            super(Config.class);
        }
    
    
        private Gson gson = new GsonBuilder().serializeNulls().create();
    
    
        @Override
        @SuppressWarnings("unchecked")
        public GatewayFilter apply(Config config) {
            return new DecryptGatewayFilter(config);
        }
    
        public class DecryptGatewayFilter implements GatewayFilter, Ordered {
            Config config;
    
            DecryptGatewayFilter(Config config) {
                this.config = config;
            }
            @Override
            public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
                    log.debug(config.toString());
                    ServerHttpRequest request = exchange.getRequest();
                    MediaType contentType = request.getHeaders().getContentType();
    
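                    // getContentType() returns null when the header is absent, so guard against it
                    boolean postRequest = "POST".equalsIgnoreCase(request.getMethodValue())
                            && (contentType == null || !contentType.toString().contains("multipart/form-data"));
                    // only non-multipart POST requests are processed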
                    if (postRequest) {
    
                        Flux<DataBuffer> body = request.getBody();
                        AtomicReference<String> bodyRef = new AtomicReference<>();// holds the request body once it has been read
                        body.subscribe(dataBuffer -> {
                            CharBuffer charBuffer = StandardCharsets.UTF_8.decode(dataBuffer.asByteBuffer());
                            DataBufferUtils.release(dataBuffer);
                            bodyRef.set(charBuffer.toString());
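                        });// read the request body into bodyRef (only works if the body is emitted synchronously and as a single buffer)
                        String bodyStr = bodyRef.get();// fetch the cached request body
                        log.debug(bodyStr);// this is where our own processing goes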
                        RequestDto requestDto = gson.fromJson(bodyStr, RequestDto.class);
                        log.debug("decrypt filter");
                        //save header to response header
                        RequestHeaderVo headerVo = requestDto.getHeader();
                        headerVo.setAppVersion("1000");
                        // values can be handed over to later stages here
                        exchange.getResponse().getHeaders().add(DECRYPT_HEADER, gson.toJson(headerVo));
    
                        DataBuffer bodyDataBuffer = stringBuffer(bodyStr);
                        Flux<DataBuffer> bodyFlux = Flux.just(bodyDataBuffer);
    
                        request = new ServerHttpRequestDecorator(request){
                            @Override
                            public Flux<DataBuffer> getBody() {
                                return bodyFlux;
                            }
                        };// wrap the request so that it returns the cached body
                    }
                    return chain.filter(exchange.mutate().request(request.mutate().header("a","200").build()).build());
                };
    
            @Override
            public int getOrder() {
                return -10;
            }
        }
    
        public static DataBuffer stringBuffer(String value) {
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
    
            NettyDataBufferFactory nettyDataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
            DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(bytes.length);
            buffer.write(bytes);
            return buffer;
        }
    
    
        @Override
        public ServerHttpRequest.Builder mutate(ServerHttpRequest request) {
            return null;
        }
    
        public static class Config {
            private boolean decrypt;
    
    
            public boolean isDecrypt() {
                return decrypt;
            }
    
            public void setDecrypt(boolean decrypt) {
                this.decrypt = decrypt;
            }
    
            @Override
            public String toString() {
                return new ToStringCreator(this)
                        .append("decrypt", decrypt)
                        .toString();
            }
        }
    
    
        @Override
        public List<String> shortcutFieldOrder() {
            return Arrays.asList("decrypt");
        }
    }
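
    The labels "pre" and "post" describe where a filter's logic runs relative to the call to the downstream service: a pre filter works on the request before it is forwarded, a post filter works on the response on the way back. The following is a minimal, synchronous sketch of that ordering in plain Java. The `Filter` and `Chain` types are stand-ins invented for this example, not the Spring interfaces, and the real gateway does the same thing reactively.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PrePostChainSketch {

    /** Hypothetical stand-in for a gateway filter; not the Spring interface. */
    interface Filter {
        void filter(List<String> trace, Chain chain);
    }

    /** Walks the filter list in order, then "calls" the downstream service. */
    static final class Chain {
        private final List<Filter> filters;
        private final int index;

        Chain(List<Filter> filters, int index) {
            this.filters = filters;
            this.index = index;
        }

        void next(List<String> trace) {
            if (index < filters.size()) {
                filters.get(index).filter(trace, new Chain(filters, index + 1));
            } else {
                trace.add("downstream call");
            }
        }
    }

    /** Runs a decrypt (pre) and an encrypt (post) filter and records the order of events. */
    static List<String> run() {
        Filter decrypt = (trace, chain) -> {
            trace.add("pre: decrypt request body"); // before the downstream call
            chain.next(trace);
        };
        Filter encrypt = (trace, chain) -> {
            chain.next(trace);
            trace.add("post: encrypt response body"); // after the downstream call returns
        };
        List<String> trace = new ArrayList<>();
        new Chain(Arrays.asList(decrypt, encrypt), 0).next(trace);
        return trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

    The trace shows the decrypt step first, then the downstream call, then the encrypt step, which is the order the two filters in this section are meant to follow.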

    Encrypt filter, post, using the modification approach that ships with the source code:

    package com.newland.dc.ctid.fileter;
    
    import com.google.gson.Gson;
    import com.newland.dc.common.vo.RequestHeaderVo;
    import com.newland.dc.ctid.entity.dto.RequestDto;
    import org.springframework.cloud.gateway.filter.GatewayFilter;
    import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
    import org.springframework.cloud.gateway.filter.factory.rewrite.ModifyResponseBodyGatewayFilterFactory;
    import org.springframework.core.style.ToStringCreator;
    import org.springframework.stereotype.Component;
    import org.springframework.web.server.ServerWebExchange;
    import reactor.core.publisher.Mono;
    
    import java.util.Arrays;
    import java.util.List;
    
    /**
     * @Auther: garfield
     * @Date: 2019/3/5 15:33
     * @Description:
     */
    @Component
    public class AnGatewayFilterFactory extends AbstractGatewayFilterFactory<AnGatewayFilterFactory.Config> {
    
    
        private Gson gson = new Gson();
    
        public AnGatewayFilterFactory() {
            super(Config.class);
        }
    
        @Override
        public GatewayFilter apply(Config config) {
    
            ModifyResponseBodyGatewayFilterFactory m1 = new ModifyResponseBodyGatewayFilterFactory(null);
            ModifyResponseBodyGatewayFilterFactory.Config c1 = new ModifyResponseBodyGatewayFilterFactory.Config();
            c1.setInClass(String.class);
            c1.setOutClass(String.class);
            c1.setNewContentType("application/json");
    
            c1.setRewriteFunction((exchange, body) -> {
                ServerWebExchange ex = (ServerWebExchange) exchange;
                // modify the response body here
                RequestHeaderVo requestHeaderVo = new RequestHeaderVo();
                RequestDto requestDto = gson.fromJson(body.toString(), RequestDto.class);
                requestDto.setHeader(requestHeaderVo);
                body = gson.toJson(requestDto);
                return Mono.just(body);
            });
            return m1.apply(c1);
        }
    
        public static class Config {
            private boolean encrypt;
    
    
            public boolean isEncrypt() {
                return encrypt;
            }
    
            public void setEncrypt(boolean encrypt) {
                this.encrypt = encrypt;
            }
    
            @Override
            public String toString() {
                return new ToStringCreator(this)
                        .append("encrypt", encrypt)
                        .toString();
            }
        }
    
        @Override
        public List<String> shortcutFieldOrder() {
            return Arrays.asList("encrypt");
        }
    }

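    A short digression: there are several ways to write this kind of body-modifying filter. You can write it yourself or reuse the example provided in the source code. Both versions above have been tested and work. I also have two more variants, which are much the same, but I will post them too and note the problems I ran into:

    The one below is essentially the example from the source code, except that it is written out by hand instead of referencing the library class: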

        @Override
            @SuppressWarnings("unchecked")
            public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    
                ServerHttpResponseDecorator responseDecorator = new ServerHttpResponseDecorator(exchange.getResponse()) {
                    @Override
                    public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                        ServerHttpRequest request = exchange.getRequest();
    
                        MediaType originalResponseContentType = exchange.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
                        HttpHeaders httpHeaders = new HttpHeaders();
                        httpHeaders.setContentType(originalResponseContentType);
                        ResponseAdapter responseAdapter = new ResponseAdapter(body, httpHeaders);
                        DefaultClientResponse clientResponse = new DefaultClientResponse(responseAdapter, ExchangeStrategies.withDefaults());
    
                        Mono<DataBuffer> modifiedBody = clientResponse.bodyToMono(DataBuffer.class).map(encrypt(config, new RequestHeaderVo()));
    
                        BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, DataBuffer.class);
                        CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, exchange.getResponse().getHeaders());
                        return bodyInserter.insert(outputMessage, new BodyInserterContext())
                                .then(Mono.defer(() -> {
                                    Flux<DataBuffer> messageBody = outputMessage.getBody();
                                    HttpHeaders headers = getDelegate().getHeaders();
                                    if (headers.getContentLength() < 0 && !headers.containsKey(HttpHeaders.TRANSFER_ENCODING)) {
                                        messageBody = messageBody.doOnNext(data -> headers.setContentLength(data.readableByteCount()));
                                    }
                                    return this.getDelegate().writeWith(messageBody);
                                }));
                    }
    
                    @Override
                    public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
                        return writeWith(Flux.from(body)
                                .flatMapSequential(p -> p));
                    }
                };
                return chain.filter(exchange.mutate().response(responseDecorator).build());
    
            }
    
            @Override
            public int getOrder() {
                return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
            }
    
        }
    
        private Function<DataBuffer, DataBuffer> encrypt(Config config, RequestHeaderVo headerVo) {
            if (config.encrypt) {
                return (i) -> {
                    InputStream inputStream = i.asInputStream();
    
                    byte[] bytes = new byte[0];
                    try {
                        bytes = new byte[inputStream.available()];
    
    
                        inputStream.read(bytes);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    // our own processing starts here
                    String body = new String(bytes);
                    log.debug("this is response encrypt");
                    log.debug(body);
                    log.debug(headerVo.toString());
    //                body = encryptService.responseEncrypt(body, headerVo);
    
                    // our own processing ends here
                    return i.write(TokenGatewayFilterFactory.stringBuffer(body));
    //                return i.write(new String(body).getBytes());
    
                };
    
    
            } else {
                return i -> i;
            }
    
        }

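    With this version I found that modifying the response body sends execution into the run method of the NioEventLoop class, in an endless loop that never exits. I do not know why, so modify with care.

    The other variant is close to what this author wrote, although I have not tested it properly: https://www.jianshu.com/p/9f00e0e1681c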

            @Override
            @SuppressWarnings("unchecked")
            public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
                return chain.filter(exchange.mutate().response(new ServerHttpResponseDecorator(exchange.getResponse()) {
                    @Override
                    public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                        DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory();
                        if (getStatusCode().equals(HttpStatus.OK) && body instanceof Flux) {
                            Flux<? extends DataBuffer> fluxBody = Flux.first(body);
                            return super.writeWith(fluxBody.map(dataBuffer -> {
                                System.out.println(dataBuffer.readableByteCount());
    
                                byte[] content = new byte[dataBuffer.readableByteCount()];
                                dataBuffer.read(content);
                                // release the buffer
                                DataBufferUtils.release(dataBuffer);
                                // responseData is what the downstream system returned; inspect or modify it here
                                String responseData = new String(content, Charset.forName("UTF-8"));
    
                                log.debug("response body: {}", responseData);
                                log.debug("this is response encrypt");
                                System.out.println(responseData);
    
                                byte[] newContent = responseData.getBytes();
    //                body = encryptService.responseEncrypt(body, headerVo);
                                byte[] uppedContent = new String(newContent, Charset.forName("UTF-8")).getBytes();
                                return bufferFactory.wrap(uppedContent);
                            }));
                        } else {
                            log.error("unexpected response status: {}", getStatusCode());
                        }
                        return super.writeWith(body);
                    }
                }).build());
            }
    
    
            @Override
            public int getOrder() {
                return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
            }
        }

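    This approach has a problem: the body it captures is often incomplete.

    I went to that article looking for an answer, and the author replied as follows:

      The above is only a simple sample. A Flux emits multiple items, so a long message is split up and each handling step sees only part of it. You can aggregate the data with the Flux.toArray method before processing it, or follow the response handling in https://www.jianshu.com/p/9b781fb1aaa0.

    That is indeed the cause, so we can model our filter on his other example; see his Jianshu blog for details. One thing to watch out for: his example also targets version 2.0.1, and it stops working if you move to 2.1 or later!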
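
    The effect described in that reply can be reproduced without any gateway code. In the sketch below (plain Java, hypothetical class and method names) a JSON payload is cut into 16-byte pieces to mimic a body that arrives as several buffers. Decoding each piece on its own never yields the complete body, while joining the pieces first does. With Reactor the joining step would be done on the Flux itself, for example by collecting all buffers before processing, but that part is not shown here.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkedBodySketch {

    /** Splits a payload into fixed-size chunks, mimicking a long body that arrives in pieces. */
    static List<byte[]> split(byte[] payload, int chunkSize) {
        List<byte[]> chunks = new ArrayList<>();
        for (int from = 0; from < payload.length; from += chunkSize) {
            int to = Math.min(payload.length, from + chunkSize);
            chunks.add(Arrays.copyOfRange(payload, from, to));
        }
        return chunks;
    }

    /** Flawed: decodes every chunk as if it were the whole body. */
    static List<String> perChunk(List<byte[]> chunks) {
        List<String> bodies = new ArrayList<>();
        for (byte[] chunk : chunks) {
            bodies.add(new String(chunk, StandardCharsets.UTF_8));
        }
        return bodies;
    }

    /** Joins all chunks first and decodes once. */
    static String aggregate(List<byte[]> chunks) {
        ByteArrayOutputStream joined = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            joined.write(chunk, 0, chunk.length);
        }
        return new String(joined.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "{\"header\":{\"appVersion\":\"1000\"},\"body\":\"hello gateway\"}";
        List<byte[]> chunks = split(original.getBytes(StandardCharsets.UTF_8), 16);
        System.out.println("per chunk : " + perChunk(chunks));
        System.out.println("aggregated: " + aggregate(chunks));
    }
}
```

    Any step that needs the whole message, such as JSON parsing or encryption, has to run on the aggregated form; the per-chunk strings are only fragments.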

    I'll just paste it here:

    package com.newland.dc.ctid.fileter;
    
    import com.google.gson.Gson;
    import com.newland.dc.common.vo.RequestHeaderVo;
    import com.newland.dc.ctid.service.SecurityService;
    import com.newland.dc.log.kafka.KafkaLog;
    import org.reactivestreams.Publisher;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.cloud.gateway.filter.GatewayFilter;
    import org.springframework.cloud.gateway.filter.GatewayFilterChain;
    import org.springframework.cloud.gateway.filter.NettyWriteResponseFilter;
    import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
    import org.springframework.cloud.gateway.support.*;
    import org.springframework.core.Ordered;
    import org.springframework.core.io.buffer.DataBuffer;
    import org.springframework.core.style.ToStringCreator;
    import org.springframework.http.HttpHeaders;
    import org.springframework.http.HttpStatus;
    import org.springframework.http.MediaType;
    import org.springframework.http.ResponseCookie;
    import org.springframework.http.client.reactive.ClientHttpResponse;
    import org.springframework.http.server.reactive.ServerHttpRequest;
    import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
    import org.springframework.http.server.reactive.ServerHttpResponse;
    import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
    import org.springframework.stereotype.Component;
    import org.springframework.util.MultiValueMap;
    import org.springframework.web.reactive.function.client.ExchangeStrategies;
    import org.springframework.web.reactive.function.server.ServerRequest;
    import org.springframework.web.server.ServerWebExchange;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.*;
    import java.util.function.BiFunction;
    import java.util.function.Function;
    
    import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR;
    
    
    /**
     * @Auther: garfield
     * @Date: 2019/2/28 10:45 AM
     * @Description:
     */
    @Component
    public class EncryptGatewayFilterFactory extends AbstractGatewayFilterFactory<EncryptGatewayFilterFactory.Config> {
    
        private static Logger log = LoggerFactory.getLogger(EncryptGatewayFilterFactory.class);
    
    
        @Autowired
        private SecurityService encryptService;
    
        public EncryptGatewayFilterFactory() {
            super(Config.class);
        }
    
    
        //    private Gson gson = new GsonBuilder().serializeNulls().create();
        private Gson gson = new Gson();
    
        @Value("${server.host:10.10.10.10}")
        private String serverHost;
    
        @Value("${server.port}")
        private String serverPort;
    
    
        @Override
        @SuppressWarnings("unchecked")
        public GatewayFilter apply(Config config) {
            return new EncryptGatewayFilter(config);
        }
    
    
        @Override
        public ServerHttpRequest.Builder mutate(ServerHttpRequest request) {
            return null;
        }
    
        public static class Config {
    
            private boolean encrypt;
    
            public boolean isEncrypt() {
                return encrypt;
            }
    
            public Config setEncrypt(boolean encrypt) {
                this.encrypt = encrypt;
                return this;
            }
    
            @Override
            public String toString() {
                return new ToStringCreator(this)
                        .append("encrypt", encrypt)
                        .toString();
            }
        }
    
        @Override
        public List<String> shortcutFieldOrder() {
            return Arrays.asList("encrypt");
        }
    
        public class EncryptGatewayFilter implements GatewayFilter, Ordered {
            Config config;
    
            EncryptGatewayFilter(Config config) {
                this.config = config;
            }
    
            @Override
            @SuppressWarnings("unchecked")
            public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
                String trace = exchange.getRequest().getHeaders().getFirst("trace");
                ServerRequest serverRequest = new DefaultServerRequest(exchange);
                return serverRequest.bodyToMono(String.class).flatMap(reqBody -> {
                    // decorate the original request
                    ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(exchange.getRequest()) {
                        @Override
                        public HttpHeaders getHeaders() {
                            HttpHeaders httpHeaders = new HttpHeaders();
                            httpHeaders.putAll(super.getHeaders());
                            return httpHeaders;
                        }
    
                        @Override
                        public Flux<DataBuffer> getBody() {
                            // log the original request
                            log.info("[Trace:{}]-gateway request:headers=[{}],body=[{}]", trace, getHeaders(), reqBody);
                            return Flux.just(reqBody).map(bx -> exchange.getResponse().bufferFactory().wrap(bx.getBytes()));
                        }
                    };
                    // decorate the original response
                    BodyHandlerServerHttpResponseDecorator responseDecorator = new BodyHandlerServerHttpResponseDecorator(
                            initBodyHandler(exchange), exchange.getResponse());
    
                    return chain.filter(exchange.mutate().request(decorator).response(responseDecorator).build());
                });
            }
    
            @Override
            public int getOrder() {
                return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
            }
    
        }
    
        public interface BodyHandlerFunction
                extends BiFunction<ServerHttpResponse, Publisher<? extends DataBuffer>, Mono<Void>> {
        }
    
        protected BodyHandlerFunction initBodyHandler(ServerWebExchange exchange) {
            return (resp, body) -> {
                // intercept the response
                MediaType originalResponseContentType = exchange.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
                HttpHeaders httpHeaders = new HttpHeaders();
                httpHeaders.setContentType(originalResponseContentType);
                DefaultClientResponseAdapter clientResponseAdapter = new DefaultClientResponseAdapter(body, httpHeaders);
                Mono<String> bodyMono = clientResponseAdapter.bodyToMono(String.class);
                // values stored earlier can be retrieved here
                return bodyMono.flatMap((respBody) -> {
                    // log the returned response
                    System.out.println(respBody);
                    return resp.writeWith(Flux.just(respBody).map(bx -> resp.bufferFactory().wrap(bx.getBytes())));
                }).then();
            };
        }
    
     
        public static class DefaultClientResponseAdapter extends DefaultClientResponse {
    
            /**
             * @param body
             * @param httpHeaders
             */
            public DefaultClientResponseAdapter(Publisher<? extends DataBuffer> body,
                                                HttpHeaders httpHeaders) {
                this(new ResponseAdapter(body, httpHeaders),
                        ExchangeStrategies.withDefaults());
            }
    
            /**
             * @param response
             * @param strategies
             */
            public DefaultClientResponseAdapter(ClientHttpResponse response,
                                                ExchangeStrategies strategies) {
                super(response, strategies);
            }
    
            /**
             * Adapter for ClientHttpResponse
             */
            static class ResponseAdapter implements ClientHttpResponse {
                /**
                 * Response data
                 */
                private final Flux<DataBuffer> flux;
                /**
                 *
                 */
                private final HttpHeaders headers;
    
                public ResponseAdapter(Publisher<? extends DataBuffer> body,
                                       HttpHeaders headers) {
                    this.headers = headers;
                    if (body instanceof Flux) {
                        flux = (Flux) body;
                    } else {
                        flux = ((Mono) body).flux();
                    }
                }
    
                @Override
                public Flux<DataBuffer> getBody() {
                    return flux;
                }
    
                @Override
                public HttpHeaders getHeaders() {
                    return headers;
                }
    
                @Override
                public HttpStatus getStatusCode() {
                    return null;
                }
    
                @Override
                public int getRawStatusCode() {
                    return 0;
                }
    
                @Override
                public MultiValueMap<String, ResponseCookie> getCookies() {
                    return null;
                }
            }
        }
    
        class BodyHandlerServerHttpResponseDecorator extends ServerHttpResponseDecorator {
    
            /**
             * Body-handling interceptor
             */
            private BodyHandlerFunction bodyHandler = initDefaultBodyHandler();
    
            /**
             * Constructor
             *
             * @param bodyHandler
             * @param delegate
             */
            public BodyHandlerServerHttpResponseDecorator(BodyHandlerFunction bodyHandler, ServerHttpResponse delegate) {
                super(delegate);
                if (bodyHandler != null) {
                    this.bodyHandler = bodyHandler;
                }
            }
    
            @Override
            public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                // let the body handler process the response
                return bodyHandler.apply(getDelegate(), body);
            }
    
            @Override
            public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
                return writeWith(Flux.from(body).flatMapSequential(p -> p));
            }
    
            /**
             * Default body handler
             *
             * @return
             */
            private BodyHandlerFunction initDefaultBodyHandler() {
                return (resp, body) -> resp.writeWith(body);
            }
        }
    }

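    Now everything is in place and the code is written, so it is time for another round of performance testing. Keep in mind that I used the official example here; I tried the other versions too, and the results were about the same.

    4. Testing spring-cloud-gateway routing performance again

      step.1: Performance test. Change the configuration to add the filter. Why only one filter? Because this is the one with the serious problem; I will skip how I narrowed it down.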

          - id: an
            uri: http://10.1.4.32:14077/hello
            predicates:
            - Path=/an
            filters:
            - An

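      After many test runs, the other filters all behaved reasonably well. Only the filter that modifies the response body hurts performance badly and also produces read/write errors.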
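
    The filter entry `An` in the route above refers to the `AnGatewayFilterFactory` class from section 3: by convention the gateway derives a filter's configuration name by dropping the `GatewayFilterFactory` suffix from the factory class name. A small sketch of that convention, written as a plain string operation rather than the library's own code (the class and helper names are hypothetical):

```java
public class FilterNameSketch {

    private static final String SUFFIX = "GatewayFilterFactory";

    /** Derives the name used in route configuration from a factory class name. */
    static String configName(String factorySimpleName) {
        return factorySimpleName.replace(SUFFIX, "");
    }

    public static void main(String[] args) {
        String[] factories = {
            "AnGatewayFilterFactory",
            "DecryptGatewayFilterFactory",
            "EncryptGatewayFilterFactory"
        };
        for (String factory : factories) {
            System.out.println(factory + " -> " + configName(factory));
        }
    }
}
```

    Following the same rule, the other two factories in this post would be referenced in a route as `Decrypt` and `Encrypt`.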

      step.2: Test and results

    [wrk@localhost wrk]$ ./wrk  -t 15 -c500 -d 10 --latency -s scripts/gateway.lua  http://10.1.4.32:14077/an
    Running 10s test @ http://10.1.4.32:14077/an
      15 threads and 500 connections
      Thread Stats   Avg      Stdev     Max   +/- Stdev
        Latency     1.03s   488.75ms   2.00s    60.62%
        Req/Sec    26.59     13.84    80.00     67.60%
      Latency Distribution
         50%  931.54ms
         75%    1.45s
         90%    1.76s
         99%    1.97s
      3848 requests in 10.10s, 1.64MB read
      Socket errors: connect 0, read 0, write 0, timeout 458
    Requests/sec:    381.05
    Transfer/sec:    166.71KB

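    The output has an extra line reporting socket errors, all of them timeouts, and the log contains errors as well (断开的管道 is the localized "Broken pipe" message):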

    2019-03-06T16:09:33,396|INFO ||AsyncResolver-bootstrap-executor-0||||Resolving eureka endpoints via configuration
    2019-03-06T16:10:38,268|ERROR||reactor-http-server-epoll-18||||Unhandled failure: Connection has been closed, response already set (status=200)
    2019-03-06T16:10:38,268|WARN ||reactor-http-server-epoll-18||||Handling completed with error: Connection has been closed
    2019-03-06T16:10:38,269|ERROR||reactor-http-server-epoll-18||||Unhandled failure: null, response already set (status=200)
    2019-03-06T16:10:38,269|WARN ||reactor-http-server-epoll-18||||Handling completed with error: null
    2019-03-06T16:10:38,294|ERROR||reactor-http-server-epoll-18||||Unhandled failure: syscall:write(..) failed: 断开的管道, response already set (status=null)
    2019-03-06T16:10:38,294|WARN ||reactor-http-server-epoll-18||||Handling completed with error: syscall:write(..) failed: 断开的管道
    2019-03-06T16:10:38,306|ERROR||reactor-http-server-epoll-23||||Unhandled failure: syscall:write(..) failed: 断开的管道, response already set (status=null)
    2019-03-06T16:10:38,306|WARN ||reactor-http-server-epoll-23||||Handling completed with error: syscall:write(..) failed: 断开的管道
    2019-03-06T16:14:33,397|INFO ||AsyncResolver-bootstrap-executor-0||||Resolving eureka endpoints via configuration

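    This is a serious problem. A single request never triggers the error; it only shows up under high-concurrency load testing, which makes it impossible to trace. Worst of all, throughput is down to about 380 requests/s, which is completely unacceptable, and even more so in production.

    This is hard to explain, because we used the approach provided by the official code. Let's go back and look at the official response-modifying class. Actually, no need to look any further, because: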

    package org.springframework.cloud.gateway.filter.factory.rewrite;
    /**
     * This filter is BETA and may be subject to change in a future release.
     */
    public class ModifyResponseBodyGatewayFilterFactory
            extends AbstractGatewayFilterFactory<ModifyResponseBodyGatewayFilterFactory.Config> {

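    The official source already says it: this filter is BETA and not ready to rely on.

    Not ready to give up, I remembered the GlobalFilter that gateway provides and moved the same code into a global filter to try again, but the result was the same!

    So much for that...

    I would be very grateful for any write-up that reaches a different conclusion. Otherwise, all I can do is wait for the next release.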

  • Original article: https://www.cnblogs.com/garfieldcgf/p/10484119.html