zoukankan      html  css  js  c++  java
  • dubbo源码阅读-服务调用(十二)之本地调用(Injvm)

    使用例子

     <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoServiceImpl" protocol="injvm" scope="local" /> 

    本地如何引用

    ReferenceConfig#createProxy 

    参见《dubbo源码阅读-服务订阅(八)之主流程》

     private T createProxy(Map<String, String> map) {
            URL tmpUrl = new URL("temp", "localhost", 0, map);
            final boolean isJvmRefer;
            //是否是本地调用
            if (isInjvm() == null) {
                if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
                    isJvmRefer = false;
                    //根据url配置判断是否是本地调用
                } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                    // by default, reference local service if there is
                    isJvmRefer = true;
                } else {
                    isJvmRefer = false;
                }
            } else {
                isJvmRefer = isInjvm().booleanValue();
            }
    
            //如果是本地引用
            if (isJvmRefer) {
                //组织url为injvm://127.0.0.1
                URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
                /**
                 * <1>SPI扩展点
                 * private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
                 */
                invoker = refprotocol.refer(interfaceClass, url);
                if (logger.isInfoEnabled()) {
                    logger.info("Using injvm service " + interfaceClass.getName());
                }
            } else {
                //如果我们手动配置了url;隔开 追加到urls列表
                if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                    // 拆分地址成数组,使用 ";" 分隔。
                    String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                    // 循环数组,添加到 `url` 中。
                    if (us != null && us.length > 0) {
                        for (String u : us) {
                            // 创建 URL 对象
                            URL url = URL.valueOf(u);
                            // 设置默认路径
                            if (url.getPath() == null || url.getPath().length() == 0) {
                                url = url.setPath(interfaceName);
                            }
                            // 注册中心的地址,带上服务引用的配置参数
                            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                                urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                            } else {
                                // 服务提供者的地址
                                urls.add(ClusterUtils.mergeUrl(url, map));
                            }
                        }
                    }
                } else { // assemble URL from register center's configuration
                    //获取注册中心地址 具体可以看你https://www.cnblogs.com/LQBlog/p/12469007.html#autoid-6-10-0
                    List<URL> us = loadRegistries(false);
                    if (us != null && !us.isEmpty()) {
                        for (URL u : us) {
                            //加载Monitor 使用例子https://blog.csdn.net/sunhuaqiang1/article/details/80141651
                            URL monitorUrl = loadMonitor(u);
                            if (monitorUrl != null) {
                                map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            }
                            //添加refer标识 标识是从注册中心地尼公约
                            urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        }
                    }
                    if (urls.isEmpty()) {
                        throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address="..." /> to your spring config.");
                    }
                }
                //  // 单 `urls` 时,引用服务,返回 Invoker 对象
                if (urls.size() == 1) {
                    /**
                     * Protocol 取得registryProtocol 不过会被代理 具体可以看 https://www.cnblogs.com/LQBlog/p/12470179.html#autoid-2-0-0
                     * 默认是registry SPI扩展 private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
                     */
                    invoker = refprotocol.refer(interfaceClass, urls.get(0));
                } else {
                    //集群订阅
                    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                    URL registryURL = null;
                    // 循环 `urls` ,引用服务,返回 Invoker 对象
                    for (URL url : urls) {
                        // 引用服务
                        invokers.add(refprotocol.refer(interfaceClass, url));
                        // 使用最后一个注册中心的 URL
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            registryURL = url; // use last registry url
                        }
                    }
    
                    //  // 有注册中心
                    if (registryURL != null) { // registry url is available
                        // 对有注册中心的 Cluster 只用 AvailableCluster
                        // use AvailableCluster only when register's cluster is available
                        URL u = registryURL.addParameterIfAbsent(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                        invoker = cluster.join(new StaticDirectory(u, invokers));
                        // 无注册中心,全部都是服务直连
                    } else { // not a registry url
                        invoker = cluster.join(new StaticDirectory(invokers));
                    }
                }
            }
    
            //是否配置了启动检查
            Boolean c = check;
            if (c == null && consumer != null) {
                c = consumer.isCheck();
            }
            if (c == null) {
                c = true; // default true
            }
            if (c && !invoker.isAvailable()) {
                // make it possible for consumer to retry later if provider is temporarily unavailable
                initialized = false;
                throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
            }
            if (logger.isInfoEnabled()) {
                logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
            }
            //<2>创建代理类
            return (T) proxyFactory.getProxy(invoker);
        }

    <1>InjvmProtocol#refer

     public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
            //<3>创建一个 InjvmInvoker 引用对象
            return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap);
    }

    参见《dubbo源码阅读-服务订阅(八)之本地订阅(injvm)》

    <2>InvokerInvocationHandler

    参见《dubbo源码阅读-ProxyFactory(十一)之JdkProxyFactory》

    /**
     * InvokerHandler
     */
    public class InvokerInvocationHandler implements InvocationHandler {
    
        private final Invoker<?> invoker;
    
        public InvokerInvocationHandler(Invoker<?> handler) {
            this.invoker = handler;
        }
    
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            //如果当前method是Object直接调用
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            //toString hashCode equals 直接调用object的 // 基础方法,不使用 RPC 调用
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return invoker.toString();
            }
            if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
                return invoker.hashCode();
            }
            if ("equals".equals(methodName) && parameterTypes.length == 1) {
                return invoker.equals(args[0]);
            }
            //<3>走RPC
            return invoker.invoke(new RpcInvocation(method, args)).recreate();
        }
    
    }

    InjvmInvoker

    <3>doInvoke

     @Override
        public Result doInvoke(Invocation invocation) throws Throwable {
            //可以发现是从exporterMap获取对应的Exporter然后再获取内部Invoker执行调用
            Exporter<?> exporter = InjvmProtocol.getExporter(exporterMap, getUrl());
            if (exporter == null) {
                throw new RpcException("Service [" + key + "] not found.");
            }
            RpcContext.getContext().setRemoteAddress(NetUtils.LOCALHOST, 0);
    //<4>exporter如何产生
    return exporter.getInvoker().invoke(invocation); }

     注意看2 此时的InvoCation是RpcInvocation 封装了method和方法信息

    Exporter如何产生

    参见《dubbo源码阅读-服务暴露(七)之主流程》

    <4>ServiceConfig#doExportUrlsFor1Protocol

     private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
            //获取协议名
            String name = protocolConfig.getName();
            //如果没配置 则默认dubbo
            if (name == null || name.length() == 0) {
                name = "dubbo";
            }
    
            Map<String, String> map = new HashMap<String, String>();
            //封装 sid=provider 到map
            map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
            //设置dubbo版本
            map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
            //设置时间
            map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
            if (ConfigUtils.getPid() > 0) {
                map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
            }
            //反射获取application所有公共的get方法 以及parameter配置 封装到map
            appendParameters(map, application);
            //反射获取moule所有公共的get方法 以及parameter配置 封装到map
            appendParameters(map, module);
            //反射获取provider所有公共的get方法 以及parameter配置 封装到map
            appendParameters(map, provider, Constants.DEFAULT_KEY);
            //反射获取provider所有公共的get方法 以及parameter配置 封装到map
            appendParameters(map, protocolConfig);
            //反射获取当前Service所有公共的get方法 以及parameter配置 封装到map
            appendParameters(map, this);
            //封装methodConfig配置到map
            if (methods != null && !methods.isEmpty()) {
                ....//胜率部分代码
            }
    
            //是否为泛型化发布
            if (ProtocolUtils.isGeneric(generic)) {
                //增加参数generic=true
                map.put(Constants.GENERIC_KEY, generic);
                //增加methods=*
                map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
            } else {
                //这种版本号到map
                String revision = Version.getVersion(interfaceClass, version);
                if (revision != null && revision.length() > 0) {
                    map.put("revision", revision);
                }
    
                String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
                if (methods.length == 0) {
                    logger.warn("NO method found in service interface " + interfaceClass.getName());
                    map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
                } else {
                    //主要是将暴露方法封装到map
                    map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
                }
            }
            //是否有token配置 将token配置到map
            if (!ConfigUtils.isEmpty(token)) {
                if (ConfigUtils.isDefault(token)) {
                    map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
                } else {
                    map.put(Constants.TOKEN_KEY, token);
                }
            }
            //获取协议是否是本地调用injvm
            if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
                //配置不注册
                protocolConfig.setRegister(false);
                //map增加参数notify=false
                map.put("notify", "false");
            }
            // 如果protocolConfig 没有配置contextPath则从provider获取
            String contextPath = protocolConfig.getContextpath();
            if ((contextPath == null || contextPath.length() == 0) && provider != null) {
                contextPath = provider.getContextpath();
            }
            /**
             * 获取暴露服务的host
             */
            String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
            //暴露服务的port
            Integer port = this.findConfigedPorts(protocolConfig, name, map);
            //组织暴露的url
            URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
    
    
            //SPI扩展点根据协议判断是否有指定协议Protocol的ConfiguratorFactory 扩展 可以覆盖我们的Url
            if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .hasExtension(url.getProtocol())) {
                url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                        .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
            }
    
    
            /**
             * 构建invoker实例
             * 获取dubbo:service配置的scope属性
             * 其可选值为 none (不暴露)、local (本地)、remote (远程),如果配置为 none,则不暴露。默认为 local。
             */
            String scope = url.getParameter(Constants.SCOPE_KEY);
            // don't export when none is configured SCOPE_NONE!=none如果未配置none
            if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
    
                // export to local if the config is not remote (export to remote only when config is remote)
                //remote!=scope  什么是本地暴露: https://zhuanlan.zhihu.com/p/98423741
                if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                    //<5>本地暴露
                    exportLocal(url);
                }
                // export to remote if the config is not local (export to local only when config is local)
                //非本地暴露则远程暴露
                if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                    if (logger.isInfoEnabled()) {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                    if (registryURLs != null && !registryURLs.isEmpty()) {
                        for (URL registryURL : registryURLs) {
                            //是否配置了动态注册dynamic 作用看文档注释 http://dubbo.apache.org/zh-cn/docs/user/references/xml/dubbo-service.html
                            url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                            //获取监控地址url monitor配置
                            URL monitorUrl = loadMonitor(registryURL);
                            if (monitorUrl != null) {
                                //将监控地址追加到url
                                url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                            }
                            if (logger.isInfoEnabled()) {
                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                            }
    
                            // For providers, this is used to enable custom proxy to generate invoker
                            //是否有配置proxy 生成动态代理的方式 如果有追加到registryURL参数 见文档http://dubbo.apache.org/zh-cn/docs/user/references/xml/dubbo-service.html
                            String proxy = url.getParameter(Constants.PROXY_KEY);
                            if (StringUtils.isNotEmpty(proxy)) {
                                registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
                            }
                            /**
                             *     private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
                             *     spi扩展点 生成Invoker
                             *     invoker封装了代理类也就是我们的ServiceImpl类 以及url信息
                             *     registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())
                             *     在registryUrl 追加export=暴露的url
                             *     invoker的url是regitry 参数加了export八路url
                             */
                            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                            //增强 代理了invoker 封装了serviceConfig信息
                            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                            //注意 invoker的url是registryUrl 所以protocol=registry SPI调用的是
                            //registry=com.alibaba.dubbo.registry.integration.RegistryProtocol  直通车:https://www.cnblogs.com/LQBlog/p/12470681.html
                            Exporter<?> exporter = protocol.export(wrapperInvoker);
                            //追加到暴露列表
                            exporters.add(exporter);
                        }
                    } else {
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                }
            }
            this.urls.add(url);
        }

    <5>ServiceConfig#exportLocal

      private void exportLocal(URL url) {
            //如果协议为不是injvm
            if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                //生成本地Url
                URL local = URL.valueOf(url.toFullString())
                        .setProtocol(Constants.LOCAL_PROTOCOL)//injvm
                        .setHost(LOCALHOST) //127.0.0.1
                        .setPort(0);
                //service.classimpl 将本地暴露接口存入StaticContext map key为接口 value为实现类的名字
                StaticContext.getContext(Constants.SERVICE_IMPL_CLASS).put(url.getServiceKey(), getServiceClass(ref));
                /**
                 * 使用 ProxyFactory 创建 Invoker 对象 根据url配置的proxy来作为spi的Key 缺省值是javasist 动态生成一个代理class 也可以使用jdk
                 * private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
                 * 使用 Protocol 暴露 Invoker 对象 根据url的protcol 来作为SPIKey查找 缺省值duboo
                 *
                 * private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
                 * 直通车:https://www.cnblogs.com/LQBlog/p/12470179.html
                 *<6>proxyFactory.getInvoker
                 *<7>protocol.export
                 */
                Exporter<?> exporter = protocol.export(
                        proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
                // 添加到 `exporters`
                exporters.add(exporter);
                logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
            }
        }

    <6>JDKProxyFactory#getInvoker

    参见《dubbo源码阅读-ProxyFactory(十一)之JdkProxyFactory》

    /**
         * proxy为我们的代理对象 type为我们的接口 url为我们的发布和订阅协议
         * 包装成Invoker对象
         * @param proxy
         * @param type
         * @param url
         * @param <T>
         * @return
         */
        @Override
        public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
            return new AbstractProxyInvoker<T>(proxy, type, url) {
                @Override
                protected Object doInvoke(T proxy, String methodName,
                                          Class<?>[] parameterTypes,
                                          Object[] arguments) throws Throwable {
                    Method method = proxy.getClass().getMethod(methodName, parameterTypes);
                    return method.invoke(proxy, arguments);
                }
            };
        }

    <7>InjvmProtocol#

    参见《dubbo源码阅读-服务暴露(七)之本地暴露(Injvm)》

      public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            //创建一个InjvmExporter代理对象 内部保存了invoker以及所有的本地发布实例 key为 service com.alibaba.dubbo.demo.DemoService
            //<6>exporterMap为InJvmProtocol成员变量 构造函数内部会add key为Service全名称
            return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
        }
    InjvmExporter构造函数
     InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
            super(invoker);
            //key为service全名称
            this.key = key;
            //为协议成员变量的map
            this.exporterMap = exporterMap;
            //将当前export添加到map
            exporterMap.put(key, this);
        }
  • 相关阅读:
    说说爬虫分享
    飞机大战改进篇
    Unity WebGL WebSocket
    2048 控制台版(C#)
    C++多小球非对心弹性碰撞(HGE引擎)
    Unity中如何计算带minimap的贴图资源的大小
    【原】Unity 骨骼节点对象优化,AnimatorUtility.OptimizeTransformHierarchy
    ios sdk agree 无法通过,升级Mac系统到10.14,并且升级Xcode到最近版本后遇到
    【原】高光贴图参数放入颜色贴图的alpha通道中
    KBEngine游戏服务器(二)——运行Unity的Demo
  • 原文地址:https://www.cnblogs.com/LQBlog/p/12510478.html
Copyright © 2011-2022 走看看