  • Chapter 4: (6) Principles: the Dubbo service invocation flow

    I. Service invocation flow

      1. Once the service reference has been created, a proxy object is returned.

      2. Stepping through with a debugger leads to the invoke method of InvokerInvocationHandler:

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return invoker.toString();
            }
            if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
                return invoker.hashCode();
            }
            if ("equals".equals(methodName) && parameterTypes.length == 1) {
                return invoker.equals(args[0]);
            }
            return invoker.invoke(new RpcInvocation(method, args)).recreate();
        }
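
      The proxy object from step 1 and the handler above can be mimicked with a plain JDK dynamic proxy. This is only a minimal sketch: SimpleInvoker, GreetingService and ForwardingHandler are hypothetical names made up for the illustration, and the "remote" side is a lambda, not a real Dubbo invoker.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxySketch {
    /** Hypothetical stand-in for Dubbo's Invoker: receives a method name and its arguments. */
    public interface SimpleInvoker {
        Object invoke(String methodName, Object[] args);
    }

    /** Hypothetical service interface, used only for this illustration. */
    public interface GreetingService {
        String greet(String name);
    }

    /** Forwards interface calls to the invoker; methods declared on Object stay local. */
    public static class ForwardingHandler implements InvocationHandler {
        private final SimpleInvoker invoker;

        public ForwardingHandler(SimpleInvoker invoker) {
            this.invoker = invoker;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getDeclaringClass() == Object.class) {
                // toString/hashCode/equals are answered by the invoker object itself
                return method.invoke(invoker, args);
            }
            // every business method is turned into a generic "invoke" call
            return invoker.invoke(method.getName(), args);
        }
    }

    /** Builds the proxy that the caller sees as a normal GreetingService. */
    public static GreetingService createProxy(SimpleInvoker invoker) {
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(),
                new Class<?>[] {GreetingService.class},
                new ForwardingHandler(invoker));
    }

    public static void main(String[] args) {
        // A fake "remote" side that just echoes what it was asked to do.
        SimpleInvoker fakeRemote = (methodName, callArgs) -> methodName + ":" + callArgs[0];
        GreetingService service = createProxy(fakeRemote);
        System.out.println(service.greet("dubbo")); // prints "greet:dubbo"
    }
}
```

      The caller only ever holds a GreetingService; the fact that the call is packaged up and handed to an invoker is hidden inside the handler.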

      3. Calling invoke leads to the invoke method of the MockClusterInvoker class:

        @Override
        public Result invoke(Invocation invocation) throws RpcException {
            Result result = null;
    
            String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
            if (value.length() == 0 || value.equalsIgnoreCase("false")) {
                //no mock
                result = this.invoker.invoke(invocation);
            } else if (value.startsWith("force")) {
                if (logger.isWarnEnabled()) {
                    logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
                }
                //force:direct mock
                result = doMockInvoke(invocation, null);
            } else {
                //fail-mock
                try {
                    result = this.invoker.invoke(invocation);
                } catch (RpcException e) {
                    if (e.isBiz()) {
                        throw e;
                    } else {
                        if (logger.isWarnEnabled()) {
                            logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                        }
                        result = doMockInvoke(invocation, e);
                    }
                }
            }
            return result;
        }
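
      The three branches above (no mock, force mock, fail mock) boil down to a small decision on the configured mock value. A rough sketch, assuming the real call and the mock are plain Suppliers; MockModeSketch, resolve and call are hypothetical names, the sample strings are illustrative only, and unlike the original code the sketch does not rethrow business exceptions.

```java
import java.util.function.Supplier;

public class MockModeSketch {
    public enum MockMode { NO_MOCK, FORCE_MOCK, FAIL_MOCK }

    /** Empty or "false" means no mock, a "force" prefix means force mock, anything else fail mock. */
    public static MockMode resolve(String mockValue) {
        String value = mockValue == null ? "" : mockValue.trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            return MockMode.NO_MOCK;
        }
        if (value.startsWith("force")) {
            return MockMode.FORCE_MOCK;
        }
        return MockMode.FAIL_MOCK;
    }

    /** Runs the remote call or the mock, depending on the resolved mode. */
    public static String call(String mockValue, Supplier<String> remote, Supplier<String> mock) {
        switch (resolve(mockValue)) {
            case NO_MOCK:
                return remote.get();
            case FORCE_MOCK:
                // the remote side is never contacted
                return mock.get();
            default:
                try {
                    return remote.get();
                } catch (RuntimeException e) {
                    // simplification: every failure falls back to the mock
                    return mock.get();
                }
        }
    }

    public static void main(String[] args) {
        Supplier<String> broken = () -> { throw new IllegalStateException("provider down"); };
        Supplier<String> mock = () -> "mocked";
        System.out.println(call("force:return null", () -> "real", mock)); // prints "mocked"
        System.out.println(call("return null", broken, mock));             // prints "mocked"
        System.out.println(call("false", () -> "real", mock));             // prints "real"
    }
}
```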

      4. Execution then continues into the invoke method of the AbstractClusterInvoker class:

        @Override
        public Result invoke(final Invocation invocation) throws RpcException {
            checkWhetherDestroyed();
            LoadBalance loadbalance = null;
            // fetch the invokers that can execute this method from the registry
            List<Invoker<T>> invokers = list(invocation);
            if (invokers != null && !invokers.isEmpty()) {
                // look up the load-balancing strategy
                loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                        .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
            }
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
            return doInvoke(invocation, invokers, loadbalance);
        }
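
      The load-balance lookup above is essentially "find a strategy by name, using the per-method setting if there is one and a default otherwise". A simplified sketch of that idea with a plain map in place of ExtensionLoader; the Strategy interface, the two strategy names, the "first" default and the "method.loadbalance" key format are all assumptions made for this example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LoadBalanceLookupSketch {
    /** Hypothetical strategy: picks one provider address from the candidates. */
    public interface Strategy {
        String select(List<String> providers, int callNumber);
    }

    /** Default strategy name, an assumption for this sketch only. */
    public static final String DEFAULT_NAME = "first";

    private static final Map<String, Strategy> STRATEGIES = new HashMap<>();

    static {
        STRATEGIES.put("first", (providers, n) -> providers.get(0));
        STRATEGIES.put("roundrobin",
                (providers, n) -> providers.get(Math.floorMod(n, providers.size())));
    }

    /** Resolves the strategy configured for a method, falling back to the default name. */
    public static Strategy resolve(Map<String, String> methodConfig, String methodName) {
        String name = methodConfig.getOrDefault(methodName + ".loadbalance", DEFAULT_NAME);
        Strategy strategy = STRATEGIES.get(name);
        if (strategy == null) {
            throw new IllegalStateException("No such strategy: " + name);
        }
        return strategy;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("sayHello.loadbalance", "roundrobin");
        List<String> providers = Arrays.asList("a", "b", "c");
        System.out.println(resolve(config, "sayHello").select(providers, 4)); // prints "b"
        System.out.println(resolve(config, "other").select(providers, 4));    // prints "a"
    }
}
```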

      5. doInvoke resolves to the doInvoke method of the FailoverClusterInvoker class:

        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            List<Invoker<T>> copyinvokers = invokers;
            checkInvokers(copyinvokers, invocation);
            int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
            if (len <= 0) {
                len = 1;
            }
            // retry loop.
            RpcException le = null; // last exception.
            List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
            Set<String> providers = new HashSet<String>(len);
            for (int i = 0; i < len; i++) {
                //Reselect before retry to avoid a change of candidate `invokers`.
                //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
                if (i > 0) {
                    checkWhetherDestroyed();
                    copyinvokers = list(invocation);
                    // check again
                    checkInvokers(copyinvokers, invocation);
                }
                // select one invoker according to the load-balancing strategy
                Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
                invoked.add(invoker);
                RpcContext.getContext().setInvokers((List) invoked);
                try {
                    // the invoker wraps the various filters and the invoker information; DubboInvoker is what ultimately executes
                    Result result = invoker.invoke(invocation);
                    if (le != null && logger.isWarnEnabled()) {
                        logger.warn("Although retry the method " + invocation.getMethodName()
                                + " in the service " + getInterface().getName()
                                + " was successful by the provider " + invoker.getUrl().getAddress()
                                + ", but there have been failed providers " + providers
                                + " (" + providers.size() + "/" + copyinvokers.size()
                                + ") from the registry " + directory.getUrl().getAddress()
                                + " on the consumer " + NetUtils.getLocalHost()
                                + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                                + le.getMessage(), le);
                    }
                    return result;
                } catch (RpcException e) {
                    if (e.isBiz()) { // biz exception.
                        throw e;
                    }
                    le = e;
                } catch (Throwable e) {
                    le = new RpcException(e.getMessage(), e);
                } finally {
                    providers.add(invoker.getUrl().getAddress());
                }
            }
            throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                    + invocation.getMethodName() + " in the service " + getInterface().getName()
                    + ". Tried " + len + " times of the providers " + providers
                    + " (" + providers.size() + "/" + copyinvokers.size()
                    + ") from the registry " + directory.getUrl().getAddress()
                    + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                    + Version.getVersion() + ". Last error is: "
                    + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
        }
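
      Stripped of logging and Dubbo types, the retry loop above looks roughly like the following. It is a sketch under simplifying assumptions: providers are plain strings, the call is a Function, selection just prefers a provider that has not been tried yet, and every RuntimeException counts as retryable (the original rethrows business exceptions immediately). FailoverSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class FailoverSketch {
    /** Tries up to retries + 1 providers and returns the first successful result. */
    public static String invokeWithFailover(List<String> providers, int retries,
                                            Function<String, String> call) {
        if (providers == null || providers.isEmpty()) {
            throw new IllegalStateException("No provider available");
        }
        int attempts = Math.max(retries + 1, 1);
        List<String> tried = new ArrayList<>();
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            String provider = select(providers, tried);
            tried.add(provider);
            try {
                return call.apply(provider);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next attempt
            }
        }
        throw new IllegalStateException(
                "Failed after " + attempts + " attempts, tried " + tried, last);
    }

    /** Prefers a provider that has not been tried; falls back to the first one otherwise. */
    static String select(List<String> providers, List<String> tried) {
        for (String provider : providers) {
            if (!tried.contains(provider)) {
                return provider;
            }
        }
        return providers.get(0);
    }

    public static void main(String[] args) {
        List<String> providers = Arrays.asList("a", "b", "c");
        // "a" and "b" are down, so the third attempt reaches "c".
        String result = invokeWithFailover(providers, 2, p -> {
            if (!p.equals("c")) {
                throw new IllegalStateException("down: " + p);
            }
            return "ok from " + p;
        });
        System.out.println(result); // prints "ok from c"
    }
}
```

      Note that the attempt count is retries + 1, matching the len computation in the original code: the first call is not a retry.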

      6. DubboInvoker performs the actual call:

        @Override
        protected Result doInvoke(final Invocation invocation) throws Throwable {
            RpcInvocation inv = (RpcInvocation) invocation;
            final String methodName = RpcUtils.getMethodName(invocation);
            inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
            inv.setAttachment(Constants.VERSION_KEY, version);
    
            // the client to use
            ExchangeClient currentClient;
            if (clients.length == 1) {
                currentClient = clients[0];
            } else {
                currentClient = clients[index.getAndIncrement() % clients.length];
            }
            try {
                boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
                boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
                int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                if (isOneway) {
                    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                    currentClient.send(inv, isSent);
                    RpcContext.getContext().setFuture(null);
                    return new RpcResult();
                } else if (isAsync) {
                    ResponseFuture future = currentClient.request(inv, timeout);
                    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                    return new RpcResult();
                } else {
                    RpcContext.getContext().setFuture(null);
                    // the client sends the request and waits for the response
                    return (Result) currentClient.request(inv, timeout).get();
                }
            } catch (TimeoutException e) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } catch (RemotingException e) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
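
      The one-way, async and sync branches above differ only in what happens to the future of the call. A small sketch of the three modes using CompletableFuture; the Mode enum, the CONTEXT_FUTURE thread-local (a stand-in for what RpcContext does with the future) and the Supplier-based remote call are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class CallModeSketch {
    public enum Mode { ONEWAY, ASYNC, SYNC }

    /** Hypothetical stand-in for RpcContext: future of the latest async call on this thread. */
    public static final ThreadLocal<CompletableFuture<String>> CONTEXT_FUTURE = new ThreadLocal<>();

    /** Returns the result for SYNC; returns null for ONEWAY and ASYNC. */
    public static String invoke(Mode mode, Supplier<String> remoteCall, long timeoutMillis) {
        // the call is started in all three modes; only the handling of the result differs
        CompletableFuture<String> future = CompletableFuture.supplyAsync(remoteCall);
        switch (mode) {
            case ONEWAY:
                CONTEXT_FUTURE.set(null);
                return null; // fire and forget: nobody waits for the response
            case ASYNC:
                CONTEXT_FUTURE.set(future);
                return null; // the caller fetches the future from the context later
            default:
                CONTEXT_FUTURE.set(null);
                try {
                    return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    throw new IllegalStateException("Invoke remote method timeout", e);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting", e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException("Failed to invoke remote method", e.getCause());
                }
        }
    }

    public static void main(String[] args) {
        System.out.println(invoke(Mode.SYNC, () -> "pong", 1000));  // prints "pong"
        System.out.println(invoke(Mode.ASYNC, () -> "pong", 1000)); // prints "null"
        System.out.println(CONTEXT_FUTURE.get().join());            // prints "pong"
    }
}
```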

      7. request sends the request:

        
        // ReferenceCountExchangeClient:
        @Override
        public ResponseFuture request(Object request, int timeout) throws RemotingException {
            return client.request(request, timeout);
        }
        
        // HeaderExchangeClient:
        @Override
        public ResponseFuture request(Object request, int timeout) throws RemotingException {
            return channel.request(request, timeout);
        }
        
        // HeaderExchangeChannel:
        @Override
        public ResponseFuture request(Object request, int timeout) throws RemotingException {
            if (closed) {
                throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
            }
            // create request.
            Request req = new Request();
            req.setVersion("2.0.0");
            req.setTwoWay(true);
            req.setData(request);
            DefaultFuture future = new DefaultFuture(channel, req, timeout);
            try {
                channel.send(req);
            } catch (RemotingException e) {
                future.cancel();
                throw e;
            }
            return future;
        }
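
      The pattern in HeaderExchangeChannel (create a future for the request, send it, cancel the future if sending fails, return the future) can be sketched without Dubbo as a table of pending requests keyed by id. This is an illustration only: PendingRequestsSketch, the BiConsumer transport and the String payloads are hypothetical, and timeouts are left out.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

public class PendingRequestsSketch {
    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a future under a fresh id, then hands (id, payload) to the transport. */
    public CompletableFuture<String> request(String payload, BiConsumer<Long, String> transport) {
        long id = nextId.incrementAndGet();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(id, future);
        try {
            transport.accept(id, payload);
        } catch (RuntimeException e) {
            // sending failed: nobody will ever answer, so drop and cancel the future
            pending.remove(id);
            future.cancel(false);
            throw e;
        }
        return future;
    }

    /** Called when a response arrives; completes the future that matches the id. */
    public boolean onResponse(long id, String body) {
        CompletableFuture<String> future = pending.remove(id);
        return future != null && future.complete(body);
    }

    public int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingRequestsSketch channel = new PendingRequestsSketch();
        long[] sentId = new long[1];
        // The "transport" only records the id; a real one would write to the network.
        CompletableFuture<String> future = channel.request("ping", (id, p) -> sentId[0] = id);
        channel.onResponse(sentId[0], "pong");
        System.out.println(future.join()); // prints "pong"
    }
}
```

      Because the id travels with the request and comes back with the response, many calls can share one connection while each caller still gets its own result.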

     

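      After the request is sent, the timeout passed to the future applies. If the call times out, the resulting exception is handled by the retry loop in FailoverClusterInvoker. Once a response arrives, it is decoded and further processed until the result is returned to the caller.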

      Overall, the call passes through layer upon layer of wrappers, and Netty handles the network communication.

    II. Service reference flow diagram

      

  • Original article: https://www.cnblogs.com/niujifei/p/15808887.html