zoukankan      html  css  js  c++  java
  • Feign和Ribbon的重试机制

    Feign和Ribbon的重试机制

    Ribbon的重试机制

    配置

    ribbon:
      ReadTimeout: 2000
      ConnectTimeout: 2000
      MaxAutoRetries: 1 #同一台实例最大重试次数,不包括首次调用
      MaxAutoRetriesNextServer: 3 #重试负载均衡其他的实例最大重试次数,不包括首次调用
      OkToRetryOnAllOperations: true  #是否所有操作都重试
    

    结果

    image-20211022135632700

    image-20211022135732533

    可以看到2台机器分别被请求了四次,而且每个时间的间隔大约为2秒左右。一开始请求的是8083接口,请求的次数为8083,8083,8081,8081,8083,8083,8081,8083。

    那它的计算公式为:(MaxAutoRetries+ 1)*(MaxAutoRetriesNextServer+1) =8次

    源码分析

    首先由于在配置文件中配置了超时时间这些信息,一开始注入bean的时候,会读取这些值,覆盖默认值,在HttpClientRibbonConfiguration中

    	@Bean
    	@ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
    	@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
    	public RibbonLoadBalancingHttpClient ribbonLoadBalancingHttpClient(
    			IClientConfig config, ServerIntrospector serverIntrospector,
    			ILoadBalancer loadBalancer, RetryHandler retryHandler,
    			CloseableHttpClient httpClient) {
    		RibbonLoadBalancingHttpClient client = new RibbonLoadBalancingHttpClient(
    				httpClient, config, serverIntrospector);
    		client.setLoadBalancer(loadBalancer);
    		client.setRetryHandler(retryHandler);
    		Monitors.registerObject("Client_" + this.name, client);
    		return client;
    	}
    
    	public RibbonLoadBalancingHttpClient(CloseableHttpClient delegate,
    			IClientConfig config, ServerIntrospector serverIntrospector) {
    		super(delegate, config, serverIntrospector);
    	}
    
    	protected AbstractLoadBalancingClient(D delegate, IClientConfig config,
    			ServerIntrospector serverIntrospector) {
    		super(null);
    		this.delegate = delegate;
    		this.config = config;
    		this.serverIntrospector = serverIntrospector;
    		this.setRetryHandler(RetryHandler.DEFAULT);
    		initWithNiwsConfig(config);
    	}
    
    	@Override
    	public void initWithNiwsConfig(IClientConfig clientConfig) {
    		super.initWithNiwsConfig(clientConfig);
    		RibbonProperties ribbon = RibbonProperties.from(clientConfig);
    		this.connectTimeout = ribbon.connectTimeout(DEFAULT_CONNECT_TIMEOUT);
    		this.readTimeout = ribbon.readTimeout(DEFAULT_READ_TIMEOUT);
    		this.secure = ribbon.isSecure();
    		this.followRedirects = ribbon.isFollowRedirects();
    		this.okToRetryOnAllOperations = ribbon.isOkToRetryOnAllOperations();
    		this.gzipPayload = ribbon.isGZipPayload(DEFAULT_GZIP_PAYLOAD);
    	}
    

    当方法开始执行的时候,由于前面已经断点过此处就直接放行,一直来到LoadBalancerFeignClient

    	@Override
    	public Response execute(Request request, Request.Options options) throws IOException {
    		try {
    			URI asUri = URI.create(request.url());
    			String clientName = asUri.getHost();
    			URI uriWithoutHost = cleanUrl(request.url(), clientName);
    			FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
    					this.delegate, request, uriWithoutHost);
    
    			IClientConfig requestConfig = getClientConfig(options, clientName);
    			return lbClient(clientName)
    					.executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
    		}
    		catch (ClientException e) {
    			IOException io = findIOException(e);
    			if (io != null) {
    				throw io;
    			}
    			throw new RuntimeException(e);
    		}
    	}
    

    在进入到executeWithLoadBalancer的方法中。

        public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
            LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
    
            try {
                return command.submit(
                    new ServerOperation<T>() {
                        @Override
                        public Observable<T> call(Server server) {
                            URI finalUri = reconstructURIWithServer(server, request.getUri());
                            S requestForServer = (S) request.replaceUri(finalUri);
                            try {
                                return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                            } 
                            catch (Exception e) {
                                return Observable.error(e);
                            }
                        }
                    })
                    .toBlocking()
                    .single();
            } catch (Exception e) {
                Throwable t = e.getCause();
                if (t instanceof ClientException) {
                    throw (ClientException) t;
                } else {
                    throw new ClientException(e);
                }
            }
            
        }
    

    executeWithLoadBalancer方法中会创建一个LoadBalancerCommand,然后调用LoadBalancerCommandsubmit方法提交请求Operation,submit方法源码如下(有删减):

    
       public Observable<T> submit(final ServerOperation<T> operation) {
            // .......
            // &emsp;获取重试次数
            final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
            final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
            // Use the load balancer
            Observable<T> o = (server == null ? selectServer() : Observable.just(server))
                    .concatMap(new Func1<Server, Observable<T>>() {
                        @Override
                        public Observable<T> call(Server server) {
                            //.......
                            // 相同节点的重试
                            if (maxRetrysSame > 0)
                                o = o.retry(retryPolicy(maxRetrysSame, true));
                            return o;
                        }
                    });
            // 不同节点的重试
            if (maxRetrysNext > 0 && server == null)
                o = o.retry(retryPolicy(maxRetrysNext, false));
            return o.onErrorResumeNext(...);
    

    submit方法中调用retryHandler的getMaxRetriesOnSameServer方法和getMaxRetriesOnNextServer方法分别获取配置maxRetrysSame、maxRetrysNext。maxRetrysSame表示调用相同节点的重试次数,默认为0;maxRetrysNext表示调用不同节点的重试次数,默认为1。

    retryPolicy方法返回的是一个包装RetryHandler重试决策者的RxJava API的对象,最终由该RetryHandler决定是否需要重试,如抛出的异常是否允许重试。而是否达到最大重试次数则是在retryPolicy返回的Func2中完成,这是RxJava的API,retryPolicy方法的源码如下。

    
    private Func2<Integer, Throwable, Boolean> retryPolicy(final int maxRetrys, final boolean same) {
        return new Func2<Integer, Throwable, Boolean>() {
            @Override
            public Boolean call(Integer tryCount, Throwable e) {
                if (e instanceof AbortExecutionException) {
                    return false;
                }
                // 大于最大重试次数
                if (tryCount > maxRetrys) {
                    return false;
                }
                if (e.getCause() != null && e instanceof RuntimeException) {
                    e = e.getCause();
                }
                // 调用RetryHandler判断是否重试
                return retryHandler.isRetriableException(e, same);
            }
        };
    
    
    

    那么这个retryHandler是怎么来的呢?

    FeignLoadBalancer的executeWithLoadBalancer方法中调用buildLoadBalancerCommand方法构造LoadBalancerCommand对象时创建的,buildLoadBalancerCommand方法源码如下。

        protected LoadBalancerCommand<T> buildLoadBalancerCommand(final S request, final IClientConfig config) {
            // 获取RetryHandler
    		RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, config);
    		// 使用Builder构造者模式构造LoadBalancerCommand
    		LoadBalancerCommand.Builder<T> builder = LoadBalancerCommand.<T>builder()
    				.withLoadBalancerContext(this)
    				// 传入RetryHandler
    				.withRetryHandler(handler)
    				.withLoadBalancerURI(request.getUri());
    		return builder.build();
    	}
    

    从源码中可以看出,Ribbon使用的RetryHandlerRequestSpecificRetryHandler。这里还用到了Builder构造者模式。FeignLoadBalancergetRequestSpecificRetryHandler方法源码如下:

    
    @Override
    public RequestSpecificRetryHandler getRequestSpecificRetryHandler(
    	RibbonRequest request, IClientConfig requestConfig) {
    	//.....
    	if (!request.toRequest().httpMethod().name().equals("GET")) {
    	    // 调用this.getRetryHandler()方法获取一次RetryHandler
    		return new RequestSpecificRetryHandler(true, false, this.getRetryHandler(),
    				requestConfig);
    	}
    	else {
    	    // 调用this.getRetryHandler()方法获取一次RetryHandler
    		return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(),
    				requestConfig);
    	}
    

    Feign的重试机制

    因为ribbon的重试机制和Feign的重试机制有冲突,所以源码中默认关闭Feign的重试机制,源码如下

    image-20211022162035037

    配置

    feign:
      client:
        config:
          ServiceA:
            connect-timeout: 3000
            read-timeout: 3000
                
          @Bean
        public Retryer feignRetryer() {
            return new Retryer.Default(100, SECONDS.toMillis(1), 7);
        }          
    

    结果

    image-20211022135311827 image-20211022135321589

    会发现2台机器是轮休的访问,时间超过3秒多一点,是因为期间有一个休眠,而且feign的超时和ribbon的超时是不一样的,而且如果同时都配置的话,生效的只是Feign,那下面就分析一下看看。

    源码分析

    SynchronousMethodHandler的反射方法

      @Override
      public Object invoke(Object[] argv) throws Throwable {
        RequestTemplate template = buildTemplateFromArgs.create(argv);
        Options options = findOptions(argv);
        Retryer retryer = this.retryer.clone();
        while (true) {
          try {
            return executeAndDecode(template, options);
          } catch (RetryableException e) {
            try {
              retryer.continueOrPropagate(e);
            } catch (RetryableException th) {
              Throwable cause = th.getCause();
              if (propagationPolicy == UNWRAP && cause != null) {
                throw cause;
              } else {
                throw th;
              }
            }
            if (logLevel != Logger.Level.NONE) {
              logger.logRetry(metadata.configKey(), logLevel);
            }
            continue;
          }
        }
      }
    

    在它的方法里面下面就是核心的尝试机制,前面嵌套一层循环。此处的retryer已经拿到了配置的重试参数。

    image-20211022163859408

    后面会进入到下面的这个方法中,这里就是核心,会创建一个handler,由于创建的类型不是默认的,会创建FeignOptionsClientConfig,导致之前配置的ribbon的会失效,这样它重试的次数和最大的次数都是-1,int值的时候会变成0,就会走feign的重试机制

    image-20211022164106314

    image-20211022164605220

    image-20211022164129720

    如果超时的会就会来到一开始的方法,被catch中,核心代码如下:

    image-20211022112347724

        public void continueOrPropagate(RetryableException e) {
           //如果重试机制大于最大的尝试次数,则抛出异常
          if (attempt++ >= maxAttempts) {
            throw e;
          }
    
          long interval;
          if (e.retryAfter() != null) {
              //根据上次重试的时间减去当前时间来决定重试的间隔
            interval = e.retryAfter().getTime() - currentTimeMillis();
            if (interval > maxPeriod) {
              interval = maxPeriod;
            }
            if (interval < 0) {
              return;
            }
          } else {
            interval = nextMaxInterval();
          }
          try {
            Thread.sleep(interval);
          } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
            throw e;
          }
          sleptForMillis += interval;
        }
    

    image-20211022112402476

  • 相关阅读:
    XP和Scrum的比较
    让PowerDesigner支持SQLite 3.0[转]
    对于大型公司项目平台选择j2ee的几层认识(一)
    对于大型公司项目平台选择j2ee的几层认识(三)
    Alpha、Beta、RC、GA版本的区别
    如何在XElement中使用XPath
    对于大型公司项目平台选择j2ee的几层认识(四)
    Thread与BeginInvoke
    对于大型公司项目平台选择j2ee的几层认识(二)
    修复VS 2010的Help Library管理器
  • 原文地址:https://www.cnblogs.com/dalianpai/p/15439861.html
Copyright © 2011-2022 走看看