一、引言
在我们之前feign源码解析一文中,已经提到了hystrix,所以这次的分析也是接着上次feign的源码,继续展开讲解。
二、FeignClientFactoryBean创建JDK动态代理
public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign, FeignContext context, Target.HardCodedTarget<T> target) { //没有开启hystrix feign.hystrix.enabled=false if (!(feign instanceof feign.hystrix.HystrixFeign.Builder)) { return feign.target(target); } //如果开启了hystrix feign.hystrix.HystrixFeign.Builder builder = (feign.hystrix.HystrixFeign.Builder) feign; //获取实例名称 String name = StringUtils.isEmpty(factory.getContextId()) ? factory.getName() : factory.getContextId(); SetterFactory setterFactory = getOptional(name, context, SetterFactory.class); if (setterFactory != null) { builder.setterFactory(setterFactory); } //注解中的fallback Class<?> fallback = factory.getFallback(); if (fallback != void.class) { return targetWithFallback(name, context, target, builder, fallback); } //注解中的FallbackFactory Class<?> fallbackFactory = factory.getFallbackFactory(); if (fallbackFactory != void.class) { return targetWithFallbackFactory(name, context, target, builder, fallbackFactory); } //开启了hystrix,但是没有写fallback或fallbackFactory方法 return feign.target(target); }
targetWithFallback降级逻辑由@FeignClient注解的fallback指定的类提供。
private <T> T targetWithFallback(String feignClientName, FeignContext context, Target.HardCodedTarget<T> target, HystrixFeign.Builder builder, Class<?> fallback) { // 通过feignClientName和fallback 从子容器获取对应的Bean T fallbackInstance = getFromContext("fallback", feignClientName, context, fallback, target.type()); // 调用HystrixFeign.Builder的target方法 return builder.target(target, fallbackInstance); }
public <T> T target(Target<T> target, T fallback) {
//通过fallback构建了默认的FallbackFactory 然后把fallback实例赋值给了内部属性constant return build(fallback != null ? new FallbackFactory.Default<T>(fallback) : null)
//调用ReflectiveFeign创建 .newInstance(target); }
targetWithFallbackFactory降级逻辑由@FeignClient注解的fallbackFactory指定的类提供。
private <T> T targetWithFallbackFactory(String feignClientName, FeignContext context, Target.HardCodedTarget<T> target, HystrixFeign.Builder builder, Class<?> fallbackFactoryClass) { // 通过feignClientName和fallbackFactoryClass 获取子容器中的FallbackFactory实现类 FallbackFactory<? extends T> fallbackFactory = (FallbackFactory<? extends T>) getFromContext( "fallbackFactory", feignClientName, context, fallbackFactoryClass, FallbackFactory.class); // 调用HystrixFeign.Builder的target方法 return builder.target(target, fallbackFactory); }
public <T> T target(Target<T> target, FallbackFactory<? extends T> fallbackFactory) { //直接通过取到的fallbackFactory实例构建 return build(fallbackFactory).newInstance(target); }
两种fallback最终都走到build(fallbackFactory).newInstance(target)方法
Feign build(final FallbackFactory<?> nullableFallbackFactory) { // 设置创建JDKInvocationHandler的工厂 super.invocationHandlerFactory(new InvocationHandlerFactory() { @Override public InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch) { // 创建HystrixInvocationHandler return new HystrixInvocationHandler(target, dispatch, setterFactory, nullableFallbackFactory); } }); // 代理SpringMvcContract,对方法元数据解析做一些Hystrix相关的定制改造 super.contract(new HystrixDelegatingContract(contract)); // 构造ReflectiveFeign return super.build(); } @Override public <T> T newInstance(Target<T> target) { // 获取方法名对应的处理类的映射关系 Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target); ``` // 创建HystrixInvocationHandler InvocationHandler handler = factory.create(target, methodToHandler); // 创建JDK动态代理 T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(), new Class<?>[] {target.type()}, handler); return proxy; }
这段创建逻辑在之前的feign源码篇已经分析过了,但是没有分析控制器HystrixInvocationHandler,现在分析一下
final class HystrixInvocationHandler implements InvocationHandler { // HardCodedTarget实例 // 封装原始FeignClient接口Class、服务名(trade-service)、url(http://trade-service) private final Target<?> target; // 目标方法 - 实际处理的Handler private final Map<Method, MethodHandler> dispatch; // 降级工厂 private final FallbackFactory<?> fallbackFactory; // 目标方法 - 目标方法 setAccessible(true) private final Map<Method, Method> fallbackMethodMap; // 目标方法 - com.netflix.hystrix.HystrixCommand.Setter private final Map<Method, Setter> setterMethodMap; // 构造方法 HystrixInvocationHandler(Target<?> target, Map<Method, MethodHandler> dispatch, SetterFactory setterFactory, FallbackFactory<?> fallbackFactory) { this.target = checkNotNull(target, "target"); this.dispatch = checkNotNull(dispatch, "dispatch"); this.fallbackFactory = fallbackFactory; // 构造目标方法 - 目标方法映射,主要是为了防止invoke方法多次调用setAccessible(true) this.fallbackMethodMap = toFallbackMethod(dispatch); // 构造目标方法 - com.netflix.hystrix.HystrixCommand.Setter this.setterMethodMap = toSetters(setterFactory, target, dispatch.keySet()); } // static Map<Method, Method> toFallbackMethod(Map<Method, MethodHandler> dispatch) { Map<Method, Method> result = new LinkedHashMap<Method, Method>(); for (Method method : dispatch.keySet()) { method.setAccessible(true); result.put(method, method); } return result; } // static Map<Method, Setter> toSetters(SetterFactory setterFactory, Target<?> target, Set<Method> methods) { Map<Method, Setter> result = new LinkedHashMap<Method, Setter>(); for (Method method : methods) { method.setAccessible(true); // 调用SetterFactory的create方法创建Hystrix的Setter result.put(method, setterFactory.create(target, method)); } return result; } ``` }
SetterFactory的默认实现feign.hystrix.SetterFactory.Default。HystrixCommandGroupKey取HardCodedTarget实例的name属性,也就是服务名。HystrixCommandKey取的是StockClient.getStock(Long)类名和方法签名的拼接。所以默认FeignHystrix资源隔离的线程池维度是HystrixCommandGroupKey,一个服务名一个线程池。
final class Default implements SetterFactory { @Override public HystrixCommand.Setter create(Target<?> target, Method method) { String groupKey = target.name(); String commandKey = Feign.configKey(target.type(), method); return HystrixCommand.Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey)) .andCommandKey(HystrixCommandKey.Factory.asKey(commandKey)); } }
HystrixInvocationHandler#invoke方法
final class HystrixInvocationHandler implements InvocationHandler { @Override public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { // 如果是Object提供的方法,和ReflectiveFeign.FeignInvocationHandler的处理方式一致 if ("equals".equals(method.getName())) { try { Object otherHandler = args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null; return equals(otherHandler); } catch (IllegalArgumentException e) { return false; } } else if ("hashCode".equals(method.getName())) { return hashCode(); } else if ("toString".equals(method.getName())) { return toString(); } // 创建HystrixCommand HystrixCommand<Object> hystrixCommand = new HystrixCommand<Object>(setterMethodMap.get(method)) { @Override protected Object run() throws Exception { try { // 获取MethodHandler执行方法 和ReflectiveFeign.FeignInvocationHandler的处理方式一致 return HystrixInvocationHandler.this.dispatch.get(method).invoke(args); } catch (Exception e) { throw e; } catch (Throwable t) { throw (Error) t; } } @Override protected Object getFallback() { // 如果fallbackFactory为空 // 抛出throw new UnsupportedOperationException("No fallback available.") if (fallbackFactory == null) { return super.getFallback(); } try { // getExecutionException获取HystrixCommand执行过程中的异常 // 调用自己定义的fallbackFactory的create方法创建fallback Object fallback = fallbackFactory.create(getExecutionException()); // 反射调用fallback的对应方法 Object result = fallbackMethodMap.get(method).invoke(fallback, args); // 解析方法的返回类型 if (isReturnsHystrixCommand(method)) { // 如果是HystrixCommand,那么直接执行execute方法同步返回 return ((HystrixCommand) result).execute(); } else if (isReturnsObservable(method)) { // Create a cold Observable return ((Observable) result).toBlocking().first(); } else if (isReturnsSingle(method)) { // Create a cold Observable as a Single return ((Single) result).toObservable().toBlocking().first(); } else if (isReturnsCompletable(method)) { ((Completable) result).await(); return null; } else if (isReturnsCompletableFuture(method)) { return ((Future) result).get(); } else { return result; } } catch (IllegalAccessException e) { // shouldn't happen as method is public due to being an interface throw new AssertionError(e); } catch (InvocationTargetException | ExecutionException e) { // Exceptions on fallback are tossed by Hystrix throw new AssertionError(e.getCause()); } catch (InterruptedException e) { // Exceptions on fallback are tossed by Hystrix Thread.currentThread().interrupt(); throw new AssertionError(e.getCause()); } } }; // 解析方法返回类型,确定HystrixCommand如何返回 if (Util.isDefault(method)) { // 如果是默认方法直接同步执行 return hystrixCommand.execute(); } else if (isReturnsHystrixCommand(method)) { // 如果是HystrixCommand,直接返回HystrixCommand实例,不执行 return hystrixCommand; } else if (isReturnsObservable(method)) { // Create a cold Observable return hystrixCommand.toObservable(); } else if (isReturnsSingle(method)) { // Create a cold Observable as a Single return hystrixCommand.toObservable().toSingle(); } else if (isReturnsCompletable(method)) { return hystrixCommand.toObservable().toCompletable(); } else if (isReturnsCompletableFuture(method)) { return new ObservableCompletableFuture<>(hystrixCommand); } // 正常返回类型,同步执行HystrixCommand的execute方法 return hystrixCommand.execute(); } private boolean isReturnsCompletable(Method method) { return Completable.class.isAssignableFrom(method.getReturnType()); } private boolean isReturnsHystrixCommand(Method method) { return HystrixCommand.class.isAssignableFrom(method.getReturnType()); } private boolean isReturnsObservable(Method method) { return Observable.class.isAssignableFrom(method.getReturnType()); } private boolean isReturnsCompletableFuture(Method method) { return CompletableFuture.class.isAssignableFrom(method.getReturnType()); } private boolean isReturnsSingle(Method method) { return Single.class.isAssignableFrom(method.getReturnType()); } ``` }
这里为什么要对不同fallback返回值类型做不同的处理呢?
因为fallback尽量避免阻塞操作,比如网络通信。如果因为业务需要,非得在fallback里做远程调用。那么可以返回HystrixCommand,这样fallback就可以换一个HystrixCommandGroupKey在另外的线程池,并且具有隔离的资源和独立的熔断指标统计。
举个栗子
@Service public class OuterApiFallback implements OuterApi { public Response<String> getDefaultFallBack() { return Response.fail("out服务熔断异常"); } @Override public Response<String> testConcurrent() { return getDefaultFallBack(); } @Override public HystrixCommand<Response<String>> testNextConcurrent() { HystrixCommand.Setter setter = HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("fallback")); return new HystrixCommand<Response<String>>(setter) { @Override protected Response<String> run() throws Exception { // 可能是又一次网络通信,比如请求某个降级服务 ··· //然后返回 return Response.success(""); } }; } }
spring-cloud-openfeign-core与Hystrix集成是在InvocationHandler里创建了HystrixCommand,支持降级,但是不支持缓存(创建的HystrixCommand没有重写getCacheKey方法)。
参考链接:https://juejin.cn/post/6881546816999915534