  • Dubbo: Service Provider Initialization

    Initializing a service provider means initializing its ServiceBean.
     
    1. Parsing the configuration file
    While parsing the configuration file, Spring looks up the handler registered for the dubbo namespace, DubboNamespaceHandler:
     
    public class DubboNamespaceHandler extends NamespaceHandlerSupport {
    
        static {
            Version.checkDuplicate(DubboNamespaceHandler.class);
        }
    
        public void init() {
            registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
            registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
            registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
            registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
            registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
            registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
            registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
            registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
            registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
            registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
        }
    
    }
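
    The handler above is essentially a lookup table from element name to parser. The following is a minimal sketch of that idea in plain Java, so it runs without Spring or Dubbo on the classpath. The class TagRegistrySketch and its methods are hypothetical and only illustrate the mapping; they are not Spring's NamespaceHandlerSupport API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the element-name -> parser lookup.
// Not Spring's or Dubbo's real code.
class TagRegistrySketch {

    // Maps a local element name (e.g. "service") to the bean type it produces.
    private final Map<String, String> parsers = new LinkedHashMap<>();

    void register(String elementName, String beanType) {
        parsers.put(elementName, beanType);
    }

    // Returns the bean type for a tag, or null when no parser is registered.
    String resolve(String elementName) {
        return parsers.get(elementName);
    }

    // Registers a few of the element names listed in DubboNamespaceHandler.init().
    static TagRegistrySketch dubboLike() {
        TagRegistrySketch registry = new TagRegistrySketch();
        registry.register("registry", "RegistryConfig");
        registry.register("protocol", "ProtocolConfig");
        registry.register("service", "ServiceBean");
        registry.register("reference", "ReferenceBean");
        return registry;
    }

    public static void main(String[] args) {
        TagRegistrySketch registry = dubboLike();
        System.out.println("dubbo:service -> " + registry.resolve("service"));
        System.out.println("dubbo:unknown -> " + registry.resolve("unknown"));
    }
}
```

    In this sketch an unknown element simply resolves to null; how a real handler reacts to an unregistered element is outside what the excerpt shows.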

     
     
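    Startup flow overview
    The overall flow is:
    • The Spring container starts
    • ServiceBean is initialized
    • An event is published, onApplicationEvent runs, and ServiceBean begins its export operation
    • NettyServer is started
    • The service is exported to the registry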
     
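    2. ServiceBean runs export
    After the Spring container has processed a <dubbo:service> tag, it creates a ServiceBean in the container, and the service is published from within that ServiceBean. Here is the ServiceBean declaration: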
    public class ServiceBean<T> extends ServiceConfig<T> 
        implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware {
    
    }
    Once Spring has finished assembling the bean, it calls afterPropertiesSet from InitializingBean. Once the container has finished loading, the bean receives a ContextRefreshedEvent through onApplicationEvent from ApplicationListener.
    public void afterPropertiesSet() throws Exception {
        // Export here unless export is deferred until the context is refreshed
        if (!isDelay()) {
            // Export the service
            export();
        }
    }
    ServiceBean implements the ApplicationListener interface and its onApplicationEvent method, which Spring invokes automatically once the container has finished starting:
    public void onApplicationEvent(ApplicationEvent event) {
        if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
            if (isDelay() && !isExported() && !isUnexported()) {
                if (logger.isInfoEnabled()) {
                    logger.info("The service ready on spring started. service: " + getInterface());
                }
                // Deferred export: runs once the context has been refreshed
                export();
            }
        }
    }  
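
    The two callbacks above are complementary: exactly one of them triggers the export, depending on what isDelay() returns. The sketch below models only that decision in plain Java. ExportTriggerSketch, deferUntilRefresh, and onContextRefreshed are hypothetical names introduced for illustration, and the Spring event plumbing is deliberately left out.

```java
// Illustrative stand-in for the two Spring callbacks discussed above.
// The class and its members are hypothetical, not Dubbo's ServiceBean.
class ExportTriggerSketch {
    private final boolean deferUntilRefresh; // plays the role of isDelay()
    private boolean exported;
    private String trigger = "none";

    ExportTriggerSketch(boolean deferUntilRefresh) {
        this.deferUntilRefresh = deferUntilRefresh;
    }

    // Mirrors afterPropertiesSet(): export now unless export is deferred.
    void afterPropertiesSet() {
        if (!deferUntilRefresh) {
            export("afterPropertiesSet");
        }
    }

    // Mirrors onApplicationEvent() for a context-refreshed event.
    void onContextRefreshed() {
        if (deferUntilRefresh && !exported) {
            export("onApplicationEvent");
        }
    }

    private void export(String from) {
        exported = true;
        trigger = from;
    }

    boolean isExported() { return exported; }

    String trigger() { return trigger; }

    public static void main(String[] args) {
        ExportTriggerSketch deferred = new ExportTriggerSketch(true);
        deferred.afterPropertiesSet();
        System.out.println("after afterPropertiesSet: exported=" + deferred.isExported());
        deferred.onContextRefreshed();
        System.out.println("after refresh: exported=" + deferred.isExported()
                + ", trigger=" + deferred.trigger());
    }
}
```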
     
    public synchronized void export() {
        // Delayed export
        if (delay != null && delay > 0) {
            delayExportExecutor.schedule(new Runnable() {
                public void run() {
                    // Export the service
                    doExport();
                }
            }, delay, TimeUnit.MILLISECONDS);
        } else {
            // Export the service immediately
            doExport();
        }
    }
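
    The delay branch of export() hands doExport() to a scheduler, so with a positive delay the export runs on a scheduler thread rather than on the caller's thread. The following is a self-contained sketch of that pattern using java.util.concurrent. DelayedExportSketch, the thread name "export-delay-sketch", and the helper methods are assumptions made for illustration; the real delayExportExecutor is not reproduced here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical model of "export now or export after a delay".
class DelayedExportSketch {
    // Single daemon thread standing in for the delayed-export executor.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "export-delay-sketch");
                t.setDaemon(true);
                return t;
            });
    private final CountDownLatch done = new CountDownLatch(1);
    private final Integer delayMillis;
    private volatile String exportThread;

    DelayedExportSketch(Integer delayMillis) {
        this.delayMillis = delayMillis;
    }

    synchronized void export() {
        if (delayMillis != null && delayMillis > 0) {
            // Delayed export: runs later on the scheduler thread.
            scheduler.schedule(this::doExport, delayMillis, TimeUnit.MILLISECONDS);
        } else {
            // Immediate export: runs on the calling thread.
            doExport();
        }
    }

    private synchronized void doExport() {
        exportThread = Thread.currentThread().getName();
        done.countDown();
    }

    // Name of the thread that performed the export, or null if none yet.
    String exportThread() { return exportThread; }

    // Waits for the export; returns false on timeout or interruption.
    boolean awaitExport(long timeoutMillis) {
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void shutdown() { scheduler.shutdownNow(); }

    public static void main(String[] args) {
        DelayedExportSketch later = new DelayedExportSketch(100);
        later.export();
        later.awaitExport(5000);
        System.out.println("exported on thread: " + later.exportThread());
        later.shutdown();
    }
}
```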
    

      

    protected synchronized void doExport() {
        // Validate and complete the configuration before exporting
        checkApplication();
        checkRegistry();
        checkProtocol();
        appendProperties(this);
        checkStubAndMock(interfaceClass);
        if (path == null || path.length() == 0) {
            path = interfaceName;
        }
        doExportUrls();
    }
    private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }
    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        // (construction of `url` is omitted in this excerpt)

        // Unless the scope is remote, export the service locally
        if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
            exportLocal(url);
        }    
        // Unless the scope is local, export the service remotely
        if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
            // At least one registry address is configured
            if (registryURLs != null && registryURLs.size() > 0) {
                for (URL registryURL : registryURLs) {
                    url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                    // Unrelated code omitted ...
                    // Use the proxy factory to turn the ref object into an invoker
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                    // Wrap the invoker together with this service's metadata
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                    // Export the service
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    // A service can end up with several exporters; keep them together
                    exporters.add(exporter);
                }
            } else {
                // No registry: export directly using the service URL
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
        }
    }
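
    The key step in the method above is turning the service implementation (ref) into an invoker, an object that can dispatch a call to ref by method name. The sketch below shows one way such an object could work using plain reflection. InvokerSketch, its nested Invoker interface, and GreetingService are hypothetical, and the sketch does not reproduce how Dubbo's proxyFactory actually builds its invoker.

```java
import java.lang.reflect.Method;

// Hypothetical, reflection-based model of "wrap ref in an invoker".
class InvokerSketch {

    // Minimal invoker abstraction: dispatches a call by method name.
    interface Invoker {
        Class<?> getInterface();
        Object invoke(String methodName, Class<?>[] paramTypes, Object[] args);
    }

    // Wraps a service implementation behind the Invoker abstraction.
    static Invoker getInvoker(final Object ref, final Class<?> iface) {
        return new Invoker() {
            public Class<?> getInterface() {
                return iface;
            }

            public Object invoke(String methodName, Class<?>[] paramTypes, Object[] args) {
                try {
                    // Look the method up on the service interface, call it on ref.
                    Method method = iface.getMethod(methodName, paramTypes);
                    return method.invoke(ref, args);
                } catch (ReflectiveOperationException e) {
                    throw new RuntimeException("Invocation failed: " + methodName, e);
                }
            }
        };
    }

    // Example service used only by this sketch.
    interface GreetingService {
        String greet(String name);
    }

    static class GreetingServiceImpl implements GreetingService {
        public String greet(String name) {
            return "hello " + name;
        }
    }

    public static void main(String[] args) {
        Invoker invoker = getInvoker(new GreetingServiceImpl(), GreetingService.class);
        Object result = invoker.invoke("greet", new Class<?>[] {String.class}, new Object[] {"dubbo"});
        System.out.println(result);
    }
}
```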
    // DubboProtocol.export (abridged)
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        // (some code omitted, including the creation of `exporter`)
        // Open the server
        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }
    private void openServer(URL url) {
    
        String key = url.getAddress();
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        // Only the server side opens a server
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                // No server for this address yet: create one
                serverMap.put(key, createServer(url));
            } else {
                server.reset(url);
            }
        }
    }
    private ExchangeServer createServer(URL url) {
        // (some code omitted)
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        return server;
    }
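
    openServer keeps one server per address: the first export for an address creates the server, and later exports for the same address only reset it. The following is a minimal sketch of that cache in plain Java. ServerCacheSketch and FakeServer are hypothetical stand-ins, and addresses are plain strings rather than Dubbo URL objects.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the "one server per address" cache.
class ServerCacheSketch {

    // Stand-in for a bound server; only counts how often it was reset.
    static final class FakeServer {
        final String address;
        int resetCount;

        FakeServer(String address) {
            this.address = address;
        }

        void reset() {
            resetCount++;
        }
    }

    private final Map<String, FakeServer> serverMap = new ConcurrentHashMap<>();
    private int created;

    // Creates a server for a new address, resets the existing one otherwise.
    void openServer(String address) {
        FakeServer server = serverMap.get(address);
        if (server == null) {
            serverMap.put(address, new FakeServer(address));
            created++;
        } else {
            server.reset();
        }
    }

    int created() { return created; }

    FakeServer get(String address) { return serverMap.get(address); }

    public static void main(String[] args) {
        ServerCacheSketch cache = new ServerCacheSketch();
        cache.openServer("127.0.0.1:20880"); // first service on this port
        cache.openServer("127.0.0.1:20880"); // second service, same port
        cache.openServer("127.0.0.1:20881"); // different port
        System.out.println("servers created: " + cache.created());
        System.out.println("resets on 20880: " + cache.get("127.0.0.1:20880").resetCount);
    }
}
```

    The check-then-put form is kept to mirror the excerpt. A sketch meant for concurrent callers could use computeIfAbsent on the map instead.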

    The generated adaptive class, Protocol$Adpative (spelled as in the generated code):

    import com.alibaba.dubbo.common.extension.ExtensionLoader;
    public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
        public void destroy() {
            // throws an exception (omitted)
        }
        public int getDefaultPort() {
            // throws an exception (omitted)
        }
        // export
        public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws RpcException {
            if (arg0 == null) 
                throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
            if (arg0.getUrl() == null) 
                throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
            com.alibaba.dubbo.common.URL url = arg0.getUrl();
            String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
            if(extName == null) 
                throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" 
                                                + url.toString() + ") use keys([protocol])");
            Protocol extension = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
            return extension.export(arg0);
        }
        public Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws RpcException {
            if (arg1 == null) 
                throw new IllegalArgumentException("url == null");
            com.alibaba.dubbo.common.URL url = arg1;
            String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
            if(extName == null) 
                throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" 
                                                + url.toString() + ") use keys([protocol])");
            Protocol extension = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
            return extension.refer(arg0, arg1);
        }
    }
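
    The generated class above does one thing: it reads the protocol from the URL, falls back to "dubbo" when there is none, and delegates to the extension registered under that name. The sketch below reproduces that selection logic in plain Java. AdaptiveProtocolSketch, its string-based URLs, and the protocolOf helper are assumptions for illustration and do not use Dubbo's ExtensionLoader.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of "pick the extension named by the URL's protocol".
class AdaptiveProtocolSketch {

    // Minimal protocol abstraction: returns a description of what it exported.
    interface Protocol {
        String export(String url);
    }

    private final Map<String, Protocol> extensions = new HashMap<>();

    void register(String name, Protocol protocol) {
        extensions.put(name, protocol);
    }

    // Extracts "registry" from "registry://host:port/path"; null if absent.
    static String protocolOf(String url) {
        int idx = url.indexOf("://");
        return idx <= 0 ? null : url.substring(0, idx);
    }

    // Mirrors the adaptive export(): resolve the extension name, then delegate.
    String export(String url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        String protocol = protocolOf(url);
        String extName = (protocol == null ? "dubbo" : protocol);
        Protocol extension = extensions.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No extension named " + extName + " for url " + url);
        }
        return extension.export(url);
    }

    static AdaptiveProtocolSketch withDefaults() {
        AdaptiveProtocolSketch adaptive = new AdaptiveProtocolSketch();
        adaptive.register("dubbo", url -> "dubbo exported " + url);
        adaptive.register("registry", url -> "registry exported " + url);
        return adaptive;
    }

    public static void main(String[] args) {
        AdaptiveProtocolSketch adaptive = withDefaults();
        System.out.println(adaptive.export("registry://127.0.0.1:2181/demo"));
        System.out.println(adaptive.export("127.0.0.1:20880/demo"));
    }
}
```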
     
  • Original article: https://www.cnblogs.com/caoxb/p/13140261.html