zoukankan      html  css  js  c++  java
  • Dubbo Invoker (六)

    在上一节中,由于篇幅原因,对invoker的介绍会在本章节详细阐述。

    生成消费者调用invoker

    将代码点到ReferenceConfig#createProxy ->  invoker = refprotocol.refer(interfaceClass, url);

    这边还不得不提一下refprotocol到底取到的是哪个扩展点呢?

    根据ExtensionLoader和wrapper的关系,加载到的结果是这样的:

     实际上调用了 ProtocolListenerWrapper.refer -> ProtocolFilterWrapper.refer  -> registerProtocal#refer   -> RegistryProtocol#doRefer

      ProtocolListenerWrapper.refer  执行一些监听器的referred动作 ,传入的参数是 registerProtocal#refer 获得的invoker。返回的是ListenerInvokerWrapper其实现了Invoker<T>接口。

    ProtocolFilterWrapper.refer

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                return protocol.refer(type, url);
            }
            return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
        }

    buildInvokerChain这个方法的作用就是 处理一系列的过滤器,每一个过滤器都有一个invoker处理,而ProtocolListenerWrapper.refer 得到的invoker,作为最后一个要处理的invoker。关于过滤器将在后续介绍。

    RegistryProtocol#doRefer

    /**
         *
         * @param cluster      FailoverCluster
         * @param registry     zookeeperRegister
         * @param type         DemoService.class
         * @param url          zookeeper://localhost:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-consumer&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=35673&timestamp=1640055558556
         * @param <T>
         * @return
         */
        private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
            RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
            directory.setRegistry(registry);
            directory.setProtocol(protocol);
            // all attributes of REFER_KEY
            Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
            URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
            if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
                directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
                // 去注册消费者
                registry.register(directory.getRegisteredConsumerUrl());
            }
            directory.buildRouterChain(subscribeUrl);
            /**
             * 为消息消费者添加category=providers,configurators,routers属性后,然后向注册中心订阅该URL,
             * 关注该服务下的providers,configurators,routers发生变化时通知RegistryDirectory,
             * 以便及时发现服务提供者、配置、路由规则的变化
             */
            directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
                    PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
    
            // 根据Directory,利用集群策略返回集群Invoker。cluster$Adaptive ->  FailoverClusterInvoker
            Invoker invoker = cluster.join(directory);
            ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
            return invoker;
        }

     服务的注册和发现跟RegistryDirectory相关联,包括获取对应的invoker。下图是RegisterDirectory的结构,当然还有StaticDirectory,在上面的代码中设置了Register的具体扩展,protocal,RegisteredConsumerUrl,RouterChain

     

    然后调用Invoker invoker = cluster.join(directory);来创建invoker。

    public class FailoverCluster implements Cluster {
    
        public final static String NAME = "failover";
    
        @Override
        public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
            return new FailoverClusterInvoker<T>(directory);
        }
    
    }

    从代码上可以看到  实际上返回的是FailoverClusterInvoker。在这里  不得不简单描述一下Cluster集群容错与invoker的关系

     最后我们拿到的实际上是FailoverClusterInvoker,当执行方法调用的时候,实际上就是执行其doInvoke方法。

    这边先说个结论;: 此时的invoker为  MockClusterInvoker 

    具体细节将在后面章节阐述

     createProxy

    现在回到上一节的代理部分

    ReferenceConfig#createProxy

    /**
             * 根据invoker获取代理类,其实现逻辑如下:
             * 从消费者URL中获取interfaces的值,用,分隔出单个服务应用接口。
             * 增加默认接口EchoService接口。
             * 根据需要实现的接口,使用jdk或Javassist创建代理类。
             */
            return (T) proxyFactory.getProxy(invoker);

    proxyFactory.getProxy
    -> AbstractProxyFactory#getProxy(Invoker<T> invoker)
    -> AbstractProxyFactory#getProxy(Invoker<T> invoker)#getProxy(Invoker<T> invoker, boolean generic)
    -> JavassistProxyFactory#getProxy

    1) Proxy.getProxy(interfaces) 返回的是一个 Proxy0 对象,注意是大写的P
    2)最后调用 Proxy0 的 newInstance 方法,实例化一个包装类型的 proxy0,注意是小写的
    3)dubbo 里面的 EchoService 或者 泛化调用,都是通过javassist 字节码工具,以实现方法形式实现,所以才可以在业务层面强转成功。
    4)对于返回的代理类,也实现对应 refer 的接口,例如上述实现的 DemoService。
    5)DC 是dynamic code 缩写,只是一个标识,说明是动态生成的类。
    6)对于手动执行方法,都是调用 InvokerInvocationHandler 的 invoke 方法

        public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
            return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
        }

     这边的proxy是使用的org.apache.dubbo.common.bytecode.Proxy;的proxy工具类,InvokerInvocationHandler的invoke方法返回的数据类型就是远程服务的类型。

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return invoker.toString();
            }
            if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
                return invoker.hashCode();
            }
            if ("equals".equals(methodName) && parameterTypes.length == 1) {
                return invoker.equals(args[0]);
            }
    
            return invoker.invoke(createInvocation(method, args)).recreate();
        }

    也就是说 当执行了远程服务接口方法的时候,实际上就是执行是proxy0.sayhello,然后AbstractClusterInvoker#invoke ,上面的代码做了如下事情:

    1)   该方法对应的类如果是Object 类型,则直接执行该方法。
    2 )  判断是否为toString、hashCode、equals,是则直接执行invoker 的对应方法
    3)  构造一个 RpcInvocation,用于执行invoker 的 invoker 方法,而RpcInvocation 的生层过程就是填充一些比如methodName,parameterTypes, arguments, attachments, invoker 等信息。此invoker 为 MockClusterInvoker。

    接下来就是服务调用过程了,invoker.invoke将在后续讲解。其中也涉及了负载均衡  集群容错和服务的注册和发现,其中包含了动态配置路由配置

  • 相关阅读:
    玩转渗透神器Kali:Kali Linux作为主系统使用的正确姿势TIPS
    知道创宇研发技能表v2.2
    我对什么都感兴趣,可我迷茫了(转载)
    防御性编程
    防御性编程技巧
    移动安全技术如何未雨绸缪?
    1054. 求平均值 (20)
    1053. 住房空置率 (20)
    1052. 卖个萌 (20)
    1051. 复数乘法 (15)
  • 原文地址:https://www.cnblogs.com/gaojy/p/15709143.html
Copyright © 2011-2022 走看看