zoukankan      html  css  js  c++  java
  • Dubbo源码解析(七)之consumer初始化

    dubbo的consumer由ReferenceBean初始化,我们先来看一下这个类的层次结构:

     我们看到ReferenceBean实现了InitializingBean,所以我们先来看一下它的afterPropertiesSet方法实现:
    ReferenceBean:

    public void afterPropertiesSet() throws Exception {
    // ConsumerConfig配置
    if (getConsumer() == null) {
    Map<String, ConsumerConfig> consumerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConsumerConfig.class, false, false);
    if (consumerConfigMap != null && consumerConfigMap.size() > 0) {
    ConsumerConfig consumerConfig = null;
    for (ConsumerConfig config : consumerConfigMap.values()) {
    if (config.isDefault() == null || config.isDefault().booleanValue()) {
    if (consumerConfig != null) {
    throw new IllegalStateException("Duplicate consumer configs: " + consumerConfig + " and " + config);
    }
    consumerConfig = config;
    }
    }
    if (consumerConfig != null) {
    setConsumer(consumerConfig);
    }
    }
    }
    // ApplicationConfig配置
    if (getApplication() == null
    && (getConsumer() == null || getConsumer().getApplication() == null)) {
    Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);
    if (applicationConfigMap != null && applicationConfigMap.size() > 0) {
    ApplicationConfig applicationConfig = null;
    for (ApplicationConfig config : applicationConfigMap.values()) {
    if (config.isDefault() == null || config.isDefault().booleanValue()) {
    if (applicationConfig != null) {
    throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);
    }
    applicationConfig = config;
    }
    }
    if (applicationConfig != null) {
    setApplication(applicationConfig);
    }
    }
    }
    // ModuleConfig配置
    if (getModule() == null
    && (getConsumer() == null || getConsumer().getModule() == null)) {
    Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);
    if (moduleConfigMap != null && moduleConfigMap.size() > 0) {
    ModuleConfig moduleConfig = null;
    for (ModuleConfig config : moduleConfigMap.values()) {
    if (config.isDefault() == null || config.isDefault().booleanValue()) {
    if (moduleConfig != null) {
    throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config);
    }
    moduleConfig = config;
    }
    }
    if (moduleConfig != null) {
    setModule(moduleConfig);
    }
    }
    }
    // RegistryConfig配置
    if ((getRegistries() == null || getRegistries().size() == 0)
    && (getConsumer() == null || getConsumer().getRegistries() == null || getConsumer().getRegistries().size() == 0)
    && (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().size() == 0)) {
    Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);
    if (registryConfigMap != null && registryConfigMap.size() > 0) {
    List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>();
    for (RegistryConfig config : registryConfigMap.values()) {
    if (config.isDefault() == null || config.isDefault().booleanValue()) {
    registryConfigs.add(config);
    }
    }
    if (registryConfigs != null && registryConfigs.size() > 0) {
    super.setRegistries(registryConfigs);
    }
    }
    }
    // MonitorConfig配置
    if (getMonitor() == null
    && (getConsumer() == null || getConsumer().getMonitor() == null)
    && (getApplication() == null || getApplication().getMonitor() == null)) {
    Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);
    if (monitorConfigMap != null && monitorConfigMap.size() > 0) {
    MonitorConfig monitorConfig = null;
    for (MonitorConfig config : monitorConfigMap.values()) {
    if (config.isDefault() == null || config.isDefault().booleanValue()) {
    if (monitorConfig != null) {
    throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);
    }
    monitorConfig = config;
    }
    }
    if (monitorConfig != null) {
    setMonitor(monitorConfig);
    }
    }
    }
    Boolean b = isInit();
    if (b == null && getConsumer() != null) {
    b = getConsumer().isInit();
    }
    if (b != null && b.booleanValue()) {
    /* 非延迟初始化直接进行初始化 */
    getObject();
    }
    }
    ReferenceBean:
    public Object getObject() throws Exception { return get(); }
    ReferenceConfig:
    public synchronized T get() {
    if (destroyed) {
    throw new IllegalStateException("Already destroyed!");
    }
    if (ref == null) {
    init(); /* 初始化 */
    }
    return ref;
    }
    ReferenceConfig:
    private void init() {
    if (initialized) {
    return;
    }
    initialized = true;
    if (interfaceName == null || interfaceName.length() == 0) {
    throw new IllegalStateException("<dubbo:reference interface="" /> interface not allow null!");
    }
    // 获取consumer全局配置
    checkDefault();
    // 追加properties配置,我们在provider初始化的文章中分析过,复用的是相同的方法
    appendProperties(this);
    if (getGeneric() == null && getConsumer() != null) {
    setGeneric(getConsumer().getGeneric());
    }
    if (ProtocolUtils.isGeneric(getGeneric())) {
    // 泛化服务
    interfaceClass = GenericService.class;
    } else {
    try {
    interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
    .getContextClassLoader());
    } catch (ClassNotFoundException e) {
    throw new IllegalStateException(e.getMessage(), e);
    }
    // 检查接口和方法,方法在接口中是否存在
    checkInterfaceAndMethods(interfaceClass, methods);
    }
    // 解析文件的应用
    String resolve = System.getProperty(interfaceName);
    String resolveFile = null;
    if (resolve == null || resolve.length() == 0) {
    resolveFile = System.getProperty("dubbo.resolve.file");
    if (resolveFile == null || resolveFile.length() == 0) {
    File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
    if (userResolveFile.exists()) {
    resolveFile = userResolveFile.getAbsolutePath();
    }
    }
    if (resolveFile != null && resolveFile.length() > 0) {
    Properties properties = new Properties();
    FileInputStream fis = null;
    try {
    fis = new FileInputStream(new File(resolveFile));
    properties.load(fis);
    } catch (IOException e) {
    throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
    } finally {
    try {
    if (null != fis) fis.close();
    } catch (IOException e) {
    logger.warn(e.getMessage(), e);
    }
    }
    resolve = properties.getProperty(interfaceName);
    }
    }
    if (resolve != null && resolve.length() > 0) {
    url = resolve;
    if (logger.isWarnEnabled()) {
    if (resolveFile != null && resolveFile.length() > 0) {
    logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service.");
    } else {
    logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");
    }
    }
    }
    // 最终确定各个配置对象
    if (consumer != null) {
    if (application == null) {
    application = consumer.getApplication();
    }
    if (module == null) {
    module = consumer.getModule();
    }
    if (registries == null) {
    registries = consumer.getRegistries();
    }
    if (monitor == null) {
    monitor = consumer.getMonitor();
    }
    }
    if (module != null) {
    if (registries == null) {
    registries = module.getRegistries();
    }
    if (monitor == null) {
    monitor = module.getMonitor();
    }
    }
    if (application != null) {
    if (registries == null) {
    registries = application.getRegistries();
    }
    if (monitor == null) {
    monitor = application.getMonitor();
    }
    }
    // 检查application配置,分析provider初始化时分析过
    checkApplication();
    // 检查stub、mock配置,分析provider初始化时分析过
    checkStubAndMock(interfaceClass);
    Map<String, String> map = new HashMap<String, String>();
    // 属性map
    Map<Object, Object> attributes = new HashMap<Object, Object>();
    map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
    map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
    if (ConfigUtils.getPid() > 0) {
    map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
    }
    if (!isGeneric()) {
    // 校对接口版本
    String revision = Version.getVersion(interfaceClass, version);
    if (revision != null && revision.length() > 0) {
    map.put("revision", revision);
    }
    // 方法列表
    String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
    if (methods.length == 0) {
    logger.warn("NO method found in service interface " + interfaceClass.getName());
    map.put("methods", Constants.ANY_VALUE);
    } else {
    map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
    }
    }
    map.put(Constants.INTERFACE_KEY, interfaceName);
    // 为一些配置对象追加参数,分析provider初始化时分析过
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, consumer, Constants.DEFAULT_KEY);
    appendParameters(map, this);
    String prifix = StringUtils.getServiceKey(map);
    if (methods != null && methods.size() > 0) {
    for (MethodConfig method : methods) {
    appendParameters(map, method, method.getName());
    String retryKey = method.getName() + ".retry";
    if (map.containsKey(retryKey)) {
    String retryValue = map.remove(retryKey);
    if ("false".equals(retryValue)) {
    // retry配置为false默认重试0次
    map.put(method.getName() + ".retries", "0");
    }
    }
    appendAttributes(attributes, method, prifix + "." + method.getName());
    // 检查配置冲突,将onreturn、onthrow、oninvoke方法名转换为方法
    checkAndConvertImplicitConfig(method, map, attributes);
    }
    }
    // 确定用于注册的ip
    String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
    if (hostToRegistry == null || hostToRegistry.length() == 0) {
    hostToRegistry = NetUtils.getLocalHost();
    } else if (isInvalidLocalHost(hostToRegistry)) {
    throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
    }
    map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
    // 属性保存到系统上下文中
    StaticContext.getSystemContext().putAll(attributes);
    /* 创建代理 */
    ref = createProxy(map);
    // 获取唯一服务名称(由group分组、接口名称和版本拼接而成),创建consumer模型
    ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
    // 添加唯一服务名称和consumer模型的映射
    ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
    }}
    这里有一些方法我们在分析provider初始化源码的时候分析过,这里就不再重复了,复用的相同的流程。
    ReferenceConfig:
    private T createProxy(Map<String, String> map) {
    URL tmpUrl = new URL("temp", "localhost", 0, map);
    final boolean isJvmRefer;
    if (isInjvm() == null) {
    // 如果指定了URL,不要做本地引用
    if (url != null && url.length() > 0) {
    isJvmRefer = false;
    } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
    // 默认情况下,引用本地服务,如果存在
    isJvmRefer = true;
    } else {
    isJvmRefer = false;
    }
    } else {
    isJvmRefer = isInjvm().booleanValue();
    }
    if (isJvmRefer) {
    URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
    // InjvmProtocol做本地引用
    invoker = refprotocol.refer(interfaceClass, url);
    if (logger.isInfoEnabled()) {
    logger.info("Using injvm service " + interfaceClass.getName());
    }
    } else {
    // 用户指定的URL,可以是点对点地址,也可以是注册中心的地址
    if (url != null && url.length() > 0) {
    String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
    if (us != null && us.length > 0) {
    for (String u : us) {
    URL url = URL.valueOf(u);
    if (url.getPath() == null || url.getPath().length() == 0) {
    url = url.setPath(interfaceName);
    }
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
    urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
    } else {
    urls.add(ClusterUtils.mergeUrl(url, map));
    }
    }
    }
    // 从注册中心的配置组合URL
    } else {
    // 加载注册中心配置,provider初始化文章中分析过
    List<URL> us = loadRegistries(false);
    if (us != null && us.size() > 0) {
    for (URL u : us) {
    // 加载监控中心地址
    URL monitorUrl = loadMonitor(u);
    if (monitorUrl != null) {
    map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
    }
    urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
    }
    }
    if (urls == null || urls.size() == 0) {
    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address="..." /> to your spring config.");
    }
    }
    if (urls.size() == 1) {
    /* 单个url直接关联引用 */
    invoker = refprotocol.refer(interfaceClass, urls.get(0));
    } else {
    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
    URL registryURL = null;
    for (URL url : urls) {
    /* 多个url循环关联引用返回invoker */
    invokers.add(refprotocol.refer(interfaceClass, url));
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
    registryURL = url; // 使用最后一个注册中心url
    }
    }
    if (registryURL != null) {
    // 仅在register的群集可用时才使用AvailableCluster
    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
    // AvailableCluster
    invoker = cluster.join(new StaticDirectory(u, invokers));
    } else {
    // 不是一个注册中心url
    invoker = cluster.join(new StaticDirectory(invokers));
    }
    }
    }
    Boolean c = check;
    if (c == null && consumer != null) {
    c = consumer.isCheck();
    }
    if (c == null) {
    c = true; // 默认要检查关联服务的provider是否存在
    }
    if (c && !invoker.isAvailable()) {
    throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
    }
    if (logger.isInfoEnabled()) {
    logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
    }
    // javaassist生成动态代理类
    return (T) proxyFactory.getProxy(invoker);
    }}
    RegistryProtocol:
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
    // 获取注册中心对象
    Registry registry = registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
    return proxyFactory.getInvoker((T) registry, type, url);
    }
    // group="a,b" or group="*"
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
    String group = qs.get(Constants.GROUP_KEY);
    if (group != null && group.length() > 0) {
    // 多个分组或者是通配符分组的处理
    if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
    || "*".equals(group)) {
    /* 关联服务引用 */
    return doRefer(getMergeableCluster(), registry, type, url);
    }
    }
    /* 关联服务引用 */
    return doRefer(cluster, registry, type, url);
    }
    关于注册中心的相关操作,我们在Dubbo源码解析之registry注册中心这篇文章中已经分析过,这里不再赘述。
    RegistryProtocol:
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
    && url.getParameter(Constants.REGISTER_KEY, true)) {
    // 注册中心注册成为消费者
    registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
    Constants.CHECK_KEY, String.valueOf(false)));
    }
    // 订阅注册中心中服务的提供者、配置、和路由等相关信息
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
    Constants.PROVIDERS_CATEGORY
    + "," + Constants.CONFIGURATORS_CATEGORY
    + "," + Constants.ROUTERS_CATEGORY));
    // 多分组为MergeableCluster,单分组默认为FailoverCluster
    // 分别返回的Invoker就是MergeableClusterInvoker和FailoverClusterInvoker
    Invoker invoker = cluster.join(directory);
    /* 注册consumer */
    ProviderConsumerRegTable.registerConsuemr(invoker, url, subscribeUrl, directory);
    return invoker;
    }
    ProviderConsumerRegTable:
    public static void registerConsuemr(Invoker invoker, URL registryUrl, URL consumerUrl, RegistryDirectory registryDirectory) {
    // 构建包装类
    ConsumerInvokerWrapper wrapperInvoker = new ConsumerInvokerWrapper(invoker, registryUrl, consumerUrl, registryDirectory);
    String serviceUniqueName = consumerUrl.getServiceKey();
    // 从服务唯一名称和invoker集合的映射中获取invoker集合
    Set<ConsumerInvokerWrapper> invokers = consumerInvokers.get(serviceUniqueName);
    if (invokers == null) {
    // 获取不到添加新的集合
    consumerInvokers.putIfAbsent(serviceUniqueName, new ConcurrentHashSet<ConsumerInvokerWrapper>());
    invokers = consumerInvokers.get(serviceUniqueName);
    }
    // 包装类放入集合
    invokers.add(wrapperInvoker);
    }
    到这里,整个consumer初始化流程就分析完成了。
    
    
  • 相关阅读:
    水木→函数式编程语言→lisp是不是主要用来编网站的?
    OpenMP 维基百科,自由的百科全书
    一个实际的Lisp项目开发心得 albert_lee的产品技术空间 博客频道 CSDN.NET
    ...
    OpenMPI
    Debian下安装NetBeans
    Linux Socket学习(十七)
    Linux Socket学习(十四)
    Debian下安装Latex
    Debian下安装virtualbox
  • 原文地址:https://www.cnblogs.com/lanblogs/p/15261906.html
Copyright © 2011-2022 走看看