  • How dubbo creates the proxy for a consumer

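    ReferenceConfig.init() is where the final proxy object is obtained. Start by looking at that proxy object in the debugger.

    By default the proxy class is generated with javassist; setting proxy to jdk switches to JDK dynamic proxies: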

    <dubbo:reference id="hello" interface="com.zhang.HelloService" proxy="jdk" merger="true" check="false"/>

    Debugger view of the JDK proxy object (screenshot in the original post).

    RegistryDirectory holds a urlInvokerMap:

    // Map<url, Invoker> cache service url to invoker mapping.
    private volatile Map<String, Invoker<T>> urlInvokerMap;

    Example entry:

    Key: dubbo://192.168.233.6:20880/com.zhang.HelloService?anyhost=true&application=consumerk_app&check=false&dubbo=2.5.2&group=a&interface=com.zhang.HelloService&iothreads=2&merger=true&methods=sayHello,sayException,sayHi,sayFuck&pid=6736&sayException.retries=0&sayFuck.retries=0&sayFuck.return=true&sayHello.retries=0&sayHi.retries=0&side=consumer&timeout=6000&timestamp=1516256748120
    Value: com.alibaba.dubbo.registry.integration.RegistryDirectory$InvokerDelegete@4147e2b6

    Next, trace how the proxy is created. The main call stack is shown as a screenshot in the original post.

    The code of Protocol$Adpative:

    package com.alibaba.dubbo.rpc;
    import com.alibaba.dubbo.common.extension.ExtensionLoader;
    public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
        public void destroy() {
            throw new UnsupportedOperationException(
                    "method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
        }
    
        public int getDefaultPort() {
            throw new UnsupportedOperationException(
                    "method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
        }
    
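        // NOTE: the throws clause is reproduced exactly as dumped. The Protocol interface
        // declares RpcException here; the same quirk shows up in the other $Adpative dumps.
        public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0,
                com.alibaba.dubbo.common.URL arg1) throws java.lang.Class {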
            if (arg1 == null)
                throw new IllegalArgumentException("url == null");
            com.alibaba.dubbo.common.URL url = arg1;
            // different URLs yield different extension names
            String extName = (url.getProtocol() == null ? "dubbo" : url
                    .getProtocol());
            if (extName == null)
                throw new IllegalStateException(
                        "Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url("
                                + url.toString() + ") use keys([protocol])");
            com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader
                    .getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class)
                    .getExtension(extName);
            return extension.refer(arg0, arg1);
        }
    
        public com.alibaba.dubbo.rpc.Exporter export(
                com.alibaba.dubbo.rpc.Invoker arg0)
                throws com.alibaba.dubbo.rpc.Invoker {
            if (arg0 == null)
                throw new IllegalArgumentException(
                        "com.alibaba.dubbo.rpc.Invoker argument == null");
            if (arg0.getUrl() == null)
                throw new IllegalArgumentException(
                        "com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
            com.alibaba.dubbo.common.URL url = arg0.getUrl();
            String extName = (url.getProtocol() == null ? "dubbo" : url
                    .getProtocol());
            if (extName == null)
                throw new IllegalStateException(
                        "Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url("
                                + url.toString() + ") use keys([protocol])");
            com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader
                    .getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class)
                    .getExtension(extName);
            return extension.export(arg0);
        }
    }
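
    The dispatch logic of the generated class above can be sketched in plain Java without any dubbo dependency. Everything here is an illustrative assumption: the Protocol interface is a one-method stand-in, the EXTENSIONS map replaces ExtensionLoader.getExtension(name), and java.net.URI replaces com.alibaba.dubbo.common.URL. Only the rule itself (use the URL protocol, fall back to "dubbo") is taken from the dump.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class AdaptiveDispatchSketch {
    /** Hypothetical one-method stand-in for com.alibaba.dubbo.rpc.Protocol. */
    interface Protocol {
        String refer(String url);
    }

    // Hypothetical lookup table standing in for ExtensionLoader.getExtension(name).
    private static final Map<String, Protocol> EXTENSIONS = new HashMap<>();
    static {
        EXTENSIONS.put("registry", url -> "RegistryProtocol handled " + url);
        EXTENSIONS.put("dubbo", url -> "DubboProtocol handled " + url);
    }

    /** Mirrors the extName computation: the URL protocol, falling back to "dubbo". */
    public static String extensionName(String url) {
        String scheme = URI.create(url).getScheme();
        return scheme == null ? "dubbo" : scheme;
    }

    /** Mirrors the generated refer(): pick the extension by name, then delegate to it. */
    public static String refer(String url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        String extName = extensionName(url);
        Protocol extension = EXTENSIONS.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No such extension: " + extName);
        }
        return extension.refer(url);
    }

    public static void main(String[] args) {
        System.out.println(refer("registry://192.168.153.1:2181/com.alibaba.dubbo.registry.RegistryService"));
        System.out.println(refer("dubbo://192.168.153.1:20880/com.zhang.HelloService"));
    }
}
```

    The adaptive class itself holds no protocol logic; it only turns a URL into an extension name on every call, which is why the same refprotocol object can end up in two different implementations.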

    The call stack shows that Protocol$Adpative.refer(Class, URL) is called twice, each time with a different URL:

    the first call resolves the protocol to ProtocolFilterWrapper/ProtocolListenerWrapper/RegistryProtocol,
    the second to ProtocolFilterWrapper/ProtocolListenerWrapper/DubboProtocol.

    As a result, ProtocolListenerWrapper and ProtocolFilterWrapper take different branches on the two calls.
    (The first URL is:
    registry://192.168.153.1:2181/com.alibaba.dubbo.registry.RegistryService?application=consumer&dubbo=2.1.2&localhost=true&pid=2504&refer=application%3Dconsumer%26dubbo%3D2.1.2%26interface%3Dcom.zhang.HelloService%26methods%3DsayHello%26pid%3D2504%26timestamp%3D1516461886484&registry=zookeeper&timestamp=1516461886531
    Inside Protocol$Adpative.refer, getExtension() returns ProtocolListenerWrapper/ProtocolFilterWrapper/RegistryProtocol for it.
    The second URL is:
    dubbo://192.168.153.1:20880/com.zhang.HelloService?anyhost=true&application=consumer&check=false&dubbo=2.1.2&interface=com.zhang.HelloService&methods=sayHello&pid=2504&timestamp=1516461886484)

    // ProtocolListenerWrapper
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            // taken on the first call (registry:// URL)
            return protocol.refer(type, url);
        }
        // taken on the later call (dubbo:// URL)
        return new ListenerInvokerWrapper<T>(protocol.refer(type, url), 
                Collections.unmodifiableList(
                        ExtensionLoader.getExtensionLoader(InvokerListener.class)
                        .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
    }
    
    // ProtocolFilterWrapper
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            // taken on the first call (registry:// URL)
            return protocol.refer(type, url);
        }
        // taken on the later call (dubbo:// URL)
        return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
    }
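
    The effect of the two branches can be seen in a small stand-alone sketch. The types below are hypothetical simplifications (Invoker only describes itself, URLs are plain strings), and the nesting order of the two wrappers is an assumption made for the demo. What it borrows from the code above is only the shape: pass straight through for a registry URL, decorate the invoker otherwise.

```java
import java.net.URI;

public class WrapperBranchSketch {
    /** Hypothetical minimal invoker: it only reports how it was built. */
    interface Invoker {
        String describe();
    }

    /** Hypothetical stand-in for the Protocol extension point. */
    interface Protocol {
        Invoker refer(String url);
    }

    /** Innermost protocol; stands in for RegistryProtocol or DubboProtocol. */
    static class BaseProtocol implements Protocol {
        public Invoker refer(String url) {
            String scheme = URI.create(url).getScheme();
            return () -> scheme + "-invoker";
        }
    }

    /** Same branch structure as ProtocolListenerWrapper and ProtocolFilterWrapper. */
    static class DecoratingWrapper implements Protocol {
        private final Protocol protocol;
        private final String label;

        DecoratingWrapper(Protocol protocol, String label) {
            this.protocol = protocol;
            this.label = label;
        }

        public Invoker refer(String url) {
            if ("registry".equals(URI.create(url).getScheme())) {
                // registry URL: pass through without decorating
                return protocol.refer(url);
            }
            // any other URL: wrap the invoker returned by the inner protocol
            Invoker inner = protocol.refer(url);
            return () -> label + "(" + inner.describe() + ")";
        }
    }

    /** Builds filter(listener(base)) and reports what refer() produced for the URL. */
    public static String describe(String url) {
        Protocol chain = new DecoratingWrapper(
                new DecoratingWrapper(new BaseProtocol(), "listener"), "filter");
        return chain.refer(url).describe();
    }

    public static void main(String[] args) {
        System.out.println(describe("registry://192.168.153.1:2181/com.alibaba.dubbo.registry.RegistryService"));
        System.out.println(describe("dubbo://192.168.153.1:20880/com.zhang.HelloService"));
    }
}
```

    With the registry URL the result is the bare inner invoker; with the dubbo URL it comes back wrapped twice, which matches the observation that listeners and filters are attached only on the second call.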

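    ReferenceConfig<T>.createProxy is responsible for creating the proxy. Tracing it gives roughly the following steps:

    1. Build the invoker

    // ReferenceConfig.createProxy; refprotocol is the Protocol$Adpative object
    invoker = refprotocol.refer(interfaceClass, url);

        1.1 Build the filter chain (an anonymous inner class bundles each filter with its next invoker; in effect a class with a Filter field and an Invoker field)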

    // ProtocolFilterWrapper
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        // ConsumerContextFilter, MonitorFilter, FutureFilter
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (filters.size() > 0) {
            // walk the list backwards so that filters.get(0) ends up outermost
            for (int i = filters.size() - 1; i >= 0; i --) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }
                    public URL getUrl() {
                        return invoker.getUrl();
                    }
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }
                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }
                    public void destroy() {
                        invoker.destroy();
                    }
                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }
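
    To make the execution order of that chain concrete, here is a reduced sketch with the same backwards loop. Invoker and Filter are hypothetical single-method stand-ins for the dubbo interfaces, invocations are plain strings, and the named(...) helper exists only for this demo. The three filter names are the ones listed in the comment above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FilterChainSketch {
    /** Hypothetical stand-in for com.alibaba.dubbo.rpc.Invoker. */
    interface Invoker {
        String invoke(String invocation);
    }

    /** Hypothetical stand-in for com.alibaba.dubbo.rpc.Filter. */
    interface Filter {
        String invoke(Invoker next, String invocation);
    }

    /** Same loop shape as buildInvokerChain: iterate backwards, wrapping as we go. */
    static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = invocation -> filter.invoke(next, invocation);
        }
        return last;
    }

    /** Demo-only filter that records its name, then passes the call on. */
    static Filter named(String name, List<String> trace) {
        return (next, invocation) -> {
            trace.add(name);
            return next.invoke(invocation);
        };
    }

    /** Runs one call through a three-filter chain and returns the order of visits. */
    public static List<String> demo() {
        List<String> trace = new ArrayList<>();
        Invoker target = invocation -> {
            trace.add("target");
            return "result of " + invocation;
        };
        Invoker chain = buildChain(target, Arrays.asList(
                named("ConsumerContextFilter", trace),
                named("MonitorFilter", trace),
                named("FutureFilter", trace)));
        chain.invoke("sayHello");
        return trace;
    }

    public static void main(String[] args) {
        // prints [ConsumerContextFilter, MonitorFilter, FutureFilter, target]
        System.out.println(demo());
    }
}
```

    Because the loop starts from the last filter, the first filter in the list wraps all the others and therefore runs first at call time.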

        1.2 Create the RegistryDirectory$InvokerDelegete object

    // RegistryDirectory.toInvokers
    // protocol.refer(serviceType, url) calls Protocol$Adpative.refer again, i.e. a recursive call
    invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);

        1.3 Create the MockClusterInvoker. Two classes are involved: Protocol$Adpative and Cluster$Adpative (call stack screenshot in the original post). Cluster$Adpative looks like this:

    package com.alibaba.dubbo.rpc.cluster;
    import com.alibaba.dubbo.common.extension.ExtensionLoader;
    public class Cluster$Adpative implements com.alibaba.dubbo.rpc.cluster.Cluster {
        public com.alibaba.dubbo.rpc.Invoker join(
                com.alibaba.dubbo.rpc.cluster.Directory arg0)
                throws com.alibaba.dubbo.rpc.cluster.Directory {
            if (arg0 == null)
                throw new IllegalArgumentException(
                        "com.alibaba.dubbo.rpc.cluster.Directory argument == null");
            if (arg0.getUrl() == null)
                throw new IllegalArgumentException(
                        "com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
            com.alibaba.dubbo.common.URL url = arg0.getUrl();
            String extName = url.getParameter("cluster", "failover");
            if (extName == null)
                throw new IllegalStateException(
                        "Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url("
                                + url.toString() + ") use keys([cluster])");
            com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader
                    .getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class)
                    .getExtension(extName);
            // extension is a MockClusterWrapper
            return extension.join(arg0);
        }
    }


    2. Create the proxy

    private static final ProxyFactory proxyFactory = 
            ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
    private T createProxy(Map<String, String> map) {
        // other code omitted
        return (T) proxyFactory.getProxy(invoker);
    }

    Contents of /META-INF/dubbo/internal/com.alibaba.dubbo.rpc.ProxyFactory:

    stub=com.alibaba.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper
    jdk=com.alibaba.dubbo.rpc.proxy.jdk.JdkProxyFactory
    javassist=com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory

    The file lists a wrapper, so the extension returned by getExtension is a StubProxyFactoryWrapper.

    The dynamically generated ProxyFactory$Adpative class, formatted:
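
    Why a wrapper entry changes what getExtension returns can be illustrated with a reflection sketch. It assumes the usual wrapper convention: a class counts as a wrapper when it has a public constructor taking the extension interface. The ProxyFactory interface, PlainFactory and StubWrapper below are hypothetical stand-ins, and load(...) is a much reduced imitation of extension loading, not dubbo's actual code.

```java
public class WrapperDetectionSketch {
    /** Hypothetical stand-in for the ProxyFactory extension point. */
    interface ProxyFactory {
        String getProxy(String invoker);
    }

    /** Ordinary implementation; plays the role of JavassistProxyFactory. */
    public static class PlainFactory implements ProxyFactory {
        public PlainFactory() {
        }

        public String getProxy(String invoker) {
            return "proxy for " + invoker;
        }
    }

    /** Wrapper; its constructor takes the extension interface itself. */
    public static class StubWrapper implements ProxyFactory {
        private final ProxyFactory delegate;

        public StubWrapper(ProxyFactory delegate) {
            this.delegate = delegate;
        }

        public String getProxy(String invoker) {
            return "stub-checked " + delegate.getProxy(invoker);
        }
    }

    /** A class is treated as a wrapper if it has a constructor taking the extension type. */
    static boolean isWrapper(Class<?> candidate, Class<?> extensionType) {
        try {
            candidate.getConstructor(extensionType);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    /** Instantiates the implementation, then wraps it if the second class is a wrapper. */
    static ProxyFactory load(Class<? extends ProxyFactory> impl,
                             Class<? extends ProxyFactory> wrapper) throws ReflectiveOperationException {
        ProxyFactory instance = impl.getConstructor().newInstance();
        if (isWrapper(wrapper, ProxyFactory.class)) {
            instance = wrapper.getConstructor(ProxyFactory.class).newInstance(instance);
        }
        return instance;
    }

    /** Loads PlainFactory wrapped by StubWrapper and calls it once. */
    public static String demo() {
        try {
            return load(PlainFactory.class, StubWrapper.class).getProxy("invoker");
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    The caller asks for the plain implementation by name but receives the wrapper around it, which is the StubProxyFactoryWrapper/JavassistProxyFactory pairing seen in this post.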

    package com.alibaba.dubbo.rpc;
    import com.alibaba.dubbo.common.extension.ExtensionLoader;
    public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {
        public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0)
                throws com.alibaba.dubbo.rpc.Invoker {
            if (arg0 == null)
                throw new IllegalArgumentException(
                        "com.alibaba.dubbo.rpc.Invoker argument == null");
            if (arg0.getUrl() == null)
                throw new IllegalArgumentException(
                        "com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
            com.alibaba.dubbo.common.URL url = arg0.getUrl();
            String extName = url.getParameter("proxy", "javassist");
            if (extName == null)
                throw new IllegalStateException(
                        "Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url("
                                + url.toString() + ") use keys([proxy])");
            com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader
                    .getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class)
                    .getExtension(extName);
            // extension is actually a StubProxyFactoryWrapper that wraps JavassistProxyFactory
            return extension.getProxy(arg0);
        }
    
        public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0,
                java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2)
                throws java.lang.Object {
            if (arg2 == null)
                throw new IllegalArgumentException("url == null");
            com.alibaba.dubbo.common.URL url = arg2;
            String extName = url.getParameter("proxy", "javassist");
            if (extName == null)
                throw new IllegalStateException(
                        "Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url("
                                + url.toString() + ") use keys([proxy])");
            com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader
                    .getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class)
                    .getExtension(extName);
            return extension.getInvoker(arg0, arg1, arg2);
        }
    }

    So getExtension returns: StubProxyFactoryWrapper/JavassistProxyFactory

    // JavassistProxyFactory
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        // Step 1: Proxy p = Proxy.getProxy(interfaces);
        // Step 2: p.newInstance(new InvokerInvocationHandler(invoker));
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    Proxy0 and proxy0 are classes generated dynamically by javassist: p is a Proxy0 instance, and calling its newInstance(InvocationHandler) method creates the proxy0 instance.

    The generated code is presumably something like this (a rough guess):

    class com.alibaba.dubbo.common.bytecode.Proxy0 extends com.alibaba.dubbo.common.bytecode.Proxy {
        public Object newInstance(java.lang.reflect.InvocationHandler h){
            return new com.alibaba.dubbo.common.bytecode.proxy0($1);
        }
    }
    
    class com.alibaba.dubbo.common.bytecode.proxy0 implements
        com.alibaba.dubbo.rpc.service.EchoService, com.zhang.HelloService {
        public <init>(java.lang.reflect.InvocationHandler arg0){
            handler=$1;
        }
        public static java.lang.reflect.Method[] methods;
        private java.lang.reflect.InvocationHandler handler;
        
        public java.lang.String sayHello(){
            Object[] args = new Object[0];
            Object ret = handler.invoke(this, methods[0], args);
            return (java.lang.String)ret;
        }
        public java.lang.Object $echo(java.lang.Object arg0){
            Object[] args = new Object[1];
            args[0] = ($w)$1;
            Object ret = handler.invoke(this, methods[1], args);
            return (java.lang.Object)ret;
        }
    }
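
    The proxy="jdk" variant mentioned at the start follows the same pattern as the guessed proxy0 class: every interface method funnels into a single InvocationHandler. A minimal sketch using only java.lang.reflect.Proxy is given here. The HelloService interface is a hypothetical stand-in for com.zhang.HelloService, and the lambda handler merely takes the place of InvokerInvocationHandler; it does not perform any remote call.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class JdkProxySketch {
    /** Hypothetical service interface, standing in for com.zhang.HelloService. */
    public interface HelloService {
        String sayHello();
    }

    /** Creates a JDK dynamic proxy whose method calls all go through the handler. */
    static HelloService createProxy(InvocationHandler handler) {
        return (HelloService) Proxy.newProxyInstance(
                HelloService.class.getClassLoader(),
                new Class<?>[] {HelloService.class},
                handler);
    }

    /** Calls sayHello() on a proxy whose handler just reports the method name. */
    public static String demo() {
        // Demo-only handler; a real one would forward the call to an invoker.
        InvocationHandler handler = (proxy, method, args) -> "handled " + method.getName();
        return createProxy(handler).sayHello();
    }

    public static void main(String[] args) {
        // prints "handled sayHello"
        System.out.println(demo());
    }
}
```

    Whichever generator is used, the consumer only ever holds an object implementing the service interface, and the handler behind it decides what a method call actually does.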
  • Original article: https://www.cnblogs.com/allenwas3/p/8303495.html