zoukankan      html  css  js  c++  java
  • Dubbo服务提供者Provider启动流程下(四)

    服务暴露实际上就是启动server本地监听,并且将服务信息注册到注册中心上。在dubbo:service上的export可以指定是否暴露,同时provider也可以指定延迟暴露的时间。

    if (!shouldExport()) { // 判断是否暴露服务,由dubbo:service export="true|false"来指定。
                return;
            }
            // 服务暴露  
            if (shouldDelay()) {
                delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
            } else {
                doExport();
            }

    doExport

    ServiceConfig#doExport -> ServiceConfig#doExportUrls

        private void doExportUrls() {
            // 首先遍历ServiceBean的List< RegistryConfig> registries(所有注册中心的配置信息),
            // 然后将地址封装成URL对象,关于注册中心的所有配置属性,最终转换成url的属性(?属性名=属性值),
            // registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?
            // application=dubbo-demo-api-provider&dubbo=2.0.2&pid=32033&registry=zookeeper&timestamp=1639838932336
            List<URL> registryURLs = loadRegistries(true);
            /**
             * 每一个服务协议都会往多个注册中心暴露
             */
            for (ProtocolConfig protocolConfig : protocols) {
                // 在 URL.buildKey 中,将从 protocolConfig中获取的path,group,version进行拼装,最终形式类似于 ${group}/${path}:${version}
                String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
                ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
                // 将服务提供者信息注册到ApplicationModel实例中。
                ApplicationModel.initProviderModel(pathKey, providerModel);
                doExportUrlsFor1Protocol(protocolConfig, registryURLs);
            }

    serviceConfig#doExportUrlsFor1Protocol

    这个方法主要就是将服务的配置信息包装成一个url,并且本地启动服务,并将服务信息发布到注册中心。代码贴出来会比较多,我这边按重点贴出部分

    代码前端包括了用Map存储该协议的所有配置参数,包括协议名称、dubbo版本、当前系统时间戳、进程ID、application配置、module配置、默认服务提供者参数(ProviderConfig)、协议配置、服务提供Dubbo:service的属性。

    将service 里面的 method 里面的 argument 也加到 map中,其中就包括了callback。

    然后泛化信息添加。另外将所有的方法名也添加进map,以逗号分隔,这边使用javassiat字节码生成器获取所有的方法具体详解代码   String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();

    之后有token的处理,还有获取服务的host和port

            // 获取配置的ip,并连接绑定端口
            /**
             * 解析服务提供者的IP地址与端口。
             * 服务IP地址解析顺序:(序号越小越优先)
             *
             * 系统环境变量,变量名:DUBBO_DUBBO_IP_TO_BIND
             * 系统属性,变量名:DUBBO_DUBBO_IP_TO_BIND
             * 系统环境变量,变量名:DUBBO_IP_TO_BIND
             * 系统属性,变量名:DUBBO_IP_TO_BIND
             * dubbo:protocol 标签的host属性 --》 dubbo:provider 标签的host属性
             * 选择第一个可用网卡,其实现方式是建立socket,连接注册中心,获取socket的IP地址
             */
            String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
            /**
             * 系统环境变量,变量名:DUBBO_DUBBO_PORT_TO_BIND
             * 系统属性,变量名:DUBBO_DUBBO_PORT_TO_BIND
             * 系统环境变量,变量名:DUBBO_PORT_TO_BIND
             * 系统属性,变量名DUBBO_PORT_TO_BIND
             * dubbo:protocol标签port属性、dubbo:provider标签的port属性。
             * 随机选择一个端口。
             */
            Integer port = this.findConfigedPorts(protocolConfig, name, map);

    然后再一次封装服务的暴露URL 

    URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

    大体内容如下:

             /**
             * dubbo://192.168.0.102:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider
             * &bind.ip=192.168.0.102&bind.port=20880&default.deprecated=false&default.dynamic=false&default.register=true
             * &deprecated=false&dubbo=2.0.2&dynamic=false&generic=false&
             * interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=32191&register=true&release=&side=provider&timestamp=1639840839565
             */

    接着获取 动态配置中心,并将url写到配置中心。接着往下

    根据scope来暴露服务,如果scope不配置,则默认本地与远程都会暴露,如果配置成local或remote,那就只能是二选一。
    如果scope不为remote,则先在本地暴露(injvm):,具体暴露服务的具体实现,将在remote 模式中详细分析。
    如果scope不为local,则将服务暴露在远程。
    remote方式,检测当前配置的所有注册中心,如果注册中心不为空,则遍历注册中心,将服务依次在不同的注册中心进行注册。
    如果dubbo:service的dynamic属性未配置, 尝试取dubbo:registry的dynamic属性,该属性的作用是否启用动态注册,如果设置为false,服务注册后,其状态显示为disable,需要人工启用,当服务不可用时,也不会自动移除,同样需要人工处理,此属性不要在生产环境上配置。
    根据注册中心url(注册中心url),构建监控中心的URL,如果监控中心URL不为空,则在服务提供者URL上追加monitor,其值为监控中心url(已编码)。

    下面就是服务暴露的细节了

                            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
                            Exporter<?> exporter = protocol.export(wrapperInvoker);
                   // 缓存已经暴露的服务 exporters.add(exporter);

    我们debug到invoke这边,对与invoke  后续会出专门的章节来介绍。这边只要了解到Invoke中的URL

    registryURL = registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=33205&registry=zookeeper&timestamp=1639897064350

    invoke.url =    registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.0.102%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider%26bind.ip%3D192.168.0.102%26bind.port%3D20880%26default.deprecated%3Dfalse%26default.dynamic%3Dfalse%26default.register%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dfalse%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D33205%26register%3Dtrue%26release%3D%26side%3Dprovider%26timestamp%3D1639897069365&pid=33205&registry=zookeeper&timestamp=1639897064350  其中协议就是register

    dubbo的扩展中,对于扩展点接口的方法上有@Adaptive注解的话,会去参数里面获取URL,然后取对应的key,这边的key就是register。那么很好了解下面的方法了Exporter<?> exporter = protocol.export(wrapperInvoker);

    实际上调用的RegistryProtocol的exporter。

        @Override
        public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
            URL registryUrl = getRegistryUrl(originInvoker);
            // url to export locally
            URL providerUrl = getProviderUrl(originInvoker);
    
            // Subscribe the override data
            // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
            //  the same service. Because the subscribed is cached key with the name of the service, it causes the
            //  subscription information to cover.
            final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
            final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
            overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    
            providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
            //export invoker
            final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
    
            // url to registry  
            final Registry registry = getRegistry(originInvoker);
            final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
            ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                    registryUrl, registeredProviderUrl);
            //to judge if we need to delay publish
            boolean register = registeredProviderUrl.getParameter("register", true);
            if (register) {
                register(registryUrl, registeredProviderUrl);
                providerInvokerWrapper.setReg(true);
            }
    
            // Deprecated! Subscribe to override rules in 2.6.x or before.
            registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    
            exporter.setRegisterUrl(registeredProviderUrl);
            exporter.setSubscribeUrl(overrideSubscribeUrl);
            //Ensure that a new exporter instance is returned every time export
            return new DestroyableExporter<>(exporter);
        }

    在上面的URL providerUrl = getProviderUrl(originInvoker);方法中,把invoke中的url获取后,替换了前面的register协议,变成了dubbo协议,即dubbo://开头的URL。

    接下来看下面的执行doLocalExport方法

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
            String key = getCacheKey(originInvoker);
    
            // 根据Dubbo内置的SPI机制,将调用DubboProtocol#export方法。
            return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
                Invoker<?> invokerDelegete = new InvokerDelegate<>(originInvoker, providerUrl);
                return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
            });
        }

    注意这里面使用了一个invoke的委托类,将url改成了providerUrl,即dubbo协议开头。所以根据dubbo的扩展实现,拿到了protocal实现类就是DubboProtocal。

    DubboProtocal#export

       @Override
        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            URL url = invoker.getUrl();
    
            // export service.   key = org.apache.dubbo.demo.DemoService:20880
            String key = serviceKey(url);
            // 构造一个Exporter
            DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
            exporterMap.put(key, exporter);
    
            //export an stub service for dispatching event
            // 是否将转发事件导出成stub
            Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
            Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
            if (isStubSupportEvent && !isCallbackservice) {
                String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
                if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                                "], has set stubproxy support event ,but no stub methods founded."));
                    }
    
                } else {
                    stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
                }
            }
    
            /**
             * 而后会调用 openServer ,其主要作用是开启一个 ExchangeServer, 实际就是对 底层网络通信的封装,在dubbo 传输协议中,使用的是Netty 作为网络传输协议,
             * Dubbo 内置也支持 Grizzly、Mina 等传输协议。
             *
             * 而 optimizeSerialization 则是 序列化优化器来优化url,不过目前版本(2.7.2)并没有实现其相关逻辑, SerializationOptimizer 没有子类。
             *
             * 最后返回该 exporter
             */
            openServer(url);
            optimizeSerialization(url);
    
            return exporter;
        }

    DubboProtocal#openServer

     private void openServer(URL url) {
            // find server.
            String key = url.getAddress();
            //client can export a service which's only for server to invoke
            boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
            if (isServer) {
                ExchangeServer server = serverMap.get(key);
                if (server == null) {
                    synchronized (this) {
                        server = serverMap.get(key);
                        if (server == null) {
                            serverMap.put(key, createServer(url));
                        }
                    }
                } else {
                    // server supports reset, use together with override
                    server.reset(url);
                }
            }
        }

    同一个主机上上不同服务,使用同一个address,所以共享一个ExchangeServer,如果内存中找不到,则create

    private ExchangeServer createServer(URL url) {
            url = URLBuilder.from(url)
                    // send readonly event when server closes, it's enabled by default
                    .addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())  // channel.readonly.sent=true
                    // enable heartbeat by default
                    .addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)) // heartbeat=60000
                    .addParameter(Constants.CODEC_KEY, DubboCodec.NAME)  // codec=dubbo
                    .build();
            String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); // netty
    
            if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
                throw new RpcException("Unsupported server type: " + str + ", url: " + url);
            }
    
            ExchangeServer server;
            try {
                server = Exchangers.bind(url, requestHandler);
            } catch (RemotingException e) {
                throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
            }
    
            str = url.getParameter(Constants.CLIENT_KEY);
            if (str != null && str.length() > 0) {
                Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
                if (!supportedTypes.contains(str)) {
                    throw new RpcException("Unsupported client type: " + str);
                }
            }
    
            return server;
        }

    为服务提供者url增加channel.readonly.sent属性,默认为true,表示在发送请求时,是否等待将字节写入socket后再返回,默认为true。
    为服务提供者url增加heartbeat属性,表示心跳间隔时间,默认为60*1000,表示60s。
    为服务提供者url增加server属性,可选值为netty,mina等等,默认为netty。
    根据SPI机制,判断server属性是否支持。
    为服务提供者url增加codec属性,默认值为dubbo,协议编码方式。
    根据服务提供者URI,服务提供者命令请求处理器requestHandler构建ExchangeServer实例。requestHandler的实现具体在以后详细分析Dubbo服务调用时再详细分析。
    验证客户端类型是否可用。

    那么服务信息是怎么写到Zookeeper的呢?

    在上文的RegisterProtocal#export -> RegisterProtocal#getRegistry

    final Registry registry = getRegistry(originInvoker);
            final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
            ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                    registryUrl, registeredProviderUrl);
            //to judge if we need to delay publish
            boolean register = registeredProviderUrl.getParameter("register", true);
            if (register) {
                register(registryUrl, registeredProviderUrl);
                providerInvokerWrapper.setReg(true);
            }
    
            // Deprecated! Subscribe to override rules in 2.6.x or before.
            registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

     根据注册中心URL,从注册中心工厂中获取指定的注册中心实现类:zookeeper注册中心的实现类为:ZookeeperRegistry

        private Registry getRegistry(final Invoker<?> originInvoker) {
            URL registryUrl = getRegistryUrl(originInvoker);
            return registryFactory.getRegistry(registryUrl);
        }

    这里面registryFactory的实现类是啥呢?这个就在之前章节讲到了扩展点的IOC注入,实际上registryFactory也是有ExtensionLoader去加载的。

    获取服务提供者URL中的register属性,如果为true,则调用注册中心的ZookeeperRegistry#register方法向注册中心注册服务(实际由其父类FailbackRegistry实现,ZookeeperRegistry extends FailbackRegistry)。

    FailbackRegistry#register -> ZookeeperRegistry#doRegister

    public void doRegister(URL url) {
            try {
                zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
            } catch (Throwable e) {
                throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }

    最后服务提供者向注册中心订阅自己,主要是为了服务提供者URL发送变化后重新暴露服务,当然,会将dubbo:reference的check属性设置为false。

  • 相关阅读:
    adb shell am force-stop <package>
    推荐一个代码生成工具:freemarker
    子控件跟着父控件变色
    sqlite支持的数据库类型
    android 资源文件
    一个手机基础信息的获取代码
    二维码的开源项目
    在点击HOME键时, 在点击icon回到原来的应用。
    Wireshark "The NPF driver isn’t running…"
    .atomic vs volatile
  • 原文地址:https://www.cnblogs.com/gaojy/p/15705945.html
Copyright © 2011-2022 走看看