zoukankan      html  css  js  c++  java
  • 【一起学源码-微服务】Feign 源码三:Feign结合Ribbon实现负载均衡的原理分析

    前言

    前情回顾

    上一讲我们已经知道了Feign的工作原理其实是在项目启动的时候,通过JDK动态代理为每个FeignClinent生成一个动态代理。

    动态代理的数据结构是:ReflectiveFeign.FeignInvocationHandler。其中包含target(里面是serviceName等信息)和dispatcher(map数据结构,key是请求的方法名,方法参数等,value是SynchronousMethodHandler)。

    如下图所示:

    image.png

    本讲目录

    这一讲主要是Feign与Ribbon结合实现负载均衡的原理分析。

    说明

    原创不易,如若转载 请标明来源!

    博客地址:一枝花算不算浪漫
    微信公众号:壹枝花算不算浪漫

    源码分析

    Feign结合Ribbon实现负载均衡原理

    通过前面的分析,我们可以直接来看下SynchronousMethodHandler中的代码:

    final class SynchronousMethodHandler implements MethodHandler {
    
        @Override
        public Object invoke(Object[] argv) throws Throwable {
            // 生成请求类似于:GET /sayHello/wangmeng HTTP/1.1
            RequestTemplate template = buildTemplateFromArgs.create(argv);
            Retryer retryer = this.retryer.clone();
            while (true) {
                try {
                    return executeAndDecode(template);
                } catch (RetryableException e) {
                    retryer.continueOrPropagate(e);
                    if (logLevel != Logger.Level.NONE) {
                        logger.logRetry(metadata.configKey(), logLevel);
                    }
                    continue;
                }
            }
        }
    
        Object executeAndDecode(RequestTemplate template) throws Throwable {
            // 构建request对象:GET http://serviceA/sayHello/wangmeng HTTP/1.1
            Request request = targetRequest(template);
    
            if (logLevel != Logger.Level.NONE) {
                logger.logRequest(metadata.configKey(), logLevel, request);
            }
    
            Response response;
            long start = System.nanoTime();
            try {
                // 这个client就是之前构建的LoadBalancerFeignClient,options是超时时间
                response = client.execute(request, options);
                // ensure the request is set. TODO: remove in Feign 10
                response.toBuilder().request(request).build();
            } catch (IOException e) {
                if (logLevel != Logger.Level.NONE) {
                    logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
                }
                throw errorExecuting(request, e);
            }
            long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    
            // 下面逻辑都是构建返回值response
            boolean shouldClose = true;
            try {
                if (logLevel != Logger.Level.NONE) {
                    response =
                            logger.logAndRebufferResponse(metadata.configKey(), logLevel, response, elapsedTime);
                    // ensure the request is set. TODO: remove in Feign 10
                    response.toBuilder().request(request).build();
                }
                if (Response.class == metadata.returnType()) {
                    if (response.body() == null) {
                        return response;
                    }
                    if (response.body().length() == null ||
                            response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {
                        shouldClose = false;
                        return response;
                    }
                    // Ensure the response body is disconnected
                    byte[] bodyData = Util.toByteArray(response.body().asInputStream());
                    return response.toBuilder().body(bodyData).build();
                }
                if (response.status() >= 200 && response.status() < 300) {
                    if (void.class == metadata.returnType()) {
                        return null;
                    } else {
                        return decode(response);
                    }
                } else if (decode404 && response.status() == 404 && void.class != metadata.returnType()) {
                    return decode(response);
                } else {
                    throw errorDecoder.decode(metadata.configKey(), response);
                }
            } catch (IOException e) {
                if (logLevel != Logger.Level.NONE) {
                    logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime);
                }
                throw errorReading(request, response, e);
            } finally {
                if (shouldClose) {
                    ensureClosed(response.body());
                }
            }
        }
    }
    

    这里主要是构建request数据,然后通过request和options去通过LoadBalancerFeignClient.execute()方法去获得返回值。我们可以接着看client端的调用:

    public class LoadBalancerFeignClient implements Client {
    
        @Override
        public Response execute(Request request, Request.Options options) throws IOException {
            try {
                // asUri: http://serviceA/sayHello/wangmeng
                URI asUri = URI.create(request.url());
    
                // clientName:serviceA
                String clientName = asUri.getHost();
    
                // uriWithoutHost: http://sayHello/wangmeng
                URI uriWithoutHost = cleanUrl(request.url(), clientName);
    
                // 这里ribbonRequest:GET http:///sayHello/wangmeng HTTP/1.1  
                FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
                        this.delegate, request, uriWithoutHost);
    
                // 这里面config只有两个超时时间,一个是connectTimeout:5000,一个是readTimeout:5000
                IClientConfig requestConfig = getClientConfig(options, clientName);
    
                // 真正执行负载均衡的地方
                return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
                        requestConfig).toResponse();
            }
            catch (ClientException e) {
                IOException io = findIOException(e);
                if (io != null) {
                    throw io;
                }
                throw new RuntimeException(e);
            }
        }
    }
    

    接着我们看下lbClient()executeWithLoadBalancer()

    public class LoadBalancerFeignClient implements Client {
    
        private FeignLoadBalancer lbClient(String clientName) {
            return this.lbClientFactory.create(clientName);
        }
    }
    
    public class CachingSpringLoadBalancerFactory {
        public FeignLoadBalancer create(String clientName) {
            if (this.cache.containsKey(clientName)) {
                return this.cache.get(clientName);
            }
            IClientConfig config = this.factory.getClientConfig(clientName);
            // 获取Ribbon ILoadBalancer信息
            ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
            ServerIntrospector serverIntrospector = this.factory.getInstance(clientName, ServerIntrospector.class);
            FeignLoadBalancer client = enableRetry ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
                loadBalancedRetryPolicyFactory, loadBalancedBackOffPolicyFactory, loadBalancedRetryListenerFactory) : new FeignLoadBalancer(lb, config, serverIntrospector);
            this.cache.put(clientName, client);
            return client;
        }
    }
    

    这里是获取了ILoadBalancer数据,里面包含了Ribbon获取的serviceA所有服务节点信息。

    image.png

    这里已经获取到ILoadBalancer,里面包含serviceA服务器所有节点请求host信息。接下来就是从中负载均衡选择一个节点信息host出来。

    public abstract class AbstractLoadBalancerAwareClient<S extends ClientRequest, T extends IResponse> extends LoadBalancerContext implements IClient<S, T>, IClientConfigAware {
    
        public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
            LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
    
            try {
                return command.submit(
                    new ServerOperation<T>() {
                        @Override
                        public Observable<T> call(Server server) {
                            URI finalUri = reconstructURIWithServer(server, request.getUri());
                            S requestForServer = (S) request.replaceUri(finalUri);
                            try {
                                return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                            } 
                            catch (Exception e) {
                                return Observable.error(e);
                            }
                        }
                    })
                    .toBlocking()
                    .single();
            } catch (Exception e) {
                Throwable t = e.getCause();
                if (t instanceof ClientException) {
                    throw (ClientException) t;
                } else {
                    throw new ClientException(e);
                }
            }
            
        }
    }
    
    public class LoadBalancerCommand<T> {
        
        public Observable<T> submit(final ServerOperation<T> operation) {
            final ExecutionInfoContext context = new ExecutionInfoContext();
            
            if (listenerInvoker != null) {
                try {
                    listenerInvoker.onExecutionStart();
                } catch (AbortExecutionException e) {
                    return Observable.error(e);
                }
            }
    
            final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
            final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
    
            // Use the load balancer
            Observable<T> o = 
                    (server == null ? selectServer() : Observable.just(server))
                    .concatMap(new Func1<Server, Observable<T>>() {
                    }
    
            // 省略代码...
    
        // selectServer是真正执行负载均衡的逻辑
        private Observable<Server> selectServer() {
            return Observable.create(new OnSubscribe<Server>() {
                @Override
                public void call(Subscriber<? super Server> next) {
                    try {
                        // loadBalancerURI是http:///sayHello/wangmeng, loadBalancerKey为null
                        Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                        next.onNext(server);
                        next.onCompleted();
                    } catch (Exception e) {
                        next.onError(e);
                    }
                }
            });
        }
    }
    
    public class LoadBalancerContext implements IClientConfigAware {
    
        public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
            String host = null;
            int port = -1;
            if (original != null) {
                host = original.getHost();
            }
            if (original != null) {
                Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);        
                port = schemeAndPort.second();
            }
    
            // 获取到ILoadBalancer,这里面有IRule的信息及服务节点所有信息
            ILoadBalancer lb = getLoadBalancer();
            if (host == null) {
                // Partial URI or no URI Case
                // well we have to just get the right instances from lb - or we fall back
                if (lb != null){
                    // 这里就执行真正的chooseServer的逻辑了。默认的rule为ZoneAvoidanceZule
                    Server svc = lb.chooseServer(loadBalancerKey);
                    if (svc == null){
                        throw new ClientException(ClientException.ErrorType.GENERAL,
                                "Load balancer does not have available server for client: "
                                        + clientName);
                    }
                    host = svc.getHost();
                    if (host == null){
                        throw new ClientException(ClientException.ErrorType.GENERAL,
                                "Invalid Server for :" + svc);
                    }
                    logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
                    return svc;
                }
    
                // 省略代码
            }
        }
    }
    

    上面代码已经很清晰了,这里就是真正的通过ribbon的 rule.chooseServer()负载均衡地选择了一个服务节点调用,debug如下:

    image.png

    到了这里feign与ribbon的分析也就结束了,返回请求url信息,然后得到response结果:

    image.png

    总结

    上面已经分析了Feign与Ribbon的整合,最终还是落到Ribbon中的ILoadBalancer中,使用最后使用IRule去选择对应的server数据。

    下一讲 会画一个很大的图,包含Feign、Ribbon、Eureka关联的图,里面会画出每个组件的细节及依赖关系。也算是学习至今的一个总复习了。

    申明

    本文章首发自本人博客:https://www.cnblogs.com/wang-meng 和公众号:壹枝花算不算浪漫,如若转载请标明来源!

    感兴趣的小伙伴可关注个人公众号:壹枝花算不算浪漫

    22.jpg

  • 相关阅读:
    拓端tecdat|R语言投资组合优化求解器:条件约束最优化、非线性规划求解
    拓端tecdat|R语言多元时间序列滚动预测:ARIMA、回归、ARIMAX模型分析
    拓端tecdat|R语言聚类有效性:确定最优聚类数分析IRIS鸢尾花数据和可视化
    拓端tecdat|R语言k-means聚类、层次聚类、主成分(PCA)降维及可视化分析鸢尾花iris数据集
    【拓端tecdat】R语言用Hessian-free 、Nelder-Mead优化方法对数据进行参数估计
    springcloud之zuul网关服务并携带头信息转发token
    windows环境搭建Vue开发环境
    JVM之top+jstack分析cpu过高原因
    JVM调优之jstack找出最耗cpu的线程并定位代码
    用自顶向下、逐步细化的方法进行以下算法的设计: 1. 输出1900---2000年中是软黏的年份,符合下面两个条件之一的年份是闰年:
  • 原文地址:https://www.cnblogs.com/wang-meng/p/12181895.html
Copyright © 2011-2022 走看看