zoukankan      html  css  js  c++  java
  • Pigeon的客户端调用责任链

    pigeon客户端调用的责任链代码在  InvokerProcessHandlerFactory # init()

    public static void init() {
            if (!isInitialized) {
                if (Constants.MONITOR_ENABLE) {
                    registerBizProcessFilter(new RemoteCallMonitorInvokeFilter());
                }
                registerBizProcessFilter(new TraceFilter());
                registerBizProcessFilter(new FaultInjectionFilter());
                registerBizProcessFilter(new DegradationFilter());
                registerBizProcessFilter(new ClusterInvokeFilter());
                registerBizProcessFilter(new GatewayInvokeFilter());
                registerBizProcessFilter(new ContextPrepareInvokeFilter());
                registerBizProcessFilter(new SecurityFilter());
                registerBizProcessFilter(new RemoteCallInvokeFilter());
                bizInvocationHandler = createInvocationHandler(bizProcessFilters);
                isInitialized = true;
            }
        }

    责任链在这里的作用就是对于 InvokerContext 的不断补充。比如说 ClusterInvokeFilter

    public class ClusterInvokeFilter extends InvocationInvokeFilter {
    
        private static final Logger logger = LoggerLoader.getLogger(ClusterInvokeFilter.class);
    
        public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
                throws Throwable {
            InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
            Cluster cluster = ClusterFactory.selectCluster(invokerConfig.getCluster());
            if (cluster == null) {
                throw new IllegalArgumentException("Unsupported cluster type:" + cluster);
            }
            return cluster.invoke(handler, invocationContext);
        }
    
    }
    public class ClusterFactory {
    
        private final static ConcurrentHashMap<String, Cluster> clusters = new ConcurrentHashMap<String, Cluster>();
    
        static {
            init();
        }
    
        public static void init() {
            clusters.put(Constants.CLUSTER_FAILFAST, new FailfastCluster());
            clusters.put(Constants.CLUSTER_FAILOVER, new FailoverCluster());
            clusters.put(Constants.CLUSTER_FAILSAFE, new FailsafeCluster());
            clusters.put(Constants.CLUSTER_FORKING, new ForkingCluster());
        }
    
        public static void registerCluster(String clusterType, Cluster cluster) {
            clusters.put(clusterType, cluster);
        }
    
        public static Cluster selectCluster(String clusterType) {
            Cluster cluster = clusters.get(clusterType);
            if (cluster == null) {
                return clusters.get(Constants.CLUSTER_FAILFAST);
            }
            return cluster;
        }
    }

    默认的是  Constants.CLUSTER_FAILFAST 

    public class FailfastCluster implements Cluster {
    
        private ClientManager clientManager = ClientManager.getInstance();
    
        private static final Logger logger = LoggerLoader.getLogger(FailfastCluster.class);
    
        @Override
        public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
                throws Throwable {
            InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
            InvocationRequest request = InvokerUtils.createRemoteCallRequest(invocationContext, invokerConfig);
    
            boolean timeoutRetry = invokerConfig.isTimeoutRetry();
            if (!timeoutRetry) {
                Client remoteClient = clientManager.getClient(invokerConfig, request, null);
                invocationContext.setClient(remoteClient);//让后续的发送tcp的filter能够拿到的netty的客户端
                try {
                    return handler.handle(invocationContext);
                } catch (NetworkException e) {
                    remoteClient = clientManager.getClient(invokerConfig, request, null);
                    invocationContext.setClient(remoteClient);
                    return handler.handle(invocationContext);
                }
            } else {
         ......

      

  • 相关阅读:
    Centos 7.6搭建Skywalking6.5+es6.2.4
    Skywalking入门介绍,skywalking6.5.0 +mysql (windows) 搭建
    使用springcloud gateway搭建网关(分流,限流,熔断)
    Elastalert
    Docker 部署ELK之Sentinl日志报警
    Docker 部署ELK
    基于sentry的前端错误监控日志系统(部署sentry服务器/前端项目部署)-让前端最快的定位到生产问题
    sentry之二:sentry配置钉钉和email
    sentry之一:sentry安装
    全链路追踪技术选型:pinpoint vs skywalking
  • 原文地址:https://www.cnblogs.com/juniorMa/p/14805871.html
Copyright © 2011-2022 走看看