zoukankan      html  css  js  c++  java
  • dubbo源码阅读-服务调用之远程调用(十二)

    ReferenceConfig

    createProxy 

    参见《dubbo源码阅读-服务订阅(八)之主流程》

    rivate T createProxy(Map<String, String> map) {
            URL tmpUrl = new URL("temp", "localhost", 0, map);
            final boolean isJvmRefer;
            //是否是本地调用
            if (isInjvm() == null) {
                if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
                    isJvmRefer = false;
                    //根据url配置判断是否是本地调用
                } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                    // by default, reference local service if there is
                    isJvmRefer = true;
                } else {
                    isJvmRefer = false;
                }
            } else {
                isJvmRefer = isInjvm().booleanValue();
            }
    
            //如果是本地引用
            if (isJvmRefer) {
                //组织url为injvm://127.0.0.1
                URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
                /**
                 * SPI扩展点
                 * private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
                 */
                invoker = refprotocol.refer(interfaceClass, url);
                if (logger.isInfoEnabled()) {
                    logger.info("Using injvm service " + interfaceClass.getName());
                }
            } else {
                //如果我们手动配置了url;隔开 追加到urls列表
                if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                    // 拆分地址成数组,使用 ";" 分隔。
                    String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                    // 循环数组,添加到 `url` 中。
                    if (us != null && us.length > 0) {
                        for (String u : us) {
                            // 创建 URL 对象
                            URL url = URL.valueOf(u);
                            // 设置默认路径
                            if (url.getPath() == null || url.getPath().length() == 0) {
                                url = url.setPath(interfaceName);
                            }
                            // 注册中心的地址,带上服务引用的配置参数
                            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                                urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                            } else {
                                // 服务提供者的地址
                                urls.add(ClusterUtils.mergeUrl(url, map));
                            }
                        }
                    }
                } else { // assemble URL from register center's configuration
                    //获取注册中心地址 具体可以看你https://www.cnblogs.com/LQBlog/p/12469007.html#autoid-6-10-0
                    List<URL> us = loadRegistries(false);
                    if (us != null && !us.isEmpty()) {
                        for (URL u : us) {
                            //加载Monitor 使用例子https://blog.csdn.net/sunhuaqiang1/article/details/80141651
                            URL monitorUrl = loadMonitor(u);
                            if (monitorUrl != null) {
                                map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            }
                            //添加refer标识 标识是从注册中心地尼公约
                            urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        }
                    }
                    if (urls.isEmpty()) {
                        throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address="..." /> to your spring config.");
                    }
                }
                 //  // 单 `urls` 时,引用服务,返回 Invoker 对象
                if (urls.size() == 1) {
                    /**
                     * <1>Protocol 取得registryProtocol 不过会被代理 具体可以看 https://www.cnblogs.com/LQBlog/p/12470179.html#autoid-2-0-0
                     * 默认是registry SPI扩展 private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
                     */
                    invoker = refprotocol.refer(interfaceClass, urls.get(0));
                } else {
                    //集群订阅
                    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                    URL registryURL = null;
                    // 循环 `urls` ,引用服务,返回 Invoker 对象
                    for (URL url : urls) {
                        //<1>引用服务
                        invokers.add(refprotocol.refer(interfaceClass, url));
                        // 使用最后一个注册中心的 URL
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            registryURL = url; // use last registry url
                        }
                    }
    
                    //  // 有注册中心
                    if (registryURL != null) { // registry url is available
                        // 对有注册中心的 Cluster 只用 AvailableCluster
                        // use AvailableCluster only when register's cluster is available
                        URL u = registryURL.addParameterIfAbsent(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                        invoker = cluster.join(new StaticDirectory(u, invokers));
                        // 无注册中心,全部都是服务直连
                    } else { // not a registry url
                        invoker = cluster.join(new StaticDirectory(invokers));
                    }
                }
            }
    
            //是否配置了启动检查
            Boolean c = check;
            if (c == null && consumer != null) {
                c = consumer.isCheck();
            }
            if (c == null) {
                c = true; // default true
            }
            if (c && !invoker.isAvailable()) {
                // make it possible for consumer to retry later if provider is temporarily unavailable
                initialized = false;
                throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
            }
            if (logger.isInfoEnabled()) {
                logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
            }
            // <5>创建代理类
            return (T) proxyFactory.getProxy(invoker);
        }

    RegistryProtocol

    <2>refer

    参见《dubbo源码阅读-服务订阅(八)之远程订阅(dubbo)》

    com.alibaba.dubbo.registry.integration.RegistryProtocol#refer
    复制代码
     private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
            //type为订阅接口类型 url为订阅url
            //zookeeper://IP:2181/com.alibaba.dubbo.registry.RegistryService?**
            RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
            directory.setRegistry(registry);
            directory.setProtocol(protocol);
            // all attributes of REFER_KEY
            //获取参数
            Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    
            //订阅url
            URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
            if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                    && url.getParameter(Constants.REGISTER_KEY, true)) {
                URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
                //将订阅url 添加到已注册列表 registryList
                registry.register(registeredConsumerUrl);
                //设置到directory
                directory.setRegisteredConsumerUrl(registeredConsumerUrl);
            }
            //<3>订阅服务 内部调用registry.subscribe
            directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                    Constants.PROVIDERS_CATEGORY
                            + "," + Constants.CONFIGURATORS_CATEGORY
                            + "," + Constants.ROUTERS_CATEGORY));
    
            //<4>通过集群策略包装成一个Invoker返回
            Invoker invoker = cluster.join(directory);
            //注册到订阅列表
            ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
            return invoker;
        }

    RegistryDirectory

    <3>subscribe

    参见《dubbo源码阅读-注册中心(十三)之Zookeeper》

    public void subscribe(URL url) {
            //此时的url为:consumer://192.168.2.1/com.alibaba.dubbo.demo.DemoService?*
            setConsumerUrl(url);
             //<4>对应的注册中心实现类 调用订阅方法 通知添加监听器 就是当前对象 实现了 NotifyListener
            registry.subscribe(url, this);
        }

    可以发现订阅并回调自己 将订阅url存入到directory本地

    FailbackCluster

    <4>join

    com.alibaba.dubbo.rpc.cluster.support.FailbackCluster#join

    @Override
        public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
            //通过FailbackClusterInvoker进行装饰
            return new FailbackClusterInvoker<T>(directory);
        }

    JdkProxyFactory

    <5>getProxy

    参见《dubbo源码阅读-ProxyFactory(十一)之JdkProxyFactory》

     @Override
        @SuppressWarnings("unchecked")
        public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
            //<6>jdk代理 通过InvokerInvocationHandler代理
            return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
        }

    可以发现服务订阅最终返回的代理的一个代理对象 执行入口是invokerInvocationHandler

    InvokerInvocationHandler

    <6>invoker

      @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            //如果当前method是Object直接调用
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            //toString hashCode equals 直接调用object的 // 基础方法,不使用 RPC 调用
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return invoker.toString();
            }
            if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
                return invoker.hashCode();
            }
            if ("equals".equals(methodName) && parameterTypes.length == 1) {
                return invoker.equals(args[0]);
            }
            //<7>走RPC 走得FailbackClusterInvoker <4>初始化 将参数封装成RpcInvocation
            return invoker.invoke(new RpcInvocation(method, args)).recreate();
        }

    FailbackClusterInvoker

    <7>invoke

    com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker#invoke

     @Override
        public Result invoke(final Invocation invocation) throws RpcException {
            //服务是否销毁 如果销毁抛出异常 当调用destroy方法
            checkWhetherDestroyed();
            LoadBalance loadbalance = null;
    
            // binding attachments into invocation.
            //获取RPCContext的Attachments
            Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
            if (contextAttachments != null && contextAttachments.size() != 0) {
                //添加到PRCInvocation
                ((RpcInvocation) invocation).addAttachments(contextAttachments);
            }
    
            //<8>获取所有服务提供者的invoker集合
            List<Invoker<T>> invokers = list(invocation);
            if (invokers != null && !invokers.isEmpty()) {
                //spi扩展获取负载均衡策略
                loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                        .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
            }
            // 设置调用编号,若是异步调用
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
            //<10>模板方法调用子类的doInvoke 这个时候有负载均衡策略和服务提供者列表以及参数信息
            return doInvoke(invocation, invokers, loadbalance);
        }

    <8>list

    com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker#list

     protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
            //<9>调用director的获取invoker的方法
            List<Invoker<T>> invokers = directory.list(invocation);
            return invokers;
        }

    <10>doInvoke

    com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker#doInvoke

    @SuppressWarnings({"unchecked", "rawtypes"})
        public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            List<Invoker<T>> copyinvokers = invokers;
            checkInvokers(copyinvokers, invocation);
            //url是否含有retries参数 默认重试3次
            int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
            if (len <= 0) {
                len = 1;
            }
            // retry loop.
            RpcException le = null; // last exception.
            List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
            Set<String> providers = new HashSet<String>(len);
            for (int i = 0; i < len; i++) {
                //Reselect before retry to avoid a change of candidate `invokers`.
                //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
                //如果是重试没检查invoker是否为空 并从directory获取最新的invokers
                if (i > 0) {
                    checkWhetherDestroyed();
                    //<9> 重新获取 防止 订阅发生改变
                    copyinvokers = list(invocation);
                    // check again
                    checkInvokers(copyinvokers, invocation);
                }
                //使用负载均衡策略 选取invoker
                Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
                invoked.add(invoker);
                //将已经重试过得invoker添加到RPContext
                RpcContext.getContext().setInvokers((List) invoked);
                try {
                    //这里的invoker是dubboInvoker初始化: https://www.cnblogs.com/LQBlog/p/12522417.html#autoid-6-0-0
                    Result result = invoker.invoke(invocation);
                    if (le != null && logger.isWarnEnabled()) {
                        logger.warn("Although retry the method " + invocation.getMethodName()
                                + " in the service " + getInterface().getName()
                                + " was successful by the provider " + invoker.getUrl().getAddress()
                                + ", but there have been failed providers " + providers
                                + " (" + providers.size() + "/" + copyinvokers.size()
                                + ") from the registry " + directory.getUrl().getAddress()
                                + " on the consumer " + NetUtils.getLocalHost()
                                + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                                + le.getMessage(), le);
                    }
                    return result;
                } catch (RpcException e) {
                    if (e.isBiz()) { // biz exception.
                        throw e;
                    }
                    le = e;
                } catch (Throwable e) {
                    le = new RpcException(e.getMessage(), e); //通过RPCException
                } finally {
                    providers.add(invoker.getUrl().getAddress());
                }
            }
            throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                    + invocation.getMethodName() + " in the service " + getInterface().getName()
                    + ". Tried " + len + " times of the providers " + providers
                    + " (" + providers.size() + "/" + copyinvokers.size()
                    + ") from the registry " + directory.getUrl().getAddress()
                    + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                    + Version.getVersion() + ". Last error is: "
                    + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
        }

    RegistryDirectory

    <9>list

    com.alibaba.dubbo.rpc.cluster.directory.AbstractDirectory#list

     @Override
        public List<Invoker<T>> list(Invocation invocation) throws RpcException {
            if (destroyed) {
                throw new RpcException("Directory already destroyed .url: " + getUrl());
            }
            List<Invoker<T>> invokers = doList(invocation);
            List<Router> localRouters = this.routers; // local reference
            //通过路由规则进行过滤 https://blog.csdn.net/prestigeding/article/details/80848594
            //初始化时机参见https://www.cnblogs.com/LQBlog/p/12522417.html#autoid-5-0-0
            if (localRouters != null && !localRouters.isEmpty()) {
                for (Router router : localRouters) {
                    try {
                        if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                            invokers = router.route(invokers, getConsumerUrl(), invocation);
                        }
                    } catch (Throwable t) {
                        logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
                    }
                }
            }
            //返回符合条件的invoker
            return invokers;
        }

    dubboInvoker

       @Override
        public Result invoke(Invocation inv) throws RpcException {
            // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
            if (destroyed.get()) {
                logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                        + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
            }
    
            RpcInvocation invocation = (RpcInvocation) inv;
            invocation.setInvoker(this);
            if (attachment != null && attachment.size() > 0) {
                invocation.addAttachmentsIfAbsent(attachment);
            }
            Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
            if (contextAttachments != null && contextAttachments.size() != 0) {
                /**
                 * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
                 * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
                 * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
                 * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
                 */
                invocation.addAttachments(contextAttachments);
            }
            if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
                invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
            }
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    
    
            try {
                //11
                return doInvoke(invocation);
            } catch (InvocationTargetException e) { // biz exception
                Throwable te = e.getTargetException();
                if (te == null) {
                    return new RpcResult(e);
                } else {
                    if (te instanceof RpcException) {
                        ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                    }
                    return new RpcResult(te);
                }
            } catch (RpcException e) {
                if (e.isBiz()) {
                    return new RpcResult(e);
                } else {
                    throw e;
                }
            } catch (Throwable e) {
                return new RpcResult(e);
            }
        }

    <11>doInvoke

     @Override
        protected Result doInvoke(final Invocation invocation) throws Throwable {
            RpcInvocation inv = (RpcInvocation) invocation;
            final String methodName = RpcUtils.getMethodName(invocation);
            inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
            inv.setAttachment(Constants.VERSION_KEY, version);
    
            ExchangeClient currentClient;
            if (clients.length == 1) {
                currentClient = clients[0];
            } else {
                currentClient = clients[index.getAndIncrement() % clients.length];
            }
            try {
                boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
                boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
                int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                if (isOneway) {
                    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                    currentClient.send(inv, isSent);
                    RpcContext.getContext().setFuture(null);
                    return new RpcResult();
                } else if (isAsync) {
                    ResponseFuture future = currentClient.request(inv, timeout);
                    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                    return new RpcResult();
                } else {
                    RpcContext.getContext().setFuture(null);
                    return (Result) currentClient.request(inv, timeout).get();
                }
            } catch (TimeoutException e) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } catch (RemotingException e) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }

     

  • 相关阅读:
    spin_lock &amp; mutex_lock的差别?
    Java拾遗(一):浅析Java子类和父类的实例化顺序 及 陷阱
    Android ViewPager使用具体解释
    大数运算
    fragment 中利用spinner实现省市联动
    秒杀多线程第四篇 一个经典的多线程同步问题
    Ewebeditor最新漏洞及漏洞大全
    轻松设置百度搜索手写输入
    Rational Rose 2007 &amp;Rational Rose 2003 下载及破解方法和汉化文件下载
    svm中的数学和算法
  • 原文地址:https://www.cnblogs.com/LQBlog/p/12529840.html
Copyright © 2011-2022 走看看