zoukankan      html  css  js  c++  java
  • Spring Cloud Feign组件

    采用Spring Cloud微服务框架后,经常会涉及到服务间调用,服务间调用采用了Feign组件。

    由于之前有使用dubbo经验。dubbo的负载均衡策略(轮训、最小连接数、随机轮训、加权轮训),dubbo失败策略(快速失败、失败重试等等),

    所以Feign负载均衡策略的是什么? 失败后是否会重试,重试策略又是什么?带这个疑问,查了一些资料,最后还是看了下代码。毕竟代码就是一切

    Spring boot集成Feign的大概流程:

    1、利用FeignAutoConfiguration自动配置。并根据EnableFeignClients 自动注册产生Feign的代理类。

    2、注册方式利用FeignClientFactoryBean,熟悉Spring知道FactoryBean 产生bean的工厂,有个重要方法getObject产生FeignClient容器bean

    3、同时代理类中使用hystrix做资源隔离,Feign代理类中 构造 RequestTemplate ,RequestTemlate要做的向负载均衡选中的server发送http请求,并进行编码和解码一系列操作。

    下面只是粗略的看了下整体流程,先有整体再有细节吧,下面利用IDEA看下细节:

    一、Feign失败重试

    SynchronousMethodHandler的方法中的处理逻辑
     @Override
      public Object invoke(Object[] argv) throws Throwable {
        RequestTemplate template = buildTemplateFromArgs.create(argv);
        Retryer retryer = this.retryer.clone();
        while (true) {
          try {
            return executeAndDecode(template);
          } catch (RetryableException e) {
            retryer.continueOrPropagate(e);
            if (logLevel != Logger.Level.NONE) {
              logger.logRetry(metadata.configKey(), logLevel);
            }
            continue;
          }
        }
      }
    
    •  上面的逻辑很简单。构造 template 并去进行服务间的http调用,然后对返回结果进行解码
    •      当抛出 RetryableException 后,异常逻辑是否重试? 重试多少次? 带这个问题,看了retryer.continueOrPropagate(e);
    具体逻辑如下:
    public void continueOrPropagate(RetryableException e) {
          if (attempt++ >= maxAttempts) {
            throw e;
          }
    
          long interval;
          if (e.retryAfter() != null) {
            interval = e.retryAfter().getTime() - currentTimeMillis();
            if (interval > maxPeriod) {
              interval = maxPeriod;
            }
            if (interval < 0) {
              return;
            }
          } else {
            interval = nextMaxInterval();
          }
          try {
            Thread.sleep(interval);
          } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
          }
          sleptForMillis += interval;
        }
    
    •   当重试次数大于默认次数5时候,直接抛出异常,不在重试
    •        否则每隔一段时间 默认值最大1ms 后重试一次。

         这就Feign这块的重试这块的粗略逻辑,由于之前工作中一直使用dubbo。同样是否需要将生产环境中重试操作关闭?

         思考:之前dubbo生产环境的重试操作都会关闭。原因有几个:

             一、一般第一次失败,重试也会失败,极端情况下不断的重试,会占用大量dubbo连接池,造成连接池被打满,影响核心功能

            二、也是比较重要的一点原因,重试带来的业务逻辑的影响,即如果接口不是幂等的,重试会带来业务逻辑的错误,引发问题

         

    二、Feign负载均衡策略

    那么负载均衡的策略又是什么呢?由上图中可知 executeAndDecode(template

     1 Object executeAndDecode(RequestTemplate template) throws Throwable {
     2     Request request = targetRequest(template);
     3 
     4     if (logLevel != Logger.Level.NONE) {
     5       logger.logRequest(metadata.configKey(), logLevel, request);
     6     }
     7 
     8     Response response;
     9     long start = System.nanoTime();
    10     try {
    11       response = client.execute(request, options);
    12       // ensure the request is set. TODO: remove in Feign 10
    13       response.toBuilder().request(request).build();
    14     } catch (IOException e) {
    15       if (logLevel != Logger.Level.NONE) {
    16         logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
    17       }
    18       throw errorExecuting(request, e);
    19     }
    20     long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    21 
    22     boolean shouldClose = true;
    23     try {
    24       if (logLevel != Logger.Level.NONE) {
    25         response =
    26             logger.logAndRebufferResponse(metadata.configKey(), logLevel, response, elapsedTime);
    27         // ensure the request is set. TODO: remove in Feign 10
    28         response.toBuilder().request(request).build();
    29       }
    30       if (Response.class == metadata.returnType()) {
    31         if (response.body() == null) {
    32           return response;
    33         }
    34         if (response.body().length() == null ||
    35                 response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {
    36           shouldClose = false;
    37           return response;
    38         }
    39         // Ensure the response body is disconnected
    40         byte[] bodyData = Util.toByteArray(response.body().asInputStream());
    41         return response.toBuilder().body(bodyData).build();
    42       }
    43       if (response.status() >= 200 && response.status() < 300) {
    44         if (void.class == metadata.returnType()) {
    45           return null;
    46         } else {
    47           return decode(response);
    48         }
    49       } else if (decode404 && response.status() == 404 && void.class != metadata.returnType()) {
    50         return decode(response);
    51       } else {
    52         throw errorDecoder.decode(metadata.configKey(), response);
    53       }
    54     } catch (IOException e) {
    55       if (logLevel != Logger.Level.NONE) {
    56         logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime);
    57       }
    58       throw errorReading(request, response, e);
    59     } finally {
    60       if (shouldClose) {
    61         ensureClosed(response.body());
    62       }
    63     }
    64   }

    概括的说主要做了两件事:发送HTTP请求,解码响应数据

    想看的负载均衡应该在11行  response = client.execute(request, options); 而client的实现方式有两种 Default、LoadBalancerFeignClient

    猜的话应该是LoadBalancerFeignClient,带这个问题去看源码(其实个人更喜欢带着问题看源码,没有目的一是看很难将复杂的源码关联起来,二是很容易迷失其中

    果然通过一番查找发现 Client 实例就是LoadBalancerFeignClient,而设置这个Client就是通过上面说的FeignClientFactoryBean的getObject方法中设置的,具体不说了

    下面重点看LoadBalancerFeignClient execute(request, options)

     1 @Override
     2     public Response execute(Request request, Request.Options options) throws IOException {
     3         try {
     4             URI asUri = URI.create(request.url());
     5             String clientName = asUri.getHost();
     6             URI uriWithoutHost = cleanUrl(request.url(), clientName);
     7             FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
     8                     this.delegate, request, uriWithoutHost);
     9 
    10             IClientConfig requestConfig = getClientConfig(options, clientName);
    11             return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
    12                     requestConfig).toResponse();
    13         }
    14         catch (ClientException e) {
    15             IOException io = findIOException(e);
    16             if (io != null) {
    17                 throw io;
    18             }
    19             throw new RuntimeException(e);
    20         }
    21     }

    通过几行代码比较重要的点RibbonRequest ,原来Feign负载均衡还是通过Ribbon实现的,那么Ribbo又是如何实现负载均衡的呢? 

      1 public Observable<T> submit(final ServerOperation<T> operation) {
      2         final ExecutionInfoContext context = new ExecutionInfoContext();
      3         
      4         if (listenerInvoker != null) {
      5             try {
      6                 listenerInvoker.onExecutionStart();
      7             } catch (AbortExecutionException e) {
      8                 return Observable.error(e);
      9             }
     10         }
     11 
     12         final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
     13         final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
     14 
     15         // Use the load balancer
     16         Observable<T> o = 
     17                 (server == null ? selectServer() : Observable.just(server))
     18                 .concatMap(new Func1<Server, Observable<T>>() {
     19                     @Override
     20                     // Called for each server being selected
     21                     public Observable<T> call(Server server) {
     22                         context.setServer(server);
     23                         final ServerStats stats = loadBalancerContext.getServerStats(server);
     24                         
     25                         // Called for each attempt and retry
     26                         Observable<T> o = Observable
     27                                 .just(server)
     28                                 .concatMap(new Func1<Server, Observable<T>>() {
     29                                     @Override
     30                                     public Observable<T> call(final Server server) {
     31                                         context.incAttemptCount();
     32                                         loadBalancerContext.noteOpenConnection(stats);
     33                                         
     34                                         if (listenerInvoker != null) {
     35                                             try {
     36                                                 listenerInvoker.onStartWithServer(context.toExecutionInfo());
     37                                             } catch (AbortExecutionException e) {
     38                                                 return Observable.error(e);
     39                                             }
     40                                         }
     41                                         
     42                                         final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
     43                                         
     44                                         return operation.call(server).doOnEach(new Observer<T>() {
     45                                             private T entity;
     46                                             @Override
     47                                             public void onCompleted() {
     48                                                 recordStats(tracer, stats, entity, null);
     49                                                 // TODO: What to do if onNext or onError are never called?
     50                                             }
     51 
     52                                             @Override
     53                                             public void onError(Throwable e) {
     54                                                 recordStats(tracer, stats, null, e);
     55                                                 logger.debug("Got error {} when executed on server {}", e, server);
     56                                                 if (listenerInvoker != null) {
     57                                                     listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
     58                                                 }
     59                                             }
     60 
     61                                             @Override
     62                                             public void onNext(T entity) {
     63                                                 this.entity = entity;
     64                                                 if (listenerInvoker != null) {
     65                                                     listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
     66                                                 }
     67                                             }                            
     68                                             
     69                                             private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
     70                                                 tracer.stop();
     71                                                 loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
     72                                             }
     73                                         });
     74                                     }
     75                                 });
     76                         
     77                         if (maxRetrysSame > 0) 
     78                             o = o.retry(retryPolicy(maxRetrysSame, true));
     79                         return o;
     80                     }
     81                 });
     82             
     83         if (maxRetrysNext > 0 && server == null) 
     84             o = o.retry(retryPolicy(maxRetrysNext, false));
     85         
     86         return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
     87             @Override
     88             public Observable<T> call(Throwable e) {
     89                 if (context.getAttemptCount() > 0) {
     90                     if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
     91                         e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
     92                                 "Number of retries on next server exceeded max " + maxRetrysNext
     93                                 + " retries, while making a call for: " + context.getServer(), e);
     94                     }
     95                     else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
     96                         e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
     97                                 "Number of retries exceeded max " + maxRetrysSame
     98                                 + " retries, while making a call for: " + context.getServer(), e);
     99                     }
    100                 }
    101                 if (listenerInvoker != null) {
    102                     listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
    103                 }
    104                 return Observable.error(e);
    105             }
    106         });
    107     }

    通过上面代码分析,发现Ribbon和Hystrix一样都是利用了rxjava看来有必要掌握下rxjava了又。这里面 比较重要的就是17行,

    selectServer() 方法选择指定的Server,负载均衡的策略主要是有ILoadBalancer接口不同实现方式:

    BaseLoadBalancer采用的规则为RoundRobinRule 轮训规则
    DynamicServerListLoadBalancer继承了BaseLoadBalancer,主要运行时改变Server列表
    NoOpLoadBalancer 什么操作都不做
    ZoneAwareLoadBalancer 功能主要是根据区域Zone分组的实例列表

         

  • 相关阅读:
    python网页抓取之英汉字典
    快速搭建建SSH服务
    dos文件批量转换成unix文件
    svn强制提交备注信息
    win7/8下VirtualBox虚拟共享文件夹设置
    CentOS SVN服务器安装配置小记
    CentOS中vsftp安装与配置
    sql执行顺序
    PHP最佳实践(译)
    python连接mysql数据库
  • 原文地址:https://www.cnblogs.com/mxmbk/p/9417389.html
Copyright © 2011-2022 走看看