zoukankan      html  css  js  c++  java
  • spring-cloud-gateway(二)es代理功能需求

    实际是该项目的延申 cclient/elasticsearch-multi-cluster-compat-proxy: 网关代理兼容ES6 es7 proxy and compat elasticsearch version 7 and elasticsearch version 6's _search and _bulk request api (github.com)

    项目用spring-boot-starter-webflux 实现了es6和es7的兼容层网关,但只是做可行性验证

    真正生产使用的网关还需要更多功能,最基本的,限流,熔断,负载均衡,这些都独立往项目里加,每个点都需要花精力整合

    实际这方面应用已经有了很成熟的方案,服务治理相关,k8s,ingress,istio,kong,nginx...

    但对es这类bare服务,套用云服务的方案并不合适,kong/nginx因为是c+lua的技术栈,定制的成本较高

    nodejs已经过气了,go和java,考虑生态,选择java

    java方面的网关,很少单独提及,更多是做为java服务治理的一个组件来使用

    java类网关,早些年大家都使用netty/mina原生实现

    但后期有基于netty的各种封装好的http框架,再完全用netty开发http类网关就比较少见了,当然tcp/udp类,自定义rpc的还是免不了直接和netty打交道

    整合netty的 http高性能网关类服务早些年个人用过 jersey,后来用vert.x,再后来就直接上spring-boot-starter-webflux了,抽空把历史代码扒出来

    现在为了省去在webflux自行添加限流,熔断,降级等功能,直接使用spring-boot-gateway

    实际spring-boot-gateway,本身集成mvn/webflux的两种方案,webflux的底层就是netty

    本地代理其实可以理解为替代nginx,并实现一些和业务产品深度结合的功能
    因为nginx c+lua的技术栈和开发成本较高
    

    首先spring-cloud-gateway 原生是 spring-cloud的组件,应用场景和spring-cloud深耦合

    例如,loadbanlance,依赖spring-cloud的服务发现组件,consule,nacos等

    https://docs.spring.io/spring-cloud-commons/docs/current/reference/html/#spring-cloud-loadbalancer
    
    Spring Cloud Commons provides the @EnableDiscoveryClient annotation. This looks for implementations of the DiscoveryClient and ReactiveDiscoveryClient interfaces with META-INF/spring.factories. Implementations of the discovery client add a configuration class to spring.factories under the org.springframework.cloud.client.discovery.EnableDiscoveryClient key. Examples of DiscoveryClient implementations include Spring Cloud Netflix Eureka, Spring Cloud Consul Discovery, and Spring Cloud Zookeeper Discovery.
    
    

    spring-cloud-gateway 基础支持

    Spring Cloud Gateway

    spring:
      cloud:
        gateway:
          routes:
          - id: before_route
            uri: https://example.org
            predicates:
            - Before=2017-01-20T17:42:47.789-07:00[America/Denver]
    

    以该项为例,关键是uri 这个参数

    网关单点1:1 uri 可以写一个http/https的访问地址

    如果需要实现负载均衡gateway: server 1:n 则需要实现 uri("lb://backing-service:8088")

    @Bean
    public RouteLocator routes(RouteLocatorBuilder builder) {
        return builder.routes()
            .route("circuitbreaker_route", r -> r.path("/consumingServiceEndpoint")
                .filters(f -> f.circuitBreaker(c -> c.name("myCircuitBreaker").fallbackUri("forward:/inCaseOfFailureUseThis").addStatusCode("INTERNAL_SERVER_ERROR"))
                    .rewritePath("/consumingServiceEndpoint", "/backingServiceEndpoint")).uri("lb://backing-service:8088")
            .build();
    }
    
    es3
    

    这里实际依赖了spring-cloud生态的服务发现组件,注册服务 backing-service 至注册中心,spring-cloud从注册中心获取真实的服务地址host:port,再通过客户端负载均衡lb 路由至真实服务

    这是服务治理的基本原理流程

    但是对目前的场景不适用,目前的场景,基本可以理解为把spring-cloud-gateway 当nginx用

    路由到后端静态的几个地址即可

    以es为例 3个client节点

    es-client-01 192.168.10.11:9200

    es-client-02 192.168.10.12:9200

    es-client-03 192.168.10.13:9200

    并不存在一个有效的注册中心,实际多一个注册中心组件,项目的复杂度就更高了,对es 这类服务组件,并不需要和spring的生态完全结合

    我们需要定制lb://的解析,即,使lb:// 不通过注册中心,而是完全静态配置在本地(先搞静态吧,以后再考虑搞动态,此动态非彼动态)

    需要注意withHealthChecks() 顾名思义 添加了withHealthChecks() 则会对 后端server进行健康检查,检查方式为验证对应的后端server /actuator/health 是否可达。这个路径是spring-boot/spring-cloud 组件的默认路径,但后端的es服务,这个路径并不可达。.withHealthChecks() 会返回Service Unavailable

    {"timestamp":"2021-06-02T09:04:00.596+00:00","path":"/","status":503,"error":"Service Unavailable","requestId":"d3d01e2e-1"}
    
    	@Bean
    	public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
    			ConfigurableApplicationContext context) {
    		return ServiceInstanceListSupplier.builder()
    				.withBase(new CustomServiceInstanceListSupplier())
    //				.withHealthChecks()
    				.build(context);
    	}
    
    

    健康检查部分代码,关键注意/actuator/health

    package org.springframework.cloud.loadbalancer.core;
    
    public class HealthCheckServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier implements InitializingBean, DisposableBean {
        private static final Log LOG = LogFactory.getLog(HealthCheckServiceInstanceListSupplier.class);
        private final HealthCheck healthCheck;
        private final String defaultHealthCheckPath;
        private final Flux<List<ServiceInstance>> aliveInstancesReplay;
        private Disposable healthCheckDisposable;
        private final BiFunction<ServiceInstance, String, Mono<Boolean>> aliveFunction;
    
        public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, HealthCheck healthCheck, BiFunction<ServiceInstance, String, Mono<Boolean>> aliveFunction) {
            super(delegate);
            this.defaultHealthCheckPath = (String)healthCheck.getPath().getOrDefault("default", "/actuator/health");
            this.aliveFunction = aliveFunction;
            this.healthCheck = healthCheck;
            Repeat<Object> aliveInstancesReplayRepeat = Repeat.onlyIf((repeatContext) -> {
                return this.healthCheck.getRefetchInstances();
            }).fixedBackoff(healthCheck.getRefetchInstancesInterval());
            Flux<List<ServiceInstance>> aliveInstancesFlux = Flux.defer(delegate).repeatWhen(aliveInstancesReplayRepeat).switchMap((serviceInstances) -> {
                return this.healthCheckFlux(serviceInstances).map((alive) -> {
                    return Collections.unmodifiableList(new ArrayList(alive));
                });
            });
            this.aliveInstancesReplay = aliveInstancesFlux.delaySubscription(healthCheck.getInitialDelay()).replay(1).refCount(1);
        }
    }    
    

    我们先不采用withHealthChecks,等后期有时间再自定义或配置withHealthChecks实现

    package org.springframework.cloud.loadbalancer.core;
    public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstanceListSupplier {
        public static final String SERVICE_DISCOVERY_TIMEOUT = "spring.cloud.loadbalancer.service-discovery.timeout";
        private static final Log LOG = LogFactory.getLog(DiscoveryClientServiceInstanceListSupplier.class);
        private Duration timeout = Duration.ofSeconds(30L);
        private final String serviceId;
        private final Flux<List<ServiceInstance>> serviceInstances;
    
        public DiscoveryClientServiceInstanceListSupplier(DiscoveryClient delegate, Environment environment) {
            this.serviceId = environment.getProperty("loadbalancer.client.name");
            this.resolveTimeout(environment);
            this.serviceInstances = Flux.defer(() -> {
                return Flux.just(delegate.getInstances(this.serviceId));
            }).subscribeOn(Schedulers.boundedElastic()).timeout(this.timeout, Flux.defer(() -> {
                this.logTimeout();
                return Flux.just(new ArrayList());
            })).onErrorResume((error) -> {
                this.logException(error);
                return Flux.just(new ArrayList());
            });
        }
    
        public DiscoveryClientServiceInstanceListSupplier(ReactiveDiscoveryClient delegate, Environment environment) {
            this.serviceId = environment.getProperty("loadbalancer.client.name");
            this.resolveTimeout(environment);
            this.serviceInstances = Flux.defer(() -> {
                return delegate.getInstances(this.serviceId).collectList().flux().timeout(this.timeout, Flux.defer(() -> {
                    this.logTimeout();
                    return Flux.just(new ArrayList());
                })).onErrorResume((error) -> {
                    this.logException(error);
                    return Flux.just(new ArrayList());
                });
            });
        }
    
        public String getServiceId() {
            return this.serviceId;
        }
    
        public Flux<List<ServiceInstance>> get() {
            return this.serviceInstances;
        }
    
        private void resolveTimeout(Environment environment) {
            String providedTimeout = environment.getProperty("spring.cloud.loadbalancer.service-discovery.timeout");
            if (providedTimeout != null) {
                this.timeout = DurationStyle.detectAndParse(providedTimeout);
            }
    
        }
    
        private void logTimeout() {
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Timeout occurred while retrieving instances for service %s.The instances could not be retrieved during %s", this.serviceId, this.timeout));
            }
    
        }
    
        private void logException(Throwable error) {
            LOG.error(String.format("Exception occurred while retrieving instances for service %s", this.serviceId), error);
        }
    }
    
    
    
  • 相关阅读:
    java zxing生成二维码
    忘记MySQL root密码重置MySQL root密码
    Java通过ScriptEngine 执行js脚本案例
    两个自定义对象List列表取交集(intersection)
    Java 判断Windows下某个进程是否运行
    Layer文件上传操作
    Jmeter AbstractJavaSamplerClient 案例
    Jaspersoft Studio 导出PDF格式中文不显示
    使用gradle的application插件进行Spring-boot项目打包
    jQuery Ajax应用总结
  • 原文地址:https://www.cnblogs.com/zihunqingxin/p/14916177.html
Copyright © 2011-2022 走看看