zoukankan      html  css  js  c++  java
  • Pigeon源码分析(二) -- 客户端调用源码分析

    先看客户端调用的例子

    /**
     * Minimal client example: builds an invoker config for {@code EchoService}, obtains the
     * service object through {@code ServiceFactory.getService} and prints the result of a
     * single {@code echo} call.
     *
     * @param args command-line arguments (not used)
     * @throws Exception anything thrown while obtaining or calling the service
     */
    public static void main(String[] args) throws Exception {
        InvokerConfig<EchoService> config = new InvokerConfig<>(EchoService.class);
        EchoService client = ServiceFactory.getService(config);
        String line = "echoService result:" + client.echo("echoService_input");
        System.out.println(line);
    }

      要分析的入口就是 ServiceFactory.getService 方法。

    一  ServiceFactory.getService

    /**
     * Returns the service object for the given configuration by delegating to the shared
     * {@code serviceProxy}.
     *
     * @param invokerConfig configuration describing the service to obtain
     * @param <T> the service interface type
     * @return the object returned by {@code serviceProxy.getProxy(invokerConfig)}
     * @throws RpcException declared for callers; raised by the underlying proxy creation
     */
    public static <T> T getService(InvokerConfig<T> invokerConfig) throws RpcException {
        final T proxy = serviceProxy.getProxy(invokerConfig);
        return proxy;
    }

    // ServiceProxy instance initialized from ServiceProxyLoader.getServiceProxy().
    static ServiceProxy serviceProxy = ServiceProxyLoader.getServiceProxy();

    二  AbstractServiceProxy#getProxy

    /**
     * Returns the service object for {@code invokerConfig}; when the {@code services} lookup
     * yields nothing, a new one is created and the related components are registered.
     * NOTE: this is an excerpt - the remainder of the method is not shown here.
     *
     * @throws IllegalArgumentException if the config has no service interface
     */
    public <T> T getProxy(InvokerConfig<T> invokerConfig) {
            // A service interface is mandatory.
            if (invokerConfig.getServiceInterface() == null) {
                throw new IllegalArgumentException("service interface is required");
            }
            // No URL configured: derive one via ServiceFactory.getServiceUrl.
            if (StringUtils.isBlank(invokerConfig.getUrl())) {
                invokerConfig.setUrl(ServiceFactory.getServiceUrl(invokerConfig));
            }
            // For a non-default protocol, make sure the URL carries the "@PROTOCOL@" prefix.
            if (!StringUtils.isBlank(invokerConfig.getProtocol())
                    && !invokerConfig.getProtocol().equalsIgnoreCase(Constants.PROTOCOL_DEFAULT)) {
                String protocolPrefix = "@" + invokerConfig.getProtocol().toUpperCase() + "@";
                if (!invokerConfig.getUrl().startsWith(protocolPrefix)) {
                    invokerConfig.setUrl(protocolPrefix + invokerConfig.getUrl());
                }
            }
            Object service = null;
            // Look for an already stored service object for this config.
            service = services.get(invokerConfig);
            if (service == null) {
                try {
                    InvokerBootStrap.startup();// initialize what is needed later, e.g. the filter chain, load balancers, serializers
                    service = SerializerFactory.getSerializer(invokerConfig.getSerialize()).proxyRequest(invokerConfig);// created with a JDK dynamic proxy
                    if (StringUtils.isNotBlank(invokerConfig.getLoadbalance())) {
                        LoadBalanceManager.register(invokerConfig.getUrl(), invokerConfig.getGroup(),
                                invokerConfig.getLoadbalance());// register the load balancer
                    }
                } catch (Throwable t) {
                    throw new RpcException("error while trying to get service:" + invokerConfig, t);
                }
    
                // setup region policy for service
                try {
                    regionPolicyManager.register(invokerConfig.getUrl(), invokerConfig.getGroup(),
                            invokerConfig.getRegionPolicy());
                } catch (Throwable t) {
                    throw new RegionException("error while setup region route policy: " + invokerConfig, t);
                }
    
                try {
                    ClientManager.getInstance().registerClients(invokerConfig.getUrl(), invokerConfig.getGroup(),
                            invokerConfig.getVip());// establish the netty connections and cache them

      这里重点看前两个方法即可:

      InvokerBootStrap.startup();

      service = SerializerFactory.getSerializer(invokerConfig.getSerialize()).proxyRequest(invokerConfig);

    三  InvokerBootStrap.startup()

    /**
     * Runs the client-side initialization steps once. The {@code isStartup} flag is checked
     * before taking the class lock and again while holding it; after the steps complete the
     * flag is set and a startup message is logged.
     */
    public static void startup() {
        if (isStartup) {
            return;
        }
        synchronized (InvokerBootStrap.class) {
            if (isStartup) {
                return;
            }
            // Registry/config center initialization.
            RegistryConfigLoader.init();
            // Per the original author's note: starts a thread that checks, once per second,
            // whether any unfinished request has exceeded its configured timeout.
            ServiceInvocationRepository.getInstance().init();
            // Sets up the filters that form the invocation chain.
            InvokerProcessHandlerFactory.init();
            // Initializes the serializers.
            SerializerFactory.init();
            // Initializes the load balancers.
            LoadBalanceManager.init();
            RegionPolicyManager.INSTANCE.init();

            Monitor clientMonitor = MonitorLoader.getMonitor();
            if (clientMonitor != null) {
                clientMonitor.init();
            }

            isStartup = true;
            logger.warn("pigeon client[version:" + VersionUtils.VERSION + "] has been started");
        }
    }

    这里重点看下 初始化拦截器链的逻辑

    InvokerProcessHandlerFactory#init()

    /**
     * Registers the client-side filters in a fixed order and builds {@code bizInvocationHandler}
     * from them. Does nothing when {@code isInitialized} is already set.
     */
    public static void init() {
        if (isInitialized) {
            return;
        }

        registerBizProcessFilter(new InvokerDataFilter());
        // The monitoring filter is only registered when monitoring is enabled.
        if (Constants.MONITOR_ENABLE) {
            registerBizProcessFilter(new RemoteCallMonitorInvokeFilter());
        }
        registerBizProcessFilter(new DegradationFilter());
        registerBizProcessFilter(new FlowControlPigeonClientFilter());
        registerBizProcessFilter(new ClusterInvokeFilter());
        registerBizProcessFilter(new GatewayInvokeFilter());
        registerBizProcessFilter(new ContextPrepareInvokeFilter());
        // Registered last.
        registerBizProcessFilter(new RemoteCallInvokeFilter());

        bizInvocationHandler = createInvocationHandler(bizProcessFilters);
        isInitialized = true;
    }
    /**
     * Wraps the given filters into a chain of handlers. The filters are walked from last to
     * first; each new handler invokes its filter and passes along the handler built in the
     * previous step as the "next" handler (the last filter receives {@code null}).
     *
     * @param internalFilters filters in invocation order
     * @param <V> the filter type
     * @return the handler wrapping the first filter, or {@code null} when the list is empty
     */
    private static <V extends ServiceInvocationFilter> ServiceInvocationHandler createInvocationHandler(
            List<V> internalFilters) {
        // Work on a private copy of the filter list.
        final List<V> filters = new ArrayList<V>(internalFilters);
        ServiceInvocationHandler head = null;
        int index = filters.size();
        while (--index >= 0) {
            final V current = filters.get(index);
            final ServiceInvocationHandler successor = head;
            head = new ServiceInvocationHandler() {
                @SuppressWarnings("unchecked")
                @Override
                public InvocationResponse handle(InvocationContext invocationContext) throws Throwable {
                    return current.invoke(successor, invocationContext);
                }
            };
        }
        return head;
    }

      上面的写法是责任链很好的一种写法:从最后一个 filter 开始倒序包装,每个 handler 都持有下一个 handler(next)。责任链最强的地方就在于,每个 filter 可以在调用 next(也就是下一个 handler)之前做一些事情,也可以在之后做一些事情。比如统计请求失败率的 filter,就是在 next 执行之后检查执行的结果。

    四  DefaultAbstractSerializer#proxyRequest

    /**
     * Creates a JDK dynamic proxy implementing the configured service interface. Calls on the
     * proxy are dispatched to a {@code ServiceInvocationProxy} built from the config and the
     * handler selected by {@code InvokerProcessHandlerFactory}.
     *
     * @param invokerConfig configuration of the service to proxy
     * @return the proxy instance
     * @throws SerializationException declared by the overridden method
     */
    @Override
    public Object proxyRequest(InvokerConfig<?> invokerConfig) throws SerializationException {
        ClassLoader loader = ClassUtils.getCurrentClassLoader(invokerConfig.getClassLoader());
        Class<?>[] serviceInterfaces = new Class[] { invokerConfig.getServiceInterface() };
        ServiceInvocationProxy invocationProxy = new ServiceInvocationProxy(invokerConfig,
                InvokerProcessHandlerFactory.selectInvocationHandler(invokerConfig));
        return Proxy.newProxyInstance(loader, serviceInterfaces, invocationProxy);
    }

      public class ServiceInvocationProxy implements InvocationHandler

    public class ServiceInvocationProxy implements InvocationHandler {
    
        // Class logger obtained from LoggerLoader.
        private static final Logger logger = LoggerLoader.getLogger(ServiceInvocationProxy.class);
        // Configuration of the proxied service; passed into every invocation context.
        private InvokerConfig<?> invokerConfig;
        // Handler that service method calls are forwarded to.
        private ServiceInvocationHandler handler;
    
        /**
         * Creates the invocation proxy.
         *
         * @param invokerConfig configuration of the proxied service
         * @param handler handler that receives the service method calls
         */
        public ServiceInvocationProxy(InvokerConfig<?> invokerConfig, ServiceInvocationHandler handler) {
            this.handler = handler;
            this.invokerConfig = invokerConfig;
        }
    
        /**
         * Dispatches a call made on the proxy.
         * <ul>
         * <li>Methods declared by {@code Object} are invoked reflectively on the handler.</li>
         * <li>A no-arg {@code toString}/{@code hashCode} or a one-arg {@code equals} is answered
         * by the handler itself.</li>
         * <li>Everything else is wrapped in a {@code DefaultInvokerContext}, passed to
         * {@code handler.handle} and converted with {@code extractResult}.</li>
         * </ul>
         *
         * @param proxy the proxy instance the method was called on (not used)
         * @param method the invoked method
         * @param args the call arguments
         * @return the result for the caller
         * @throws Throwable whatever the handler or the reflective call throws
         */
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            final String name = method.getName();
            final Class<?>[] paramTypes = method.getParameterTypes();

            if (Object.class == method.getDeclaringClass()) {
                return method.invoke(handler, args);
            }

            final int arity = paramTypes.length;
            if (arity == 0) {
                if ("toString".equals(name)) {
                    return handler.toString();
                }
                if ("hashCode".equals(name)) {
                    return handler.hashCode();
                }
            } else if (arity == 1 && "equals".equals(name)) {
                return handler.equals(args[0]);
            }

            // Regular service method: run it through the handler chain.
            return extractResult(
                    handler.handle(new DefaultInvokerContext(invokerConfig, name, paramTypes, args)),
                    method.getReturnType());
        }

    其实就是调用上一步的各种责任链,这里就包括了负载均衡,发送tcp请求等底层逻辑

    五  ClientManager.getInstance().registerClients

      实际调用的是 ClientManager 中的内部类

    class InnerServiceProviderChangeListener implements ServiceProviderChangeListener {
            /**
             * Handles a "provider added" event: logs host, port, weight and service name at
             * info level (when enabled) and then registers a client for that provider.
             *
             * @param event the provider change event
             */
            @Override
            public void providerAdded(ServiceProviderChangeEvent event) {
                if (logger.isInfoEnabled()) {
                    final String message = "add " + event.getHost() + ":" + event.getPort() + ":"
                            + event.getWeight() + " to " + event.getServiceName();
                    logger.info(message);
                }
                registerClient(
                        event.getServiceName(),
                        event.getHost(),
                        event.getPort(),
                        event.getWeight(),
                        event.getGroup());
            }
    /**
     * Registers one service provider: hands its connection info to the cluster listener
     * manager, then records the address in the registry manager.
     *
     * @param serviceName name of the service
     * @param host provider host
     * @param port provider port
     * @param weight provider weight
     * @param group provider group
     */
    public void registerClient(String serviceName, String host, int port, int weight, String group) {
        clusterListenerManager.addConnect(new ConnectInfo(serviceName, host, port, weight, group));
        // Per the original author's note: this part caches the server address.
        RegistryManager.getInstance().addServiceAddress(serviceName, host, port, weight, group);
    }

      创建netty连接的逻辑在  this.clusterListenerManager.addConnect(connectInfo);

      实际调用的是 DefaultClusterListener.addConnect

    /**
     * Handles a newly announced service provider connection: looks up or creates the client
     * for {@code connectInfo} and stores it in {@code allClients}.
     * NOTE: this is an excerpt - the remainder of the method is not shown here.
     */
    public void addConnect(ConnectInfo connectInfo) {
            if (logger.isInfoEnabled()) {
                logger.info("[cluster-listener] add service provider:" + connectInfo);
            }
            // Client currently stored for this connect key, if any.
            Client client = this.allClients.get(connectInfo.getConnect());
            if (clientExisted(connectInfo)) {
                if (client != null) {
                    for (Map<String, List<Client>> clientMap : serviceClients.values()) {
                        for (List<Client> clientList : clientMap.values()) {
                            int idx = clientList.indexOf(client);
                            // The list holds an equal but not identical instance:
                            // hand this client to closeClientInFuture.
                            if (idx >= 0 && clientList.get(idx) != client) {
                                closeClientInFuture(client);
                            }
                        }
                    }
                } else {
                    // clientExisted reported true but nothing is stored under this key: stop here.
                    return;
                }
            }
            if (client == null) {
                client = ClientSelector.selectClient(connectInfo);// create the TCP client (netty-based)
            }
            if (!this.allClients.containsKey(connectInfo.getConnect())) {
                Client oldClient = this.allClients.putIfAbsent(connectInfo.getConnect(), client);// cache it
                // putIfAbsent returned an existing entry: use that one instead.
                if (oldClient != null) {
                    client = oldClient;
                }
            }

      代码很多,重点说下

      1 建立netty客户端连接,是和每一个服务端都建立一个netty连接

      2 监听zk节点,如果有服务端下线,去掉对应的连接

  • 相关阅读:
    第 9 章 完成购物车
    新建 ASP.NET MVC 项目快速代码
    一个真正的应用程序(第7~8章)(所需代码在下一篇随笔里)
    HTML
    squid 高匿设置
    Linux操作系统上ADSL拨号上网的方法详解
    MYSQL-max_binlog_cache_size参数
    mysql查杀会话
    centos配置Tomcat以指定的身份(非root)运行
    mysql load data导入脚本
  • 原文地址:https://www.cnblogs.com/juniorMa/p/14837916.html
Copyright © 2011-2022 走看看