  • Spring Cloud Request Retry Mechanism: Core Code Analysis

    Scenario

    Deploying a microservice usually means packaging the new code, killing the running application, swapping in the new package, and starting it again.
    Spring Cloud uses Eureka as the registry, and Eureka tolerates staleness in the service list: even after an instance has been removed from the list, clients keep sending requests to its address for a while. A request can therefore land on an instance that is in the middle of being redeployed, and fail.
    We can tune the refresh interval of the service list to make it more current, but no matter what, there is always a window during which the data is inconsistent.
    So one remedy is a retry mechanism: while instance A is restarting, instance B in the same cluster can still serve requests normally. With retry in place, the scenario above can fail over to B without affecting the correctness of the response.

    Setup

    The following steps are needed. First, the Ribbon configuration:

    ribbon:
      ReadTimeout: 10000
      ConnectTimeout: 10000
      MaxAutoRetries: 0
      MaxAutoRetriesNextServer: 1
      OkToRetryOnAllOperations: false
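    With these values (assuming the standard Ribbon semantics), MaxAutoRetries is the number of retries against the same server after the first attempt, MaxAutoRetriesNextServer is the number of additional servers to try, and OkToRetryOnAllOperations: false keeps retries limited to requests that are safe to repeat (GETs) rather than all operations. The worst case per request is therefore (MaxAutoRetries + 1) × (MaxAutoRetriesNextServer + 1) = 1 × 2 = 2 attempts: one on the originally chosen server and one on a different server.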
    

    Add the spring-retry dependency:

    <dependency>
        <groupId>org.springframework.retry</groupId>
        <artifactId>spring-retry</artifactId>
    </dependency>
    

    Taking Zuul as an example, retry also has to be enabled explicitly:

    zuul.retryable=true
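    Retry can also be enabled per route rather than globally; for example (the route name order-service is made up for illustration, assuming the standard zuul.routes.* properties):

    zuul.routes.order-service.retryable=true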
    

    The Problem

    However, things never go quite that smoothly. Testing showed that the retry did kick in, but it did not go to the other, healthy instance as I expected, so I had to dig into the source code. It eventually turned out to be a bug in the source, which has since been fixed; upgrading the version is enough.

    Code Analysis

    The versions in use are:
    spring-cloud-netflix-core: 1.3.6.RELEASE
    spring-retry: 1.2.1.RELEASE
    Spring Cloud dependency management:

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    

    Because retry is enabled, requests to the application go through RetryableRibbonLoadBalancingHttpClient.execute:

    public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception {
    		final RequestConfig.Builder builder = RequestConfig.custom();
    		IClientConfig config = configOverride != null ? configOverride : this.config;
    		builder.setConnectTimeout(config.get(
    				CommonClientConfigKey.ConnectTimeout, this.connectTimeout));
    		builder.setSocketTimeout(config.get(
    				CommonClientConfigKey.ReadTimeout, this.readTimeout));
    		builder.setRedirectsEnabled(config.get(
    				CommonClientConfigKey.FollowRedirects, this.followRedirects));
    
    		final RequestConfig requestConfig = builder.build();
    		final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryPolicyFactory.create(this.getClientName(), this);
    		RetryCallback retryCallback = new RetryCallback() {
    			@Override
    			public RibbonApacheHttpResponse doWithRetry(RetryContext context) throws Exception {
    				//on retries the policy will choose the server and set it in the context
    				//extract the server and update the request being made
    				RibbonApacheHttpRequest newRequest = request;
    				if(context instanceof LoadBalancedRetryContext) {
    					ServiceInstance service = ((LoadBalancedRetryContext)context).getServiceInstance();
    					if(service != null) {
    						//Reconstruct the request URI using the host and port set in the retry context
    						newRequest = newRequest.withNewUri(new URI(service.getUri().getScheme(),
    								newRequest.getURI().getUserInfo(), service.getHost(), service.getPort(),
    								newRequest.getURI().getPath(), newRequest.getURI().getQuery(),
    								newRequest.getURI().getFragment()));
    					}
    				}
    				newRequest = getSecureRequest(request, configOverride);
    				HttpUriRequest httpUriRequest = newRequest.toRequest(requestConfig);
    				final HttpResponse httpResponse = RetryableRibbonLoadBalancingHttpClient.this.delegate.execute(httpUriRequest);
    				if(retryPolicy.retryableStatusCode(httpResponse.getStatusLine().getStatusCode())) {
    					if(CloseableHttpResponse.class.isInstance(httpResponse)) {
    						((CloseableHttpResponse)httpResponse).close();
    					}
    					throw new RetryableStatusCodeException(RetryableRibbonLoadBalancingHttpClient.this.clientName,
    							httpResponse.getStatusLine().getStatusCode());
    				}
    				return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
    			}
    		};
    		return this.executeWithRetry(request, retryPolicy, retryCallback);
    	}
    
    

    We can see that a RetryCallback is created first and then this.executeWithRetry(request, retryPolicy, retryCallback) is invoked.
    The body of RetryCallback.doWithRetry is clearly the code that performs the actual request, so this.executeWithRetry must eventually call back into RetryCallback.doWithRetry.
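    executeWithRetry hands this callback to Spring Retry's RetryTemplate, which drives the retry loop. As a minimal, self-contained illustration of that contract (the policy, the simulated failure, and the return values below are invented for the demo and are not taken from the Spring Cloud source):

    import org.springframework.retry.RetryCallback;
    import org.springframework.retry.RetryContext;
    import org.springframework.retry.policy.SimpleRetryPolicy;
    import org.springframework.retry.support.RetryTemplate;

    public class RetryTemplateSketch {
        public static void main(String[] args) throws Exception {
            RetryTemplate template = new RetryTemplate();
            // two attempts in total, mirroring MaxAutoRetriesNextServer: 1
            template.setRetryPolicy(new SimpleRetryPolicy(2));

            String result = template.execute(new RetryCallback<String, Exception>() {
                @Override
                public String doWithRetry(RetryContext context) throws Exception {
                    // the template keeps calling this method until it succeeds
                    // or the policy says stop
                    if (context.getRetryCount() == 0) {
                        throw new RuntimeException("first attempt failed");
                    }
                    return "success on attempt " + (context.getRetryCount() + 1);
                }
            });
            System.out.println(result); // prints "success on attempt 2"
        }
    }

    The loop that actually drives these retries is RetryTemplate.doExecute: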

    protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
    			RecoveryCallback<T> recoveryCallback, RetryState state)
    			throws E, ExhaustedRetryException {
    
    		RetryPolicy retryPolicy = this.retryPolicy;
    		BackOffPolicy backOffPolicy = this.backOffPolicy;
    
    		// Allow the retry policy to initialise itself...
    		RetryContext context = open(retryPolicy, state);
    		if (this.logger.isTraceEnabled()) {
    			this.logger.trace("RetryContext retrieved: " + context);
    		}
    
    		// Make sure the context is available globally for clients who need
    		// it...
    		RetrySynchronizationManager.register(context);
    
    		Throwable lastException = null;
    
    		boolean exhausted = false;
    		try {
    
    			// Give clients a chance to enhance the context...
    			boolean running = doOpenInterceptors(retryCallback, context);
    
    			if (!running) {
    				throw new TerminatedRetryException(
    						"Retry terminated abnormally by interceptor before first attempt");
    			}
    
    			// Get or Start the backoff context...
    			BackOffContext backOffContext = null;
    			Object resource = context.getAttribute("backOffContext");
    
    			if (resource instanceof BackOffContext) {
    				backOffContext = (BackOffContext) resource;
    			}
    
    			if (backOffContext == null) {
    				backOffContext = backOffPolicy.start(context);
    				if (backOffContext != null) {
    					context.setAttribute("backOffContext", backOffContext);
    				}
    			}
    
    			/*
    			 * We allow the whole loop to be skipped if the policy or context already
    			 * forbid the first try. This is used in the case of external retry to allow a
    			 * recovery in handleRetryExhausted without the callback processing (which
    			 * would throw an exception).
    			 */
    			while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
    
    				try {
    					if (this.logger.isDebugEnabled()) {
    						this.logger.debug("Retry: count=" + context.getRetryCount());
    					}
    					// Reset the last exception, so if we are successful
    					// the close interceptors will not think we failed...
    					lastException = null;
    					return retryCallback.doWithRetry(context);
    				}
    				catch (Throwable e) {
    
    					lastException = e;
    
    					try {
    						registerThrowable(retryPolicy, state, context, e);
    					}
    					catch (Exception ex) {
    						throw new TerminatedRetryException("Could not register throwable",
    								ex);
    					}
    					finally {
    						doOnErrorInterceptors(retryCallback, context, e);
    					}
    
    					if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
    						try {
    							backOffPolicy.backOff(backOffContext);
    						}
    						catch (BackOffInterruptedException ex) {
    							lastException = e;
    							// back off was prevented by another thread - fail the retry
    							if (this.logger.isDebugEnabled()) {
    								this.logger
    										.debug("Abort retry because interrupted: count="
    												+ context.getRetryCount());
    							}
    							throw ex;
    						}
    					}
    
    					if (this.logger.isDebugEnabled()) {
    						this.logger.debug(
    								"Checking for rethrow: count=" + context.getRetryCount());
    					}
    
    					if (shouldRethrow(retryPolicy, context, state)) {
    						if (this.logger.isDebugEnabled()) {
    							this.logger.debug("Rethrow in retry for policy: count="
    									+ context.getRetryCount());
    						}
    						throw RetryTemplate.<E>wrapIfNecessary(e);
    					}
    
    				}
    
    				/*
    				 * A stateful attempt that can retry may rethrow the exception before now,
    				 * but if we get this far in a stateful retry there's a reason for it,
    				 * like a circuit breaker or a rollback classifier.
    				 */
    				if (state != null && context.hasAttribute(GLOBAL_STATE)) {
    					break;
    				}
    			}
    
    			if (state == null && this.logger.isDebugEnabled()) {
    				this.logger.debug(
    						"Retry failed last attempt: count=" + context.getRetryCount());
    			}
    
    			exhausted = true;
    			return handleRetryExhausted(recoveryCallback, context, state);
    
    		}
    		catch (Throwable e) {
    			throw RetryTemplate.<E>wrapIfNecessary(e);
    		}
    		finally {
    			close(retryPolicy, context, state, lastException == null || exhausted);
    			doCloseInterceptors(retryCallback, context, lastException);
    			RetrySynchronizationManager.clear();
    		}
    
    	}
    

    The retry mechanism is implemented in a while loop: when retryCallback.doWithRetry(context) throws, the exception is caught and the retryPolicy decides whether to retry. Pay particular attention to registerThrowable(retryPolicy, state, context, e): it not only makes the retry decision, but when a retry is allowed it also selects a new server and puts it into the context, which is then carried into the next retryCallback.doWithRetry(context) call. That is how failing over to another instance is implemented.
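    The sketch below mimics that hand-off using plain Spring Retry (the server names are invented, and the real code stores a ServiceInstance in a LoadBalancedRetryContext rather than a plain string attribute): on each error a new "server" is chosen and stashed in the RetryContext, and the next doWithRetry attempt reads it back out.

    import java.util.Arrays;
    import java.util.List;

    import org.springframework.retry.RetryCallback;
    import org.springframework.retry.RetryContext;
    import org.springframework.retry.listener.RetryListenerSupport;
    import org.springframework.retry.policy.SimpleRetryPolicy;
    import org.springframework.retry.support.RetryTemplate;

    public class NextServerOnRetryDemo {
        // hypothetical cluster: host-a is being redeployed, host-b is healthy
        static final List<String> SERVERS = Arrays.asList("host-a:8080", "host-b:8080");

        public static void main(String[] args) throws Exception {
            RetryTemplate template = new RetryTemplate();
            template.setRetryPolicy(new SimpleRetryPolicy(2)); // at most two attempts

            // Plays the role of registerThrowable here: after a failure, choose the
            // next server and put it into the retry context for the next attempt.
            template.registerListener(new RetryListenerSupport() {
                @Override
                public <T, E extends Throwable> void onError(RetryContext context,
                        RetryCallback<T, E> callback, Throwable throwable) {
                    context.setAttribute("server",
                            SERVERS.get(context.getRetryCount() % SERVERS.size()));
                }
            });

            String response = template.execute(new RetryCallback<String, Exception>() {
                @Override
                public String doWithRetry(RetryContext context) throws Exception {
                    // like doWithRetry above: read the server chosen for this attempt
                    String server = (String) context.getAttribute("server");
                    if (server == null) {
                        server = SERVERS.get(0); // first attempt: the originally chosen server
                    }
                    if (server.startsWith("host-a")) {
                        throw new RuntimeException(server + " is being redeployed");
                    }
                    return "200 OK from " + server;
                }
            });
            System.out.println(response); // prints "200 OK from host-b:8080"
        }
    }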
    But why did my configuration not switch instances? Debugging showed that registerThrowable(retryPolicy, state, context, e) picked a perfectly good server, the new healthy one, yet when retryCallback.doWithRetry(context) ran, the request still went to the dead instance.
    So let's look at the retryCallback.doWithRetry(context) code more carefully, and we find this line:

    newRequest = getSecureRequest(request, configOverride);
    protected RibbonApacheHttpRequest getSecureRequest(RibbonApacheHttpRequest request, IClientConfig configOverride) {
    		if (isSecure(configOverride)) {
    			final URI secureUri = UriComponentsBuilder.fromUri(request.getUri())
    					.scheme("https").build(true).toUri();
    			return request.withNewUri(secureUri);
    		}
    		return request;
    	}
    

    A few lines earlier, newRequest has already been rebuilt from the retry context, while request still holds the data of the previous attempt. Once this line runs, newRequest is always overwritten with a value derived from request, so the server chosen for the retry is silently discarded. Only at this point did we realize this is a bug in the source code.
    Issue:
    https://github.com/spring-cloud/spring-cloud-netflix/issues/2667
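    For reference, the essence of the fix (as I read the linked issue; shown here as a sketch rather than the exact upstream commit) is to stop discarding the rebuilt request:

    // keep the URI that was just rebuilt from the retry context
    newRequest = getSecureRequest(newRequest, configOverride);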

    Summary

    This was a fairly ordinary troubleshooting session. When the configuration did not behave as I expected, I first re-checked what the settings meant; after several fruitless attempts I moved on to breakpoint debugging. Because the scenario requires one instance to be healthy while another is down, I had to simulate it hundreds of times before finally pinning down this one line of code. Even excellent open-source projects inevitably have bugs; don't treat them as gospel and don't follow them blindly. On the other hand, the ability to read source code is an important problem-solving skill; I spent a lot of time just finding the entry point and locating the relevant code.

  • Original post: https://www.cnblogs.com/killbug/p/9150067.html