zoukankan      html  css  js  c++  java
  • dubbo客户端源码分析(一)

    rpc框架有很多,公司自研、开源的thrift、dubbo、grpc等。我用过几个框架,了解了一下实现原理,客户端基本都是用代理实现,jdk动态代理、cglib等。最近一段时间想了解一下dubbo源码,看下工作原理。今天看了一下客户端初始化源码

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
        <dubbo:application name="demo-consumer"/>
        <!--<dubbo:registry address="multicast://224.5.6.7:1234"/>-->
        <dubbo:registry address="zookeeper://127.0.0.1:2181" />
        <dubbo:reference id="demoService" interface="com.gxf.dubbo.demo.DemoSerivce"/>
    </beans>

    这个是dubbo客户端配置,注册中心是本地zk。其中,dubbo是阿里基于spring扩展的schema

    https://gist.github.com/dchjmichael/07dfd189c4c29bab63ec

    这个文档关于spring schema扩展用法写的很不错,要定义xsd, handler, 解析xml的parser

    http://code.alibabatech.com/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler

    这是dubbo的spring-handler.xml,可以找到spring handler

     1 public class DubboNamespaceHandler extends NamespaceHandlerSupport {
     2 
     3     static {
     4         Version.checkDuplicate(DubboNamespaceHandler.class);
     5     }
     6 
     7     public void init() {
     8         registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
     9         registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
    10         registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
    11         registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
    12         registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
    13         registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
    14         registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
    15         registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
    16         registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
    17         registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
    18     }
    19 
    20 }

    这里我们关注客户端,看reference相关的就可以了,看下DubboBeanDefinitionParser

    类的内容有点多,主要工作是注入了一个ReferenceBean的bean

    我们可以看下这个类

    public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {

    这个类实现了FactoryBean, InitializingBean这个就有点生成动态代理的套路了

    我们看getObject()方法

    public Object getObject() throws Exception {
            return get();
        }

    进入get()方法

    public synchronized T get() {
            if (destroyed) {
                throw new IllegalStateException("Already destroyed!");
            }
            if (ref == null) {
                init();
            }
            return ref;
        }

    跟进init()方法

    ref = createProxy(map);

    init()方法内容有点多,主要看下这个段, ref也是getBean()返回的对象,这里看方法名可以推测是用的代理

    private T createProxy(Map<String, String> map) {
            URL tmpUrl = new URL("temp", "localhost", 0, map);
            final boolean isJvmRefer;
            if (isInjvm() == null) {
                if (url != null && url.length() > 0) { //指定URL的情况下,不做本地引用
                    isJvmRefer = false;
                } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                    //默认情况下如果本地有服务暴露,则引用本地服务.
                    isJvmRefer = true;
                } else {
                    isJvmRefer = false;
                }
            } else {
                isJvmRefer = isInjvm().booleanValue();
            }
    
            if (isJvmRefer) {
                URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
                invoker = refprotocol.refer(interfaceClass, url);
                if (logger.isInfoEnabled()) {
                    logger.info("Using injvm service " + interfaceClass.getName());
                }
            } else {
                if (url != null && url.length() > 0) { // 用户指定URL,指定的URL可能是对点对直连地址,也可能是注册中心URL
                    String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                    if (us != null && us.length > 0) {
                        for (String u : us) {
                            URL url = URL.valueOf(u);
                            if (url.getPath() == null || url.getPath().length() == 0) {
                                url = url.setPath(interfaceName);
                            }
                            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                                urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                            } else {
                                urls.add(ClusterUtils.mergeUrl(url, map));
                            }
                        }
                    }
                } else { // 通过注册中心配置拼装URL
                    List<URL> us = loadRegistries(false);
                    if (us != null && us.size() > 0) {
                        for (URL u : us) {
                            URL monitorUrl = loadMonitor(u);
                            if (monitorUrl != null) {
                                map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            }
                            urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        }
                    }
                    if (urls == null || urls.size() == 0) {
                        throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address="..." /> to your spring config.");
                    }
                }
    
                if (urls.size() == 1) {
                    invoker = refprotocol.refer(interfaceClass, urls.get(0));
                } else {
                    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                    URL registryURL = null;
                    for (URL url : urls) {
                        invokers.add(refprotocol.refer(interfaceClass, url));
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            registryURL = url; // 用了最后一个registry url
                        }
                    }
                    if (registryURL != null) { // 有 注册中心协议的URL
                        // 对有注册中心的Cluster 只用 AvailableCluster
                        URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                        invoker = cluster.join(new StaticDirectory(u, invokers));
                    } else { // 不是 注册中心的URL
                        invoker = cluster.join(new StaticDirectory(invokers));
                    }
                }
            }
    
            Boolean c = check;
            if (c == null && consumer != null) {
                c = consumer.isCheck();
            }
            if (c == null) {
                c = true; // default true
            }
            if (c && !invoker.isAvailable()) {
                throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
            }
            if (logger.isInfoEnabled()) {
                logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
            }
            // 创建服务代理
            return (T) proxyFactory.getProxy(invoker);
        }

    这个方法内容也挺多,今天没看完。应该是代理模式,

    这里有个有意思的地方是,如果服务端没有启动在zk中注册,这里生成客户端代理的时候会抛异常。这个是后面需要去分析源码的,还有server端如何暴露服务的,即监听对应的端口,接收客户端的请求。以及在zk里面保存的内容

    今天还发现一个有意思的地方,dubbo里面有泛化调用。看了泛化调用部分代码,感觉公司用的泛化调用,应该是参考了dubbo的

  • 相关阅读:
    Python | PyQt5编写计时器与倒计时应用程序
    AI文件与PS文件相互导入,并分层可编辑
    maple解方程组
    有限元数值分析
    常用Latex编辑数学公式
    notion
    总结一下ANSYS中不同单元之间选择与连接问题
    参考文献的引用方法
    Abaqus CAE笔记本
    几种大文件传输的平台
  • 原文地址:https://www.cnblogs.com/luckygxf/p/9966915.html
Copyright © 2011-2022 走看看