  • Dubbo publishing process: the consumer invocation flow

    Initial call

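    We start at the controller. When a request arrives, the controller calls the service, which here is the proxy class generated for the Dubbo service reference.
    IUserService is an interface, so the object actually invoked is a proxy. Let's follow the call: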

    @RestController
    @RequestMapping("/view/user")
    public class UserController {
    
    
        @Resource
        private IUserService userService;
    
        @RequestMapping("/list")
        public String getUserList(){
            String list = userService.queryList();
            return list;
        }
    
    }
    

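    The call actually lands in the invoke method of InvokerInvocationHandler, which delegates to the invoke method of its invoker field after wrapping the requested method and its arguments in a remote-invocation object, RpcInvocation. The contents of RpcInvocation are covered in section 2 at the end. Once the RpcInvocation is built, execution continues into invoker.invoke, as follows: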

    public class InvokerInvocationHandler implements InvocationHandler {
    
        private final Invoker<?> invoker;
    
        public InvokerInvocationHandler(Invoker<?> handler) {
            this.invoker = handler;
        }
    
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return invoker.toString();
            }
            if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
                return invoker.hashCode();
            }
            if ("equals".equals(methodName) && parameterTypes.length == 1) {
                return invoker.equals(args[0]);
            }
            // at this point invoker = MockClusterInvoker
            return invoker.invoke(new RpcInvocation(method, args)).recreate();
        }
    
    }
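
To make the proxy step concrete, here is a minimal sketch of how a JDK dynamic proxy routes an interface call into an InvocationHandler. It does not use Dubbo at all: DemoUserService, RecordingHandler and ProxyDemo are hypothetical names invented for this illustration, and the handler simply records the call rather than building an RpcInvocation.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical stand-in for the article's IUserService.
interface DemoUserService {
    String queryList();
}

// Hypothetical handler: records the call instead of sending it over the network.
class RecordingHandler implements InvocationHandler {
    String lastCall;

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Methods declared on Object (toString, hashCode, equals) are answered locally.
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(this, args);
        }
        // A no-argument call arrives with args == null, so normalize it first.
        Object[] safeArgs = args == null ? new Object[0] : args;
        lastCall = method.getName() + Arrays.toString(safeArgs);
        return "stub-result for " + method.getName();
    }
}

class ProxyDemo {
    // Builds a proxy whose every interface call is routed to the given handler.
    static DemoUserService newProxy(InvocationHandler handler) {
        return (DemoUserService) Proxy.newProxyInstance(
                DemoUserService.class.getClassLoader(),
                new Class<?>[] {DemoUserService.class},
                handler);
    }
}
```

Calling `ProxyDemo.newProxy(new RecordingHandler()).queryList()` never touches a real implementation; the handler alone decides what comes back, which is the hook Dubbo uses to turn a local call into a remote one.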
    

    invoker.invoke

    From the previous chapter, on consumer initialization during the Dubbo publishing process, we know that the invoker here is one wrapped by a decorator.

    public class MockClusterInvoker<T> implements Invoker<T> {
        public MockClusterInvoker(Directory<T> directory, Invoker<T> invoker) {
            this.directory = directory;
            this.invoker = invoker;
        }
    
        public Result invoke(Invocation invocation) throws RpcException {
            Result result = null;
            // directory.getUrl() = zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?anyhost=true&application=user-consumer&check=false&dubbo=2.6.2&generic=false&getUserById.retries=3
            // &getUserById.timeout=3000&interface=com.bail.user.service.IUserService&methods=getUserById,queryList&pid=9480
            // &register.ip=192.168.31.199&remote.timestamp=1638364136372&retries=2&revision=1.0.0&side=consumer&timeout=8000&timestamp=1638364165215&version=1.0.0
            // value = "false": this is not a mock request
            String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
            if (value.length() == 0 || value.equalsIgnoreCase("false")) {
                // no mock: the outer decorator is stripped here, so invoker = FailoverClusterInvoker
                result = this.invoker.invoke(invocation);
            } else if (value.startsWith("force")) {
                if (logger.isWarnEnabled()) {
                    logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
                }
                //force:direct mock
                result = doMockInvoke(invocation, null);
            } else {
                //fail-mock
                try {
                    result = this.invoker.invoke(invocation);
                } catch (RpcException e) {
                    if (e.isBiz()) {
                        throw e;
                    } else {
                        if (logger.isWarnEnabled()) {
                            logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                        }
                        result = doMockInvoke(invocation, e);
                    }
                }
            }
            return result;
        }
    }
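
The three branches above (no mock, force mock, fail mock) can be sketched without any Dubbo types. This is an illustration under stated assumptions: MockDispatchDemo and DemoBizException are hypothetical names, the remote call and the mock are plain Supplier objects, and DemoBizException stands in for an RpcException whose isBiz() returns true.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a business-level RpcException (isBiz() == true).
class DemoBizException extends RuntimeException {
    DemoBizException(String message) {
        super(message);
    }
}

class MockDispatchDemo {
    // Chooses between the remote call and the mock based on the configured mock value.
    static String invoke(String mockValue, Supplier<String> remoteCall, Supplier<String> mockCall) {
        String value = mockValue == null ? "false" : mockValue.trim();
        if (value.isEmpty() || value.equalsIgnoreCase("false")) {
            // no mock: go straight to the remote call
            return remoteCall.get();
        } else if (value.startsWith("force")) {
            // force mock: never touch the remote side
            return mockCall.get();
        } else {
            // fail mock: try the remote call and fall back to the mock on non-business failures
            try {
                return remoteCall.get();
            } catch (DemoBizException e) {
                throw e; // business errors are propagated unchanged
            } catch (RuntimeException e) {
                return mockCall.get();
            }
        }
    }
}
```

With the consumer URL shown in the comment above, no mock key is configured, so the value resolves to "false" and the first branch is taken.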
    
    

    FailoverClusterInvoker

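    this.invoker.invoke(invocation) first enters the invoke method of the abstract parent class AbstractClusterInvoker. It calls list(invocation) to fetch the invoker list from the registry directory; what comes back is the set of invokers registered for the method. An important method here is getMethodParameter, which reads per-method configuration. In this case it looks up the load-balancing strategy by method name and falls back to the default, random, when none is configured. With the invokers and the loadbalance in hand, execution continues into doInvoke.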

    public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
        public Result invoke(final Invocation invocation) throws RpcException {
            checkWhetherDestroyed();
            LoadBalance loadbalance = null;
            List<Invoker<T>> invokers = list(invocation);
            if (invokers != null && !invokers.isEmpty()) {
                // resolve the load-balancing strategy
                loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                        .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
            }
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
            return doInvoke(invocation, invokers, loadbalance);
        }
    
        protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
            //directory = RegistryDirectory
            List<Invoker<T>> invokers = directory.list(invocation);
            return invokers;
        }
    }
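
The per-method lookup performed by getMethodParameter can be illustrated with a plain map. The sketch below assumes the resolution order is "method.key" first, then the service-level key, then the supplied default; MethodParameterDemo is a hypothetical class, and the sample values are copied from the consumer URL quoted earlier.

```java
import java.util.HashMap;
import java.util.Map;

class MethodParameterDemo {
    // Looks up "<method>.<key>", then "<key>", then falls back to the default value.
    static String getMethodParameter(Map<String, String> params, String method,
                                     String key, String defaultValue) {
        String value = params.get(method + "." + key);
        if (value == null || value.isEmpty()) {
            value = params.get(key);
        }
        return (value == null || value.isEmpty()) ? defaultValue : value;
    }

    // Parameters taken from the consumer URL shown in the MockClusterInvoker comment.
    static Map<String, String> sampleParams() {
        Map<String, String> params = new HashMap<>();
        params.put("getUserById.retries", "3");
        params.put("getUserById.timeout", "3000");
        params.put("retries", "2");
        params.put("timeout", "8000");
        return params;
    }
}
```

Under this assumption, getUserById resolves timeout to 3000 from its own entry, queryList inherits the service-level 8000, and a loadbalance lookup for either method falls through to the default passed by the caller, which is how random ends up being used when nothing is configured.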
    
    • Calling the list method of AbstractDirectory
      After the invokers are fetched from the container, the routers are applied to them.
    public abstract class AbstractDirectory<T> implements Directory<T> {
        public List<Invoker<T>> list(Invocation invocation) throws RpcException {
            if (destroyed) {
                throw new RpcException("Directory already destroyed .url: " + getUrl());
            }
            List<Invoker<T>> invokers = doList(invocation);
            List<Router> localRouters = this.routers; // local reference
            if (localRouters != null && !localRouters.isEmpty()) {
                for (Router router : localRouters) {
                    try {
                        if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                            invokers = router.route(invokers, getConsumerUrl(), invocation);
                        }
                    } catch (Throwable t) {
                        logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
                    }
                }
            }
            return invokers;
        }
    }
    
    public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
        private volatile Map<String, List<Invoker<T>>> methodInvokerMap; // invokers keyed by method name
    
        public List<Invoker<T>> doList(Invocation invocation) {
            if (forbidden) {
                // 1. No service provider 2. Service providers are disabled
                throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
                    "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +  NetUtils.getLocalHost()
                            + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
            }
            List<Invoker<T>> invokers = null;
            Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
            if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
                String methodName = RpcUtils.getMethodName(invocation);
                Object[] args = RpcUtils.getArguments(invocation);
                if (args != null && args.length > 0 && args[0] != null
                        && (args[0] instanceof String || args[0].getClass().isEnum())) {
                    invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
                }
                if (invokers == null) {
                    // return the invokers registered for this method from the container
                    invokers = localMethodInvokerMap.get(methodName);
                }
                if (invokers == null) {
                    invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
                }
                if (invokers == null) {
                    Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
                    if (iterator.hasNext()) {
                        invokers = iterator.next();
                    }
                }
            }
            return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
        }
    
    }
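
The fallback order inside doList is easier to see when reduced to a map lookup. The following sketch uses strings in place of Invoker objects; MethodInvokerLookupDemo is a hypothetical name, and the "*" key is an assumption about the value of Constants.ANY_VALUE.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class MethodInvokerLookupDemo {
    // Assumed value of Constants.ANY_VALUE.
    static final String ANY_VALUE = "*";

    // Resolves invokers in the order: "method.firstArg", "method", "*", first entry in the map.
    static List<String> lookup(Map<String, List<String>> methodInvokerMap,
                               String methodName, Object[] args) {
        List<String> invokers = null;
        if (methodInvokerMap != null && !methodInvokerMap.isEmpty()) {
            boolean firstArgUsable = args != null && args.length > 0 && args[0] != null
                    && (args[0] instanceof String || args[0].getClass().isEnum());
            if (firstArgUsable) {
                // routing keyed by the first argument when it is a String or an enum
                invokers = methodInvokerMap.get(methodName + "." + args[0]);
            }
            if (invokers == null) {
                invokers = methodInvokerMap.get(methodName);
            }
            if (invokers == null) {
                invokers = methodInvokerMap.get(ANY_VALUE);
            }
            if (invokers == null) {
                invokers = methodInvokerMap.values().iterator().next();
            }
        }
        return invokers == null ? new ArrayList<>() : invokers;
    }
}
```

The result is never null: an empty map yields an empty list, which the caller then has to treat as "no provider available".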
    

    doInvoke

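    Inside doInvoke, an invoker is selected according to the loaded load-balancing strategy, and the call then continues along this chain:
    invoke on the invoker held by RegistryDirectory -> invoker(ProtocolFilterWrapper).invoke -> a series of Filter calls (ConsumerContextFilter, ProtocolFilterWrapper, FutureFilter, ProtocolFilterWrapper, MonitorFilter) -> ListenerInvokerWrapper -> DubboInvoker.invoke -> AbstractInvoker.invoke() -> AbstractInvoker.doInvoke() -> DubboInvoker.doInvoke -> client.request
    Finally, the client sends a Dubbo-protocol request over Netty to the server, which receives and processes it.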
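
The article does not list the body of doInvoke, so the following is only a sketch of the general failover idea that the name FailoverClusterInvoker and the retries=2 setting suggest: pick a provider at random, and on failure retry on a provider that has not been tried yet. FailoverDemo is a hypothetical class, providers are modeled as Supplier objects, and none of this is Dubbo's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;

class FailoverDemo {
    // Tries up to retries + 1 providers, preferring ones that have not failed yet.
    static String invoke(List<Supplier<String>> providers, int retries, Random random) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        List<Supplier<String>> tried = new ArrayList<>();
        RuntimeException last = null;
        for (int attempt = 0; attempt <= retries; attempt++) {
            // Exclude providers that already failed; if all were tried, start over with the full list.
            List<Supplier<String>> candidates = new ArrayList<>(providers);
            candidates.removeAll(tried);
            if (candidates.isEmpty()) {
                candidates = providers;
            }
            // Random selection stands in for the "random" load-balancing strategy.
            Supplier<String> chosen = candidates.get(random.nextInt(candidates.size()));
            tried.add(chosen);
            try {
                return chosen.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next attempt
            }
        }
        throw new IllegalStateException("failed after " + (retries + 1) + " attempts", last);
    }
}
```

With three providers and retries=2, every provider gets at most one attempt, so a single healthy provider is enough for the call to succeed.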

    2. The remote invocation object RpcInvocation

    public class RpcInvocation implements Invocation, Serializable {
        public RpcInvocation(Method method, Object[] arguments) {
            this(method.getName(), method.getParameterTypes(), arguments, null, null);
        }
        public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments, Map<String, String> attachments, Invoker<?> invoker) {
            this.methodName = methodName;
            this.parameterTypes = parameterTypes == null ? new Class<?>[0] : parameterTypes;
            this.arguments = arguments == null ? new Object[0] : arguments;
            // attachments and invoker are both null at this point
            this.attachments = attachments == null ? new HashMap<String, String>() : attachments;
            this.invoker = invoker;
        }
    }
    
  • Original article: https://www.cnblogs.com/nangonghui/p/15630842.html