zoukankan      html  css  js  c++  java
  • 第四章:(5)原理之 Dubbo 服务引用流程

    一、服务引用流程

      服务引用时,是通过 <dubbo:reference> 标签或者 @Reference 注解来完成的,下面就解析一下服务引用流程。

      1、ReferenceBean 实现了 FactoryBean,是一个工厂Bean

    /**
     * Reference configuration that is also declared as a {@code FactoryBean}, so the object
     * handed out for this bean is whatever the factory method {@code getObject()} returns.
     * It additionally declares {@code ApplicationContextAware}, {@code InitializingBean}
     * and {@code DisposableBean}. The class body is omitted in this excerpt.
     *
     * @param <T> type parameter passed through to {@code ReferenceConfig}
     */
    public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {}

      2、重写了工厂Bean 的 getObject 方法

        /**
         * Factory-bean entry point: hands back whatever {@code get()} produces.
         *
         * @return the object returned by {@code get()}
         * @throws Exception declared by the overridden method
         */
        @Override
        public Object getObject() throws Exception {
            final Object reference = get();
            return reference;
        }
        
        /**
         * Returns the service reference held in {@code ref}, calling {@code init()} first
         * when no reference has been created yet.
         *
         * @return the current value of {@code ref}
         * @throws IllegalStateException if this configuration has already been destroyed
         */
        public synchronized T get() {
            if (destroyed) {
                throw new IllegalStateException("Already destroyed!");
            }
            // Already built: nothing to initialize.
            if (ref != null) {
                return ref;
            }
            init();
            return ref;
        }

      3、init 方法

        /**
         * One-time initialization of this reference: validates the interface name, applies
         * default/consumer configuration, creates the proxy via {@code createProxy(map)} and
         * registers a {@code ConsumerModel} for it.
         *
         * <p>Part of the original method body is elided in this excerpt (the {@code ...} line);
         * the {@code map} passed to {@code createProxy} is populated in that elided part.
         *
         * @throws IllegalStateException if {@code interfaceName} is null or empty
         */
        private void init() {
            // Run at most once per instance.
            if (initialized) {
                return;
            }
            // NOTE(review): the flag is set before validation, so if anything below throws,
            // a later call returns early at the guard above — confirm this is intended.
            initialized = true;
            if (interfaceName == null || interfaceName.length() == 0) {
                throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
            }
            // get consumer's global configuration
            checkDefault();
            appendProperties(this);
            ...
    
            //attributes are stored by system context.
            StaticContext.getSystemContext().putAll(attributes);
            // build the proxy object from the configuration collected in map
            ref = createProxy(map);
            // publish the consumer model under this reference's unique service name
            ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
            ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
        }

      4、createProxy 根据配置信息获取代理对象

        /**
         * Builds the invoker for this reference from the collected configuration and wraps it
         * in a proxy.
         *
         * <p>Three paths are visible in the body: an in-JVM (local) reference, one or more
         * user-specified URLs (the {@code url} field split with
         * {@code Constants.SEMICOLON_SPLIT_PATTERN}), or URLs assembled from
         * {@code loadRegistries(false)}. When more than one URL results, the individual
         * invokers are combined through {@code cluster.join(...)}.
         *
         * @param map configuration parameters used to build the reference URL(s)
         * @return the proxy produced by {@code proxyFactory.getProxy(invoker)}
         * @throws IllegalStateException if no registry URL could be assembled, or if checking
         *         is enabled and the resulting invoker is not available
         */
        private T createProxy(Map<String, String> map) {
            // throw-away URL, only used to ask InjvmProtocol whether a local reference applies
            URL tmpUrl = new URL("temp", "localhost", 0, map);
            final boolean isJvmRefer;
            if (isInjvm() == null) {
                if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
                    isJvmRefer = false;
                } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                    // by default, reference local service if there is
                    isJvmRefer = true;
                } else {
                    isJvmRefer = false;
                }
            } else {
                // explicit injvm setting wins over auto-detection
                isJvmRefer = isInjvm().booleanValue();
            }
    
            if (isJvmRefer) {
                URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
                // local (in-JVM) reference: this url uses Constants.LOCAL_PROTOCOL on localhost, not a registry address
                invoker = refprotocol.refer(interfaceClass, url);
                if (logger.isInfoEnabled()) {
                    logger.info("Using injvm service " + interfaceClass.getName());
                }
            } else {
                if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                    String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                    if (us != null && us.length > 0) {
                        for (String u : us) {
                            // this local URL shadows the String field of the same name
                            URL url = URL.valueOf(u);
                            if (url.getPath() == null || url.getPath().length() == 0) {
                                url = url.setPath(interfaceName);
                            }
                            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                                // registry address: carry the reference parameters encoded under REFER_KEY
                                urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                            } else {
                                // any other protocol: merge the reference parameters into the URL
                                urls.add(ClusterUtils.mergeUrl(url, map));
                            }
                        }
                    }
                } else { // assemble URL from register center's configuration
                    List<URL> us = loadRegistries(false);
                    if (us != null && !us.isEmpty()) {
                        for (URL u : us) {
                            URL monitorUrl = loadMonitor(u);
                            if (monitorUrl != null) {
                                map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            }
                            urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        }
                    }
                    if (urls == null || urls.isEmpty()) {
                        throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                    }
                }
    
                if (urls.size() == 1) {
                    // single URL: refer it directly
                    invoker = refprotocol.refer(interfaceClass, urls.get(0));
                } else {
                    // several URLs: refer each one, then join the invokers into one
                    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                    URL registryURL = null;
                    for (URL url : urls) {
                        invokers.add(refprotocol.refer(interfaceClass, url));
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            registryURL = url; // use last registry url
                        }
                    }
                    if (registryURL != null) { // registry url is available
                        // use AvailableCluster only when register's cluster is available
                        URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                        invoker = cluster.join(new StaticDirectory(u, invokers));
                    } else { // not a registry url
                        invoker = cluster.join(new StaticDirectory(invokers));
                    }
                }
            }
    
            // resolve the "check" flag: own setting, then the consumer's, then true
            Boolean c = check;
            if (c == null && consumer != null) {
                c = consumer.isCheck();
            }
            if (c == null) {
                c = true; // default true
            }
            if (c && !invoker.isAvailable()) {
                throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
            }
            if (logger.isInfoEnabled()) {
                logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
            }
            // create service proxy
            return (T) proxyFactory.getProxy(invoker);
        }

      5、refprotocol 是通过 Dubbo 自身的 SPI 扩展机制(ExtensionLoader)获取的 Protocol 扩展,最终会使用 DubboProtocol

        ① 根据 debug,会先来到 RegistryProtocol 的 refer 方法

        /**
         * Resolves the registry named in {@code url} and creates an invoker for {@code type}
         * through {@code doRefer}.
         *
         * <p>A reference to {@code RegistryService} itself is answered directly with an invoker
         * around the registry. When the referenced group is {@code "*"} or a comma-separated
         * list of more than one group, the cluster from {@code getMergeableCluster()} is used
         * instead of the default {@code cluster}.
         *
         * @param type the service interface being referenced
         * @param url  the registry URL carrying the reference parameters under {@code REFER_KEY}
         * @return the invoker for the referenced service
         * @throws RpcException declared by the overridden method
         */
        @Override
        @SuppressWarnings("unchecked")
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            // Swap the protocol for the concrete registry type and drop the now-redundant parameter.
            String registryType = url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY);
            url = url.setProtocol(registryType).removeParameter(Constants.REGISTRY_KEY);

            // obtain the registry
            Registry registry = registryFactory.getRegistry(url);
            if (RegistryService.class.equals(type)) {
                return proxyFactory.getInvoker((T) registry, type, url);
            }

            // group="a,b" or group="*"
            Map<String, String> referParams = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
            String group = referParams.get(Constants.GROUP_KEY);
            boolean spansGroups = group != null
                    && group.length() > 0
                    && (Constants.COMMA_SPLIT_PATTERN.split(group).length > 1 || "*".equals(group));
            if (spansGroups) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
            return doRefer(cluster, registry, type, url);
        }

        ② doRefer 方法

        /**
         * Creates a registry-backed directory for {@code type}, registers this consumer with the
         * registry when applicable, subscribes the directory to the providers, configurators and
         * routers categories, and joins the directory into a single invoker.
         *
         * @param cluster  cluster used to join the directory into one invoker
         * @param registry registry to register with and to attach to the directory
         * @param type     the service interface being referenced
         * @param url      the registry URL for this reference
         * @return the invoker returned by {@code cluster.join(directory)}
         */
        private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
            RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
            directory.setRegistry(registry);
            directory.setProtocol(protocol);

            // all attributes of REFER_KEY; the register IP is taken out and used as the URL host
            Map<String, String> referParams = new HashMap<String, String>(directory.getUrl().getParameters());
            String consumerHost = referParams.remove(Constants.REGISTER_IP_KEY);
            URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, consumerHost, 0, type.getName(), referParams);

            boolean wildcardInterface = Constants.ANY_VALUE.equals(url.getServiceInterface());
            if (!wildcardInterface && url.getParameter(Constants.REGISTER_KEY, true)) {
                URL consumerUrl = subscribeUrl.addParameters(
                        Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                        Constants.CHECK_KEY, String.valueOf(false));
                registry.register(consumerUrl);
            }

            // subscribe at the registry to what the service providers publish
            String categories = Constants.PROVIDERS_CATEGORY
                    + "," + Constants.CONFIGURATORS_CATEGORY
                    + "," + Constants.ROUTERS_CATEGORY;
            directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, categories));

            // the joined invoker is what the caller receives
            Invoker invoker = cluster.join(directory);
            // record the consumer in the provider/consumer registration table
            ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
            return invoker;
        }

        ③ 然后来到 DubboProtocol 的refer 方法

        /**
         * Creates a {@code DubboInvoker} for {@code serviceType} backed by the clients obtained
         * from {@code getClients(url)}, and records it in {@code invokers}.
         *
         * @param serviceType the service interface being referenced
         * @param url         the provider URL to connect to
         * @return the newly created invoker
         * @throws RpcException declared by the overridden method
         */
        @Override
        public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
            optimizeSerialization(url);

            // Obtain the exchange clients first, then build the rpc invoker around them.
            ExchangeClient[] clients = getClients(url);
            DubboInvoker<T> created = new DubboInvoker<T>(serviceType, url, clients, invokers);
            invokers.add(created);
            return created;
        }

        ④ getClients 获取客户端

        /**
         * Returns the exchange clients to use for {@code url}.
         *
         * <p>When the connections parameter is absent or 0, a single shared client is used;
         * otherwise that many dedicated clients are created.
         *
         * @param url the provider URL
         * @return an array holding one client per configured connection (one when shared)
         */
        private ExchangeClient[] getClients(URL url) {
            int configured = url.getParameter(Constants.CONNECTIONS_KEY, 0);
            // not configured -> share one connection; otherwise one connection per slot
            boolean shared = configured == 0;
            int count = shared ? 1 : configured;

            ExchangeClient[] result = new ExchangeClient[count];
            for (int idx = 0; idx < count; idx++) {
                result[idx] = shared ? getSharedClient(url) : initClient(url);
            }
            return result;
        }
        
        /**
         * Returns the shared, reference-counted connection for the address of {@code url},
         * creating it when no open one is cached in {@code referenceClientMap}.
         *
         * <p>An open cached client is returned without locking. Otherwise the cache is
         * re-checked while holding the per-address lock, so that concurrent callers for the same
         * address neither open duplicate connections (the later one would overwrite and leak the
         * earlier) nor remove an entry another caller has just installed.
         *
         * @param url the provider URL; its address is the cache key
         * @return the shared client with its reference count incremented, or a newly created one
         */
        private ExchangeClient getSharedClient(URL url) {
            String key = url.getAddress();

            // Fast path: reuse an open client without taking the lock.
            ReferenceCountExchangeClient client = referenceClientMap.get(key);
            if (client != null && !client.isClosed()) {
                client.incrementAndGetCount();
                return client;
            }

            synchronized (key.intern()) {
                // Re-check under the lock: another thread may have created the client meanwhile.
                client = referenceClientMap.get(key);
                if (client != null) {
                    if (!client.isClosed()) {
                        client.incrementAndGetCount();
                        return client;
                    }
                    // Stale (closed) entry: drop it before building a replacement.
                    referenceClientMap.remove(key);
                }
                ExchangeClient exchangeClient = initClient(url);
                client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
                referenceClientMap.put(key, client);
                ghostClientMap.remove(key);
                return client;
            }
        }
        
        /**
         * Creates a new connection for {@code url}.
         *
         * <p>The URL is given the Dubbo codec and, if absent, the default heartbeat. When the
         * lazy-connect parameter is true a {@code LazyConnectExchangeClient} is returned;
         * otherwise the connection is opened through {@code Exchangers.connect}.
         *
         * @param url the provider URL
         * @return the new exchange client
         * @throws RpcException if the configured client type has no {@code Transporter}
         *         extension, or if creating the remoting client fails
         */
        private ExchangeClient initClient(URL url) {
            // client type setting: explicit client, else the server type, else the default
            String fallbackType = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT);
            String clientType = url.getParameter(Constants.CLIENT_KEY, fallbackType);

            url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
            // enable heartbeat by default
            url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

            // BIO is not allowed since it has severe performance issue.
            boolean typeGiven = clientType != null && clientType.length() > 0;
            if (typeGiven && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(clientType)) {
                throw new RpcException("Unsupported client type: " + clientType + "," +
                        " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
            }

            try {
                // connection should be lazy
                if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                    return new LazyConnectExchangeClient(url, requestHandler);
                }
                // connect to the server described by url
                return Exchangers.connect(url, requestHandler);
            } catch (RemotingException e) {
                throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
            }
        }
        
        /**
         * Connects to {@code url} through the exchanger selected for it, defaulting the codec
         * parameter to {@code "exchange"} when none is set.
         *
         * @param url     the target URL; must not be null
         * @param handler the exchange handler; must not be null
         * @return the connected exchange client
         * @throws IllegalArgumentException if {@code url} or {@code handler} is null
         * @throws RemotingException if the underlying exchanger fails to connect
         */
        public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
            if (url == null) {
                throw new IllegalArgumentException("url == null");
            }
            if (handler == null) {
                throw new IllegalArgumentException("handler == null");
            }
            URL withCodec = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
            return getExchanger(withCodec).connect(withCodec, handler);
        }
        
        
        /**
         * Opens a transport-level connection whose handler chain is
         * {@code DecodeHandler -> HeaderExchangeHandler -> handler}, and wraps it in a
         * {@code HeaderExchangeClient}.
         *
         * @param url     the target URL
         * @param handler the exchange handler at the end of the chain
         * @return the exchange client wrapping the transport connection
         * @throws RemotingException if the transport connection cannot be created
         */
        @Override
        public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
            DecodeHandler decoding = new DecodeHandler(new HeaderExchangeHandler(handler));
            return new HeaderExchangeClient(Transporters.connect(url, decoding), true);
        }
        
        
        /**
         * Connects to {@code url} using the current transporter.
         *
         * <p>No handlers yields a {@code ChannelHandlerAdapter}, exactly one handler is used
         * as-is, and several handlers are wrapped in a {@code ChannelHandlerDispatcher}.
         *
         * @param url      the target URL; must not be null
         * @param handlers zero or more channel handlers
         * @return the connected client
         * @throws IllegalArgumentException if {@code url} is null
         * @throws RemotingException if the transporter fails to connect
         */
        public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
            if (url == null) {
                throw new IllegalArgumentException("url == null");
            }
            final int supplied = handlers == null ? 0 : handlers.length;
            final ChannelHandler handler;
            switch (supplied) {
                case 0:
                    handler = new ChannelHandlerAdapter();
                    break;
                case 1:
                    handler = handlers[0];
                    break;
                default:
                    handler = new ChannelHandlerDispatcher(handlers);
                    break;
            }
            return getTransporter().connect(url, handler);
        }
        
        /**
         * Creates a Netty-based client for {@code url}.
         *
         * @param url      the target URL
         * @param listener the channel handler passed to the client
         * @return a new {@code NettyClient}
         * @throws RemotingException declared by the overridden method
         */
        @Override
        public Client connect(URL url, ChannelHandler listener) throws RemotingException {
            // create a Netty client
            final NettyClient nettyClient = new NettyClient(url, listener);
            return nettyClient;
        }

      6、registerConsumer:注册消费者

        /**
         * Records a consumer-side invoker in {@code consumerInvokers}, keyed by the service key
         * of {@code consumerUrl}. The set for that key is created on first use.
         *
         * @param invoker           the consumer invoker
         * @param registryUrl       the registry URL the reference came from
         * @param consumerUrl       the consumer URL; its service key is the table key
         * @param registryDirectory the directory backing the invoker
         */
        public static void registerConsumer(Invoker invoker, URL registryUrl, URL consumerUrl, RegistryDirectory registryDirectory) {
            ConsumerInvokerWrapper wrapped = new ConsumerInvokerWrapper(invoker, registryUrl, consumerUrl, registryDirectory);
            String serviceKey = consumerUrl.getServiceKey();

            Set<ConsumerInvokerWrapper> registered = consumerInvokers.get(serviceKey);
            if (registered != null) {
                registered.add(wrapped);
                return;
            }
            // First consumer for this key: install an empty set (unless one was added
            // concurrently) and read back whichever set ended up in the map.
            consumerInvokers.putIfAbsent(serviceKey, new ConcurrentHashSet<ConsumerInvokerWrapper>());
            registered = consumerInvokers.get(serviceKey);
            registered.add(wrapped);
        }

        现在ProviderConsumerRegTable 注册表里注册了url和服务提供者提供的服务,也注册了 url 和服务消费者要消费的服务。

    /**
     * Static tables of provider-side and consumer-side invoker wrappers, each keyed by a
     * {@code String}. Only the two table fields are shown in this excerpt.
     */
    public class ProviderConsumerRegTable {
        // provider invoker wrappers; presumably keyed by service key like the consumer table — TODO confirm
        public static ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<String, Set<ProviderInvokerWrapper>>();
        // consumer invoker wrappers, keyed by the consumer URL's service key (see registerConsumer)
        public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>();
    }

        至此,invoker 代理对象创建完毕。

    二、时序图

        

  • 相关阅读:
    TSP-UK49687
    维度建模的基本原则
    差分约束系统
    随机过程初步
    随机过程——维纳过程
    Xilinx FPGA复位信号设计
    10 Row Abacus
    Python
    FX2LP与FPGA的简单批量回环
    DFT公式的一个简单示例
  • 原文地址:https://www.cnblogs.com/niujifei/p/15808238.html
Copyright © 2011-2022 走看看