zoukankan      html  css  js  c++  java
  • Spring Cloud-Ribbon实现客户端的服务均衡(三)

    客户端负载均衡与服务端负载均衡

    服务端负载均衡

    通过服务端负载均衡设备维护的服务清单根据算法(轮训 权重负载 流量负载)取出服务地址 进行转发

    客户端负载

    将指定服务的服务清单订单(注册中心)下来 在客户端根据算法取出服务地址进行请求

    Ribbon实现客户端负载均衡

    rabbon是通过代理RestTemplate来实现负载均衡的 只需要在application配置并引入pom文件

    同时还有从注册中心订阅服务的相关配置

        //LoadBalanced 通过代理RestTemplate 实现客户端负载均衡的能力
        @LoadBalanced
        @Bean
        RestTemplate restTemplate(){
            return new RestTemplate();
        }
      <!--ribbon客户端的负载均衡-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
            </dependency>

    RestTemplate使用

    Get请求

      String str=restTemplate.getForEntity("http://PROVIDER/hello",String.class).getBody();
           //url传递参数
            User user =restTemplate.getForEntity("http://PROVIDER/hello/{1}", User.class,1).getBody();
            //多参数
            Map<String,Object> paramters=new HashMap<String,Object>();
            paramters.put("name","小明");
            paramters.put("age",12);
            User user=restTemplate.getForEntity("http://PROVIDER/hello?name={name}&id={id}",User.class,paramters).getBody();

    或者使用getForObject 可以理解成getForEntity的封装

     String str=restTemplate.getForObject("http://PROVIDER/hello",String.class);
            //url传递参数
            User user =restTemplate.getForObject("http://PROVIDER/hello/{1}", User.class,1);
            //多参数
            Map<String,Object> paramters=new HashMap<String,Object>();
            paramters.put("name","小明");
            paramters.put("age",12);
            User user=restTemplate.getForObject("http://PROVIDER/hello?name={name}&id={id}",User.class,paramters);

    Post请求

     User parameter=new User();
           //返回string
            String str=restTemplate.postForEntity("http://PROVIDER/hello",parameter,String.class).getBody();
            Map<String,Object> paramters=new HashMap<String,Object>();
           //url传递参数
            User user =restTemplate.postForEntity("http://PROVIDER/hello/{1}",parameter, User.class,1).getBody();
            //多参数
            Map<String,Object> paramters=new HashMap<String,Object>();
            paramters.put("name","小明");
            paramters.put("age",12);
            User user=restTemplate.postForEntity("http://PROVIDER/hello?name={name}&id={id}",parameter,User.class,paramters).getBody();
    
    
            User parameter=new User();
            //返回string
            String str=restTemplate.postForObject("http://PROVIDER/hello",parameter,String.class);
            Map<String,Object> paramters=new HashMap<String,Object>();
            //url传递参数
            User user =restTemplate.postForObject("http://PROVIDER/hello/{1}",parameter, User.class,1);
            //多参数
            Map<String,Object> paramters=new HashMap<String,Object>();
            paramters.put("name","小明");
            paramters.put("age",12);
            User user=restTemplate.postForObject("http://PROVIDER/hello?name={name}&id={id}",parameter,User.class,paramters);

    Put Delete

    与上面类似

    ribbon负载均衡实现原理

    LoadBalancerClient是rabbon的一个重要接口

    execute 从负载均衡器中挑选一个实例执行请求

    reconstructURL负责重构url restTemplate.getForEntity("http://PROVIDER/hello",String.class) 将PROVIDER转换为 host:port形式

    choose从负载均衡器中选择一个实例

    LoadBalancerAutoConfiguration

    ribbon自动化配置类

    @Configuration
    @ConditionalOnClass(RestTemplate.class) //RestTemplate 必须存在当前环境中
    @ConditionalOnBean(LoadBalancerClient.class)//LoadBalancerClient 必须存在当前环境中
    @EnableConfigurationProperties(LoadBalancerRetryProperties.class)
    public class LoadBalancerAutoConfiguration {
        @LoadBalanced
        @Autowired(required = false)
        private List<RestTemplate> restTemplates = Collections.emptyList();
        /**
         * 维护一个被 @LoadBalanced 标识的 RestTemplate对象 通过RestTemplateCustomizer增加LoadBalancerInterceptor拦截器
         * @param restTemplateCustomizers
         * @return
         */
        @Bean
        public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
                final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
            return () -> restTemplateCustomizers.ifAvailable(customizers -> {
                for (RestTemplate restTemplate : org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration.this.restTemplates) {
                    for (RestTemplateCustomizer customizer : customizers) {
                        customizer.customize(restTemplate);
                    }
                }
            });
        }
        .......
        @Configuration
        @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
        static class LoadBalancerInterceptorConfig {
            /**
             * LoadBalancerInterceptor 用于实现对客户端请求进行拦截 实现负载均衡
             * @param loadBalancerClient
             * @param requestFactory
             * @return
             */
            @Bean
            public LoadBalancerInterceptor ribbonInterceptor(
                    LoadBalancerClient loadBalancerClient,
                    LoadBalancerRequestFactory requestFactory) {
                return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
            }
            /**
             * 用于对RestTemplate增加 LoadBalancerInterceptor拦截器
             * @param loadBalancerInterceptor
             * @return
             */
            @Bean
            @ConditionalOnMissingBean
            public RestTemplateCustomizer restTemplateCustomizer(
                    final LoadBalancerInterceptor loadBalancerInterceptor) {
                return restTemplate -> {
                    List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                            restTemplate.getInterceptors());
                    list.add(loadBalancerInterceptor);
                    restTemplate.setInterceptors(list);
                };
            }
        }
    
    }

    自动化配置类主要做以下几件事

    1.LoadBalancerInterceptor 初始化一个拦截器对象 主要用于拦截器RestTemplate请求 实现负载均衡

    2.初始化RestTemplateCustomizer 用于给RestTemplate增加LoadBalancerInterceptor拦截器

    3.通过loadBalancedRestTemplateInitializerDeprecated 加载@LoadBalanced 标识的 RestTemplate对象 通过RestTemplateCustomizer增加LoadBalancerInterceptor拦截器

    LoadBalancerinterceptor类

    /**
     * 拦截通过LoadBalancer注解修饰的Restempate的请求信息
     */
    public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
    
        private LoadBalancerClient loadBalancer;
        private LoadBalancerRequestFactory requestFactory;
    
        public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
            this.loadBalancer = loadBalancer;
            this.requestFactory = requestFactory;
        }
    
        public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
            // for backwards compatibility
            this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
        }
        /**
         * 对restempate请求进行拦截
         * @param request
         * @param body
         * @param execution
         * @return
         * @throws IOException
         */
        @Override
        public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
                                            final ClientHttpRequestExecution execution) throws IOException {
            final URI originalUri = request.getURI();
    //获得服务名字 String serviceName
    = originalUri.getHost(); Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri); //通过注入LoadBalancerClient 实现负载均衡 requset封装了restrempate的请求信息 return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution)); } }

    LoadBalancerinterceptor拦截@LoadBalanced 标识的 RestTemplate对象发送的请求 并交给LoadBalancerClient的实现类的execute方法处理

    RibbonLoadBalancerClient

    public class RibbonLoadBalancerClient implements LoadBalancerClient {
    
        public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
            //serviceId此时已经是服务名字如前面demoPROVIDER
            return this.execute(serviceId, (LoadBalancerRequest)request, (Object)null);
        }
    
        public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
            ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
            //通过ILoadBalancer 根据ServiceId获得具体服务实例
            Server server = this.getServer(loadBalancer, hint);
            if (server == null) {
                throw new IllegalStateException("No instances available for " + serviceId);
            } else {
                RibbonLoadBalancerClient.RibbonServer ribbonServer = new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));
                return this.execute(serviceId, (ServiceInstance)ribbonServer, (LoadBalancerRequest)request);
            }
        }
        protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
            //根据loadBalancer 获得服务实例
            return loadBalancer == null ? null : loadBalancer.chooseServer(hint != null ? hint : "default");
        }
        protected ILoadBalancer getLoadBalancer(String serviceId) {
            return this.clientFactory.getLoadBalancer(serviceId);
        }
        
    
        public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
            Server server = null;
            if (serviceInstance instanceof RibbonLoadBalancerClient.RibbonServer) {
                server = ((RibbonLoadBalancerClient.RibbonServer)serviceInstance).getServer();
            }
    
            if (server == null) {
                throw new IllegalStateException("No instances available for " + serviceId);
            } else {
                RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);
                RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
    
                try {
                    T returnVal = request.apply(serviceInstance);
                    statsRecorder.recordStats(returnVal);
                    return returnVal;
                } catch (IOException var8) {
                    statsRecorder.recordStats(var8);
                    throw var8;
                } catch (Exception var9) {
                    statsRecorder.recordStats(var9);
                    ReflectionUtils.rethrowRuntimeException(var9);
                    return null;
                }
            }
        }
    }

    最终是通过loadBalancer.chooseServe(serverId) 来获得具体实例 而不是

    ILoadBalancer

    public interface ILoadBalancer {
        //向负载均衡器中维护的实例列表增加服务实例。
        public void addServers(List<Server> newServers);
        //通过某种策略, 从负载均衡器中挑选出 一 个具体的服务实例
        public Server chooseServer(Object key);
        //用来通知和标识负载均衡器中某个具体实例已经停止服务, 不
        //然负载均衡器在下 一 次获取服务实例清单前都会认为服务实例均是正常服务的。
        public void markServerDown(Server server);
    
        // 获取服务列表 已过期 已过时
        @Deprecated
        public List<Server> getServerList(boolean availableOnly);
        //获取当前正常服务的实例列表。
        public List<Server> getReachableServers();
        //获取所有已知的服务实例列表, 包括正常服务和停止服务的实例
        public List<Server> getAllServers();
    }

     BaseLoadBalancer对负载均衡做了基本的实现

    DynamicServerListLoadBalancer继承BaseLoadBalanceer 做了扩展

    ZoneAwareLoadBalancer继承DynamicServerListLoadBalancer 在原有的基础上做了扩展

    @Configuration
    @EnableConfigurationProperties
    @Import({HttpClientConfiguration.class, OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class})
    public class RibbonClientConfiguration {
      
        @ConditionalOnMissingBean
        public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
            return (ILoadBalancer)(this.propertiesFactory.isSet(ILoadBalancer.class, this.name) ? (ILoadBalancer)this.propertiesFactory.get(ILoadBalancer.class, config, this.name) : new ZoneAwareLoadBalancer(config, rule, ping, serverList, serverListFilter, serverListUpdater));
        }
    
      
    }

    通过RibbonClientConfiguration 可以看出默认是使用ZoneAwareLoadBalancer

    根据ZoneAwareLoadBalancer负载均衡策略获得server后 继续调用RibbonLoadBalancerClient.execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request)

     public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
            Server server = null;
            if (serviceInstance instanceof RibbonLoadBalancerClient.RibbonServer) {
                server = ((RibbonLoadBalancerClient.RibbonServer)serviceInstance).getServer();
            }
    
            if (server == null) {
                throw new IllegalStateException("No instances available for " + serviceId);
            } else {
                RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);
                RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
    
                try {
                    //调用拦截器传入LoadBalancerRequest回调apply 这个时候serviceInstance封装了host信息端口等信息
                    T returnVal = request.apply(serviceInstance);
                    statsRecorder.recordStats(returnVal);
                    return returnVal;
                } catch (IOException var8) {
                    statsRecorder.recordStats(var8);
                    throw var8;
                } catch (Exception var9) {
                    statsRecorder.recordStats(var9);
                    ReflectionUtils.rethrowRuntimeException(var9);
                    return null;
                }
            }
        }

    serviceInstance接口定义

    public interface ServiceInstance {
        default String getInstanceId() {
            return null;
        }
    
        String getServiceId();
    
        String getHost();
    
        int getPort();
    
        boolean isSecure();
    
        URI getUri();
    
        Map<String, String> getMetadata();
    
        default String getScheme() {
            return null;
        }
    }

    当拿到实例最终会根据RibbonLoadBalancerClient.reconstructURI 将服务组织正常url形式并发起请求

     

  • 相关阅读:
    .net core 3.1 过滤器(Filter) 和中间件和AOP面向切面拦截器
    socket通信框架——boost asio
    远程过程调用框架——gRPC
    数据序列化工具——flatbuffer
    springboot项目启动----8080端口被占用排雷经过
    如何配置HOSTS文件
    使用线程Callable实现分段获取一个url连接的资源数据
    Socket网络编程课件
    (6)优化TCP编写 服务器端同时支持多个客户端同时访问
    SpringBoot配置属性之Security
  • 原文地址:https://www.cnblogs.com/LQBlog/p/10077972.html
Copyright © 2011-2022 走看看