zoukankan      html  css  js  c++  java
  • Dubbo服务发布源码分析

    通过上面的分析,我们知道了服务的发布入口在 DubboBootstrap#start:

    /**
     * Starts the bootstrap: initializes it, exports the configured services, exports the
     * metadata service and registers the service instance when the condition below holds,
     * and finally refers services. Only the first call performs any work.
     *
     * @return this bootstrap, so calls can be chained
     */
    public DubboBootstrap start() {
        // compare-and-set guard: only the caller that flips the flag runs the start sequence
        if (started.compareAndSet(false, true)) {
            ready.set(false);
            initialize();
            if (logger.isInfoEnabled()) {
                logger.info(NAME + " is starting...");
            }
            // 1. export Dubbo Services
            exportServices();

            // Not only provider register
            if (!isOnlyRegisterProvider() || hasExportedServices()) {
                // 2. export MetadataService
                exportMetadataService();
                //3. Register the local ServiceInstance if required
                registerServiceInstance();
            }

            // consumer-side work
            referServices();
            if (asyncExportingFutures.size() > 0) {
                // asynchronous exports are pending: wait for them on a separate thread
                new Thread(() -> {
                    try {
                        this.awaitFinish();
                    } catch (Exception e) {
                        // pass the exception to the logger so the cause and stack trace are kept
                        logger.warn(NAME + " exportAsync occurred an exception.", e);
                    }
                    ready.set(true);
                    if (logger.isInfoEnabled()) {
                        logger.info(NAME + " is ready.");
                    }
                }).start();
            } else {
                ready.set(true);
                if (logger.isInfoEnabled()) {
                    logger.info(NAME + " is ready.");
                }
            }
            if (logger.isInfoEnabled()) {
                logger.info(NAME + " has started.");
            }
        }
        return this;
    }
    

    initialize

    /**
     * Runs the one-time initialization sequence. Calls after the first are no-ops
     * because the {@code initialized} flag has already been flipped.
     */
    private void initialize() {
        // only the caller that wins the compare-and-set performs the initialization
        if (initialized.compareAndSet(false, true)) {
            // initialize the framework extensions
            ApplicationModel.initFrameworkExts();
            // start the config center, if one is configured
            startConfigCenter();
            // use the registry as the config center when necessary
            useRegistryAsConfigCenterIfNecessary();
            // load remote configs
            loadRemoteConfigs();
            // validate the global configs
            checkGlobalConfigs();
            // initialize the metadata service
            initMetadataService();
            // initialize the event listener
            initEventListener();

            if (logger.isInfoEnabled()) {
                logger.info(NAME + " has been initialized!");
            }
        }
    }
    

    DubboBootstrap#exportServices

    /**
     * Exports every service held by the config manager, either inline or on the
     * service-exporter executor depending on {@code exportAsync}.
     */
    private void exportServices() {
        configManager.getServices().forEach(sc -> {
            // TODO, compatible with ServiceConfig.export()
            // The article notes each entry is a ServiceBean, a subclass of ServiceConfig.
            ServiceConfig serviceConfig = (ServiceConfig) sc;
            serviceConfig.setBootstrap(this);

            if (!exportAsync) {
                // synchronous export: export, then record it as exported
                sc.export();
                exportedServices.add(sc);
                return;
            }

            // asynchronous export: submit to the executor and keep the Future
            ExecutorService executor = executorRepository.getServiceExporterExecutor();
            Future<?> future = executor.submit(() -> {
                sc.export();
                exportedServices.add(sc);
            });
            asyncExportingFutures.add(future);
        });
    }
    

    无论同步/异步 发布,均会走到 ServiceConfig#export 方法中

    /**
     * Exports this service: checks whether export is wanted, makes sure a bootstrap is
     * present, validates the sub-configs, fills in the service metadata, and then either
     * exports immediately or schedules the export after the configured delay.
     */
    public synchronized void export() {
        // nothing to do when export is not wanted
        if (!shouldExport()) {
            return;
        }

        // fall back to the shared bootstrap instance when none has been set
        if (bootstrap == null) {
            bootstrap = DubboBootstrap.getInstance();
            bootstrap.init();
        }

        // validate and update the related configs
        checkAndUpdateSubConfigs();

        //init serviceMetadata
        serviceMetadata.setVersion(version);
        serviceMetadata.setGroup(group);
        serviceMetadata.setDefaultGroup(group);
        serviceMetadata.setServiceType(getInterfaceClass());
        serviceMetadata.setServiceInterfaceName(getInterface());
        serviceMetadata.setTarget(getRef());

        if (!shouldDelay()) {
            // export right away
            doExport();
        } else {
            // delayed export: hand doExport to the scheduler
            DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
        }

        // NOTE(review): in the delayed case this runs right after scheduling, i.e. before
        // doExport has executed — confirm that is intended.
        exported();
    }
    

    然后进入 ServiceConfig#doExport 这里面没有什么特殊逻辑,转到 ServiceConfig#doExportUrls 主要流程,根据开发者配置的协议列表,遍历协议列表逐项进行发布。

    /**
     * Registers the service and provider in the service repository, loads the registry
     * URLs, and exports the service once per configured protocol.
     */
    private void doExportUrls() {
        // the service repository (described in the article as essentially a cache)
        ServiceRepository repository = ApplicationModel.getServiceRepository();
        ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
        // record the provider in the repository
        repository.registerProvider(getUniqueServiceName(), ref, serviceDescriptor, this, serviceMetadata);

        // configured registry URLs
        List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);

        for (ProtocolConfig protocolConfig : protocols) {
            // prefix the path with the protocol's context path when there is one
            String servicePath = getContextPath(protocolConfig)
                .map(contextPath -> contextPath + "/" + path)
                .orElse(path);
            String pathKey = URL.buildKey(servicePath, group, version);
            // In case user specified path, register service one more time to map it to path.
            repository.registerService(pathKey, interfaceClass);
            // TODO, uncomment this line once service key is unified
            serviceMetadata.setServiceKey(pathKey);
            // export for this protocol against the registry URLs
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }
    

    ServiceConfig#doExportUrlsFor1Protocol

    本质上做了以下几件事:

    • 生成url
    • 根据url中配置的协议类型,调用指定协议进行服务的发布
      • 启动服务
      • 注册服务
    /**
     * Exports this service for a single protocol: collects parameters into a map, builds
     * the provider URL from it, then exports locally and/or remotely according to the
     * URL's scope parameter, and finally records the URL in {@code urls}.
     *
     * @param protocolConfig the protocol to export with
     * @param registryURLs   registry URLs to export against; may be empty
     */
    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        // protocol name
        String name = protocolConfig.getName();
        if (StringUtils.isEmpty(name)) {
            name = DUBBO;// fall back to the DUBBO constant
        }
        
        // parameter map used to assemble the URL
        Map<String, String> map = new HashMap<String, String>();
        map.put(SIDE_KEY, PROVIDER_SIDE);
    
        ServiceConfig.appendRuntimeParameters(map);
        AbstractConfig.appendParameters(map, getMetrics());
        AbstractConfig.appendParameters(map, getApplication());
        AbstractConfig.appendParameters(map, getModule());
        // remove 'default.' prefix for configs from ProviderConfig
        // appendParameters(map, provider, Constants.DEFAULT_KEY);
        AbstractConfig.appendParameters(map, provider);
        AbstractConfig.appendParameters(map, protocolConfig);
        AbstractConfig.appendParameters(map, this);
        MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
        if (metadataReportConfig != null && metadataReportConfig.isValid()) {
            map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
        }
        if (CollectionUtils.isNotEmpty(getMethods())) {
            for (MethodConfig method : getMethods()) {
                AbstractConfig.appendParameters(map, method, method.getName());
                // "<method>.retry=false" is rewritten as "<method>.retries=0"
                String retryKey = method.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(method.getName() + ".retries", "0");
                    }
                }
                List<ArgumentConfig> arguments = method.getArguments();
                if (CollectionUtils.isNotEmpty(arguments)) {
                    for (ArgumentConfig argument : arguments) {
                        // convert argument type
                        if (argument.getType() != null && argument.getType().length() > 0) {
                            Method[] methods = interfaceClass.getMethods();
                            // visit all methods
                            if (methods.length > 0) {
                                for (int i = 0; i < methods.length; i++) {
                                    String methodName = methods[i].getName();
                                    // target the method, and get its signature
                                    if (methodName.equals(method.getName())) {
                                        Class<?>[] argtypes = methods[i].getParameterTypes();
                                        // one callback in the method
                                        if (argument.getIndex() != -1) {
                                            if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                                AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                            } else {
                                                throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                            }
                                        } else {
                                            // multiple callbacks in the method
                                            for (int j = 0; j < argtypes.length; j++) {
                                                Class<?> argclazz = argtypes[j];
                                                if (argclazz.getName().equals(argument.getType())) {
                                                    AbstractConfig.appendParameters(map, argument, method.getName() + "." + j);
                                                    // NOTE(review): this branch is only entered when getIndex() == -1,
                                                    // so the check below appears unreachable — confirm.
                                                    if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                        throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        } else if (argument.getIndex() != -1) {
                            AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                        } else {
                            throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                        }
    
                    }
                }
            } // end of methods for
        }
    
        // everything above collects parameters for the URL
       // generic service or not
        if (ProtocolUtils.isGeneric(generic)) {
            map.put(GENERIC_KEY, generic);
            map.put(METHODS_KEY, ANY_VALUE);
        } else {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put(REVISION_KEY, revision);
            }
    
            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("No method found in service interface " + interfaceClass.getName());
                map.put(METHODS_KEY, ANY_VALUE);
            } else {
                // de-duplicate the method names before joining them
                map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }
    
        /**
             * Here the token value configured by the provider is used to assign the value to ServiceConfig#token
             */
        // token handling
        if(ConfigUtils.isEmpty(token) && provider != null) {
            token = provider.getToken();
        }
    
        if (!ConfigUtils.isEmpty(token)) {
            if (ConfigUtils.isDefault(token)) {
                // a "default" token is replaced by a random UUID
                map.put(TOKEN_KEY, UUID.randomUUID().toString());
            } else {
                map.put(TOKEN_KEY, token);
            }
        }
        //init serviceMetadata attachments
        serviceMetadata.getAttachments().putAll(map);
    
        // export service
        // resolve host and port
        String host = findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = findConfigedPorts(protocolConfig, name, map);
        // assemble the URL
        URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
    
        // extension point
        // You can customize Configurator to append extra parameters
        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
            .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }
    
        String scope = url.getParameter(SCOPE_KEY);
    
        // The scope decides whether to export at all, and whether locally and/or remotely.
        // don't export when none is configured
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
    
            // export to local if the config is not remote (export to remote only when config is remote)
            // when scope != remote, export the local service first
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {
                        //if protocol is only injvm ,not register
                        // skip when the URL's protocol equals LOCAL_PROTOCOL
                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                            continue;
                        }
                        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                        URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            if (url.getParameter(REGISTER_KEY, true)) {
                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                            } else {
                                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                            }
                        }
    
                        // For providers, this is used to enable custom proxy to generate invoker
                        // a custom proxy mechanism, if configured (the article notes the default is javassist)
                        String proxy = url.getParameter(PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }
    
                        // Wrap ref in an Invoker via PROXY_FACTORY; the provider URL is encoded into
                        // the registry URL under EXPORT_KEY.
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
                        // Per the article, PROTOCOL is the adaptive extension (Protocol$Adaptive), so this
                        // call goes through export() of the generated adapter class.
                        Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    // no registry URLs: export using the provider URL itself
                    if (logger.isInfoEnabled()) {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
                    Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                    exporters.add(exporter);
                }
                /**
                     * @since 2.7.0
                     * ServiceData Store
                     */
                WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
                if (metadataService != null) {
                    metadataService.publishServiceDefinition(url);
                }
            }
        }
        this.urls.add(url);
    }
    

    其实本质上就是解析 @DubboService 的注解配置元数据,然后来到了 主机绑定,也就是 IP的查找方法上 ServiceConfig#findConfigedHosts:

    /**
     * Resolves the host to bind and the host to register for this export. The bind host
     * is stored in {@code map} under BIND_IP_KEY and the anyhost flag under ANYHOST_KEY.
     *
     * @param protocolConfig protocol whose configured host is consulted
     * @param registryURLs   registries that may be connected to in order to discover a local address
     * @param map            URL parameter map that receives the bind host and anyhost flag
     * @return the host to register: the configured registry host, or the bind host when none is set
     * @throws IllegalArgumentException if an explicitly configured bind or registry host is an invalid local host
     */
    private String findConfigedHosts(ProtocolConfig protocolConfig,
                                     List<URL> registryURLs,
                                     Map<String, String> map) {
        boolean anyhost = false;
    
        // Look up DUBBO_IP_TO_BIND through getValueFromConfig
        // (presumably environment / startup properties — confirm against that helper).
        String hostToBind = getValueFromConfig(protocolConfig, DUBBO_IP_TO_BIND);
        if (hostToBind != null && hostToBind.length() > 0 && isInvalidLocalHost(hostToBind)) {
            throw new IllegalArgumentException("Specified invalid bind ip from property:" + DUBBO_IP_TO_BIND + ", value:" + hostToBind);
        }
    
        // if bind ip is not found in environment, keep looking up
        if (StringUtils.isEmpty(hostToBind)) {
            // host configured on the protocol, falling back to the provider's host
            hostToBind = protocolConfig.getHost();
            if (provider != null && StringUtils.isEmpty(hostToBind)) {
                hostToBind = provider.getHost();
            }
            if (isInvalidLocalHost(hostToBind)) {
                // no usable configured host: mark anyhost and try to discover one
                anyhost = true;
                try {
                    logger.info("No valid ip found from environment, try to find valid host from DNS.");
                    hostToBind = InetAddress.getLocalHost().getHostAddress();
                } catch (UnknownHostException e) {
                    logger.warn(e.getMessage(), e);
                }
                if (isInvalidLocalHost(hostToBind)) {
                    if (CollectionUtils.isNotEmpty(registryURLs)) {
                        for (URL registryURL : registryURLs) {
                            if (MULTICAST.equalsIgnoreCase(registryURL.getParameter("registry"))) {
                                // skip multicast registry since we cannot connect to it via Socket
                                continue;
                            }
                            try (Socket socket = new Socket()) {
                                SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort());
                                // connect to the registry (1000 ms timeout) and take the socket's local address
                                socket.connect(addr, 1000);
                                hostToBind = socket.getLocalAddress().getHostAddress();
                                break;
                            } catch (Exception e) {
                                // best effort: log and try the next registry
                                logger.warn(e.getMessage(), e);
                            }
                        }
                    }
                    if (isInvalidLocalHost(hostToBind)) {
                        // last resort: getLocalHost() (per the article, it scans the local network interfaces)
                        hostToBind = getLocalHost();
                    }
                }
            }
        }
    
        map.put(BIND_IP_KEY, hostToBind);
    
        // registry ip is not used for bind ip by default
        // The address found above is the bind ip; DUBBO_IP_TO_REGISTRY can supply a different
        // ip to register with the registry.
        String hostToRegistry = getValueFromConfig(protocolConfig, DUBBO_IP_TO_REGISTRY);
        if (hostToRegistry != null && hostToRegistry.length() > 0 && isInvalidLocalHost(hostToRegistry)) {
            throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
        } else if (StringUtils.isEmpty(hostToRegistry)) {
            // bind ip is used as registry ip by default
            hostToRegistry = hostToBind;
        }
    
        map.put(ANYHOST_KEY, String.valueOf(anyhost));
    
        return hostToRegistry;
    }
    

    总之就是直到找到一个合法的主机地址为止。然后获取到端口。将map 配置信息集合、IP、Port 传入,构造一个 URL

    dubbo://192.168.111.1:20880/com.springboot.dubbo.provider.services.IDemoService?anyhost=true&application=spring-boot-dubbo-sample-provider&bind.ip=192.168.111.1&bind.port=20880&
    default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.springboot.dubbo.provider.services.IDemoService&methods=getTxt&pid=7676&
    qos.accept.foreign.ip=false&qos.enable=true&qos.port=8888&release=2.7.7&side=provider&timestamp=1609261509128
    

    ServiceConfig#doExportUrlsFor1Protocol 中的invoker对象

    // Wrap the service implementation (ref) in an Invoker; the provider URL is encoded
    // into the registry URL under EXPORT_KEY.
    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
    // Pair the invoker with this config object in a DelegateProviderMetaDataInvoker.
    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    

    其中 PROXY_FACTORY 定义如下:

    // Adaptive ProxyFactory extension, obtained once from the ExtensionLoader.
    private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
    

    对应的接口拓展点默认实现为 javassist ,但是会有一个 StubProxyFactoryWrapper 进行包装,但是这里不影响,所以进入 JavassistProxyFactory#getInvoker

    /**
     * Builds an Invoker for {@code proxy} whose invocations are delegated to a
     * {@link Wrapper} for the chosen class.
     *
     * @param proxy the service object to invoke
     * @param type  the service interface
     * @param url   the URL passed to the invoker
     * @return an invoker that calls {@code wrapper.invokeMethod} on {@code proxy}
     */
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        // Use the interface type instead of the concrete class when the class name contains '$'.
        Class<?> wrapperClass = proxy.getClass();
        if (wrapperClass.getName().indexOf('$') >= 0) {
            wrapperClass = type;
        }
        final Wrapper wrapper = Wrapper.getWrapper(wrapperClass);

        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments)
                    throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
    

    通过 javassist 生成一个代理类,这里持有了对应我们需要发布的服务类的所有信息。然后将该类进行传递,一直到本地服务的发布及服务的注册。而后消费端通过这里的 wrapper.invokeMethod 进行调用。

    我们可以看一下在我这个环境测试的服务下生成的代理方法的代码,需要进入 Wrapper.getWrapper 方法断点获取:

    我们将 c3 拷贝出来:

    public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException{ 
          com.springboot.dubbo.provider.services.IDemoService w; 
          try{ 
              w = ((com.springboot.dubbo.provider.services.IDemoService)$1); 
          }catch(Throwable e){ 
              throw new IllegalArgumentException(e); 
          } 
          try{ 
              if( "getTxt".equals( $2 )  &&  $3.length == 0 ) { 
                  return ($w)w.getTxt();
              } 
          } catch(Throwable e) {   
              throw new java.lang.reflect.InvocationTargetException(e);  
          } 
          throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method ""+$2+"" in class com.springboot.dubbo.provider.services.IDemoService."); 
    }
    

    构建好了代理类之后,返回一个 AbstractProxyInvoker,并且它实现了 doInvoke 方法。这里似乎已经能看到 Dubbo 消费者调用过来时触发的影子,因为 wrapper.invokeMethod 本质上就是触发上面动态代理类的 invokeMethod 方法。

    接下来我们来看看服务的远程发布 :

    // Export the wrapped invoker through PROTOCOL and keep the returned Exporter.
    Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
    exporters.add(exporter);
    

    这个 PROTOCOL 的实例化,跟我们上面分析 SPI 之自适应拓展点一模一样,所以这里得到的对象是 ProtocolFilterWrapper(QosProtocolWrapper(ProtocolListenerWrapper(DubboProtocol)))。但是需要明白的是,Dubbo 基于 URL 驱动,那么这个时候我们需要知道 URL 中携带的协议是什么,这样我们才能够找到对应的拓展点

    我们发现这里已经被替换成了 registry 协议,那么此刻应该走到 Protocol$Adaptive 的动态适配器类中,而其中最为关键的代码如下:

    // The extension name is the URL's protocol, or "dubbo" when the URL has none.
    String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
    

    然后通过这个 extName ,通过获取指定名称的拓展点,找到对应的实现,那么这里的 registry 对应的就是 org.apache.dubbo.registry.integration.RegistryProtocol,但是Protocol 有包装类,那么最后的对象应该是 ProtocolFilterWrapper(QosProtocolWrapper(ProtocolListenerWrapper(RegistryProtocol)))

    这里的三个包装类都会判断URL是不是 registry 协议,如果是直接进入下个调用链,当前场景正是 registry 。最终调用 RegistryProtocol#export

    /**
     * Exports the invoker locally, registers the provider URL with the registry when
     * REGISTER_KEY allows it, subscribes to override data, and returns a new
     * DestroyableExporter wrapping the local exporter.
     *
     * @param originInvoker the invoker to export
     * @return a fresh DestroyableExporter on every call
     * @throws RpcException declared by the signature; no throw site is visible in this method
     */
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
         // registry address taken from the invoker; with ZooKeeper configured the article shows zookeeper://192.168.1.16:2181/..
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        // provider address; with the dubbo protocol the article shows dubbo://192.168.111.1:20880/...
        URL providerUrl = getProviderUrl(originInvoker);
    
        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
        //  the same service. Because the subscribed is cached key with the name of the service, it causes the
        //  subscription information to cover.
        // derive the subscription URL; the article shows provider://192.168.111.1:20880/......
        // Subscribing to override data lets governance changes made in the admin console (weights,
        // routing, ...) be pushed to the provider, which then re-exports the service (per the article).
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    
        // rewrite the provider URL with the applicable configuration
        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //export invoker
        // local export (per the article, this is where Netty is started)
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
    
        // url to registry
        // registry instance; with ZooKeeper configured the article says this is a ZookeeperRegistry
        final Registry registry = getRegistry(originInvoker);
        // URL to register with the registry: dubbo://ip:port
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
    
        // decide if we need to delay publish
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        if (register) {
            // register the service
            register(registryUrl, registeredProviderUrl);
        }
    
        // register stated url on provider model
        registerStatedUrl(registryUrl, registeredProviderUrl, register);
    
        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    
        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
    
        notifyExport(exporter);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }
    

    RegistryProtocol#doLocalExport 然后走服务的发布

    /**
     * Exports the invoker through {@code protocol} using {@code providerUrl}, at most
     * once per cache key; later calls with the same key return the cached wrapper.
     *
     * @param originInvoker the invoker being exported
     * @param providerUrl   the URL the delegate invoker is created with
     * @return the cached or newly created exporter wrapper
     */
    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String cacheKey = getCacheKey(originInvoker);

        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(cacheKey, absentKey -> {
            // the delegate carries the provider URL, so protocol.export sees that URL
            Invoker<?> delegate = new InvokerDelegate<>(originInvoker, providerUrl);
            Exporter<T> localExporter = (Exporter<T>) protocol.export(delegate);
            return new ExporterChangeableWrapper<>(localExporter, originInvoker);
        });
    }
    

     其中 providerUrl 是dubbo:// 协议开头的地址URL,正如之前所说,Dubbo基于URL驱动,那么此刻 protocol 是 Protocol$Adaptive,所以此刻 protocol.export(invokerDelegate) 会走 DubboProtocol#export ,需要注意的是,这里会进行包装 ProtocolFilterWrapper(QosProtocolWrapper(ProtocolListenerWrapper(DubboProtocol)))

    DubboProtocol#export

    /**
     * Creates and caches a DubboExporter for the invoker, warns when stub events are
     * enabled without any stub methods, then opens the server and optimizes serialization.
     *
     * @param invoker the invoker to export
     * @return the exporter stored in {@code exporterMap}
     * @throws RpcException declared by the signature; no throw site is visible in this method
     */
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String exporterKey = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, exporterKey, exporterMap);
        exporterMap.put(exporterKey, exporter);

        //export an stub service for dispatching event
        // whether stub events are enabled
        boolean stubEventEnabled = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
        // whether this is a callback service
        boolean callbackService = url.getParameter(IS_CALLBACK_SERVICE, false);
        if (stubEventEnabled && !callbackService) {
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            boolean noStubMethods = stubServiceMethods == null || stubServiceMethods.length() == 0;
            if (noStubMethods && logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                                                      "], has set stubproxy support event ,but no stub methods founded."));
            }
        }

        openServer(url);
        optimizeSerialization(url);

        return exporter;
    }
    

    openServer: 往下看这个过程,进入到openServer(),从名字来看它是用来开启一个服务。去开启一个服务,并且放入到缓存中(在同一台机器上(单网卡),同一个端口上仅允许启动一个服务器实例)

    /**
     * Makes sure a server exists for the URL's address: creates and caches one when
     * absent, otherwise resets the existing server with the URL.
     *
     * @param url the provider URL; its address (host:port) is the cache key
     */
    private void openServer(URL url) {
        // find server.
        // host:port identifies the server instance
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(IS_SERVER_KEY, true);
        if (isServer) {
            // already cached in serverMap?
            ProtocolServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    // re-check under the lock: another thread may have created it meanwhile
                    server = serverMap.get(key);
                    if (server == null) {
                        serverMap.put(key, createServer(url));
                        return;
                    }
                }
            }
            // server supports reset, use together with override
            // Reached both when the server was cached up front and when a concurrent caller
            // created it first, so this URL's settings are applied in either case.
            server.reset(url);
        }
    }
    

    createServer创建服务

    /**
     * Binds a new exchange server for the URL after adding default parameters and
     * validating the configured server and client transporter types.
     *
     * @param url the provider URL
     * @return the bound server wrapped in a DubboProtocolServer
     * @throws RpcException if the server or client type is unsupported, or binding fails
     */
    private ProtocolServer createServer(URL url) {
        // add the read-only-event, heartbeat and codec parameters to the URL
        url = URLBuilder.from(url)
            // send readonly event when server closes, it's enabled by default
            .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
            // enable heartbeat by default
            .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
            .addParameter(CODEC_KEY, DubboCodec.NAME)
            .build();

        // the Transporter extension named by the server parameter must exist
        String serverType = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
        if (serverType != null && serverType.length() > 0
                && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(serverType)) {
            throw new RpcException("Unsupported server type: " + serverType + ", url: " + url);
        }

        // Validate the client type BEFORE binding: failing afterwards would abandon a server
        // that is already bound and never returned to the caller.
        String clientType = url.getParameter(CLIENT_KEY);
        if (clientType != null && clientType.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(clientType)) {
                throw new RpcException("Unsupported client type: " + clientType);
            }
        }

        // create the ExchangeServer
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }

        return new DubboProtocolServer(server);
    }
    

    Exchangers.bind

    /**
     * Binds an exchange server for the URL, defaulting the codec parameter to "exchange".
     *
     * @param url     the URL to bind; must not be null
     * @param handler the exchange handler; must not be null
     * @return the server created by the resolved exchanger
     * @throws IllegalArgumentException if {@code url} or {@code handler} is null
     * @throws RemotingException        if the exchanger fails to bind
     */
    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        // only sets the codec when the URL does not already carry one
        final URL bindUrl = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        // getExchanger resolves the Exchanger (HeaderExchanger by default, per the article)
        return getExchanger(bindUrl).bind(bindUrl, handler);
    }
    // Note on bind(...) above: getExchanger resolves the Exchanger (HeaderExchanger by default,
    // per the article), and that exchanger's bind method creates the ExchangeServer instance.

    /**
     * Parses the URL string and delegates to the URL-based {@code connect} overload.
     */
    public static ExchangeClient connect(String url) throws RemotingException {
        URL parsedUrl = URL.valueOf(url);
        return connect(parsedUrl);
    }
    
    // Extension point; the default is "header" (per the article).
    /**
     * Connects with a fresh ChannelHandlerAdapter and a null third argument.
     */
    public static ExchangeClient connect(URL url) throws RemotingException {
        ChannelHandlerAdapter adapter = new ChannelHandlerAdapter();
        return connect(url, adapter, null);
    }
    

    然后根据拓展点进入 HeaderExchanger#bind

    • new DecodeHandler(new HeaderExchangeHandler(handler))
    • Transporters.bind :发布服务
    • new HeaderExchangeServer:服务端消费的调用链

    目前我们只需要关心transporters.bind方法即可

    /**
     * Wraps the handler as DecodeHandler(HeaderExchangeHandler(handler)), binds it via
     * Transporters, and wraps the result in a HeaderExchangeServer.
     */
    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        DecodeHandler decodeHandler = new DecodeHandler(new HeaderExchangeHandler(handler));
        return new HeaderExchangeServer(Transporters.bind(url, decodeHandler));
    }
    

    Transporters#bind 发布远程服务

    /**
     * Binds a server for the URL through the resolved transporter. A single handler is
     * used as-is; several handlers are combined in a ChannelHandlerDispatcher.
     *
     * @param url      the URL to bind; must not be null
     * @param handlers one or more channel handlers
     * @return the server created by the transporter
     * @throws IllegalArgumentException if {@code url} is null or no handler is given
     * @throws RemotingException        if the transporter fails to bind
     */
    public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler = handlers.length == 1
                ? handlers[0]
                : new ChannelHandlerDispatcher(handlers);
        return getTransporter().bind(url, handler);
    }
    
    // @SPI("netty"): per the article, the default transporter resolves to the latest netty4 implementation.
    /**
     * Parses the URL string and delegates to the URL-based {@code connect} overload.
     */
    public static Client connect(String url, ChannelHandler... handler) throws RemotingException {
        URL parsedUrl = URL.valueOf(url);
        return connect(parsedUrl, handler);
    }
    

    这里走的是最新的 netty4 版本的 Netty 实现来进行服务发布:
    进入到 org.apache.dubbo.remoting.transport.netty4.NettyTransporter#bind

    /**
     * Creates a NettyServer for the given URL and handler.
     */
    @Override
    public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
        NettyServer nettyServer = new NettyServer(url, handler);
        return nettyServer;
    }
    

    然后创建了一个 NettyServer 实例, 里面有个 doOpen 方法用于开启服务。接下去就是启动 Netty 服务了。
    值得注意的是,这里构造了一个请求处理链,Netty 接收到客户端请求的时候会走这个处理链:MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecodeHandler -> HeaderExchangeHandler -> ExchangeHandlerAdapter。

    参考

    org.apache.dubbo 2.7.7 服务发布注册源码

  • 相关阅读:
    关于Java的代理模式
    关于Java串行、并行执行——使用Callable多线程
    关于区域表system_district:省市县街道四级地址表
    关于MongoDB在windows下安装
    关于Eureka 服务注册列表显示IP问题研究
    关于开发APP接口版本不兼容的问题
    关于MySQL创建数据库字符集和数据库排序规则的对比选择
    关于MySQL的行转列
    关于MySQL统计一列中不同值的数量方法
    关于Java 8 forEach
  • 原文地址:https://www.cnblogs.com/snail-gao/p/14204545.html
Copyright © 2011-2022 走看看