pigeon客户端调用的责任链代码在 InvokerProcessHandlerFactory # init()
public static void init() { if (!isInitialized) { if (Constants.MONITOR_ENABLE) { registerBizProcessFilter(new RemoteCallMonitorInvokeFilter()); } registerBizProcessFilter(new TraceFilter()); registerBizProcessFilter(new FaultInjectionFilter()); registerBizProcessFilter(new DegradationFilter()); registerBizProcessFilter(new ClusterInvokeFilter()); registerBizProcessFilter(new GatewayInvokeFilter()); registerBizProcessFilter(new ContextPrepareInvokeFilter()); registerBizProcessFilter(new SecurityFilter()); registerBizProcessFilter(new RemoteCallInvokeFilter()); bizInvocationHandler = createInvocationHandler(bizProcessFilters); isInitialized = true; } }
责任链在这里的作用就是对于 InvokerContext 的不断补充。比如说 ClusterInvokeFilter
public class ClusterInvokeFilter extends InvocationInvokeFilter { private static final Logger logger = LoggerLoader.getLogger(ClusterInvokeFilter.class); public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext) throws Throwable { InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig(); Cluster cluster = ClusterFactory.selectCluster(invokerConfig.getCluster()); if (cluster == null) { throw new IllegalArgumentException("Unsupported cluster type:" + cluster); } return cluster.invoke(handler, invocationContext); } }
public class ClusterFactory { private final static ConcurrentHashMap<String, Cluster> clusters = new ConcurrentHashMap<String, Cluster>(); static { init(); } public static void init() { clusters.put(Constants.CLUSTER_FAILFAST, new FailfastCluster()); clusters.put(Constants.CLUSTER_FAILOVER, new FailoverCluster()); clusters.put(Constants.CLUSTER_FAILSAFE, new FailsafeCluster()); clusters.put(Constants.CLUSTER_FORKING, new ForkingCluster()); } public static void registerCluster(String clusterType, Cluster cluster) { clusters.put(clusterType, cluster); } public static Cluster selectCluster(String clusterType) { Cluster cluster = clusters.get(clusterType); if (cluster == null) { return clusters.get(Constants.CLUSTER_FAILFAST); } return cluster; } }
默认的是 Constants.CLUSTER_FAILFAST
public class FailfastCluster implements Cluster { private ClientManager clientManager = ClientManager.getInstance(); private static final Logger logger = LoggerLoader.getLogger(FailfastCluster.class); @Override public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext) throws Throwable { InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig(); InvocationRequest request = InvokerUtils.createRemoteCallRequest(invocationContext, invokerConfig); boolean timeoutRetry = invokerConfig.isTimeoutRetry(); if (!timeoutRetry) { Client remoteClient = clientManager.getClient(invokerConfig, request, null); invocationContext.setClient(remoteClient);//让后续的发送tcp的filter能够拿到的netty的客户端 try { return handler.handle(invocationContext); } catch (NetworkException e) { remoteClient = clientManager.getClient(invokerConfig, request, null); invocationContext.setClient(remoteClient); return handler.handle(invocationContext); } } else {
......